workerman使用redis-queue出现多次消费的情况

神奇的海螺

问题描述

使用workerman的redis-queue,日志里面会出现多次消费记录,望大佬们帮忙看下

程序代码或配置

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;

//日志
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

require_once __DIR__ . '/../function.php';

$consumer = new Worker();

$consumer->count = 8;

$consumer->onWorkerStart = function($consumer){

    global $db, $config, $logger;

    //数据库链接
    $db = new \Workerman\MySQL\Connection(
        $config['database']['mysql']['host'], //数据库链接
        $config['database']['mysql']['port'], //数据库端口
        $config['database']['mysql']['user'], //数据库用户
        $config['database']['mysql']['pass'], //数据库密码
        $config['database']['mysql']['name']  //数据库名字
    );

    //队列链接
    $client = new Client($config['redis']['host'], [
        'auth' => $config['redis']['password']
    ]);

    //日志
    $logger = new Logger('xianyu');

    //订阅
    $client->subscribe('xianyu', function($data){

        global $logger, $config;

        $logger->pushHandler(new StreamHandler($config['log']['path'] . date('Y/m/') . date('d') . '_xianyu.log'));

        // 要调用的类名,加上Consumer命名空间
        $class_name = "\\Consumer\\Xianyu";
        // 要调用的方法名
        $method = isset($data['method']) ? $data['method'] : '';
        if(class_exists($class_name)){
            $class = new $class_name;
            $callback = [$class, $method];
            if(is_callable($callback)){
                call_user_func_array($callback, ['data' => $data['data']]);
            }else{
                $logger->error("$class_name::$method not exist\n");
            }
        }else{
            $logger->error("不存在\n");
        }
    });

};

// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START')){
    Worker::runAll();
}

操作系统环境及workerman/webman等具体版本

Workerman version:4.1.13 PHP version:8.0.20

669 1 1
1个回答

walkor 打赏

消费失败(抛出异常)时,消息会放到延迟队列等待重试,会再次尝试消费。
还有就是入队列时就重复了,从你的日志来看像是重复入队列了,因为时间间隔不像是消费失败重试。

  • 神奇的海螺 2023-09-23

    感谢大佬回答,我也怀疑过是入队列重复的问题,然后就手动调用的入队列的方法,如果特别频繁的调用入队列,重复的次数就越来越多
    public function test(){
    workerQueue('xianyu', 'send', ['ticket_id' => 11111]);
    }

    function workerQueue($queue, $method, $data, $delay = 0){
    $config = config('cache.stores.redis');
    $redis = new \Redis;
    $redis->connect($config['host'], $config['port']);
    $redis->auth($config['password']);
    $data = [
    'method' => $method,
    'data' => $data
    ];

        $delay = 0;
        $queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前为redis-queue-waiting
        $queue_delay = '{redis-queue}-delayed';//1.0.5版本之前为redis-queue-delayed
    
        $now = time();
        $package_str = json_encode([
                'id'       => rand(),
                'time'     => $now,
                'delay'    => $delay,
                'attempts' => 0,
                'queue'    => $queue,
                'data'     => $data
        ]);
        if ($delay) {
                return $redis->zAdd($queue_delay, $now + $delay, $package_str);
        }
        return $redis->lPush($queue_waiting.$queue, $package_str);

    }

    消费队列处理的方法,会不会是这里的问题
    public function send($data = []){

        global $db, $config, $logger;
    
        try{
    
            //业务逻辑,如果有问题记录日志
            throw new \Exception('没有查找到可售票档');
    
            $logger->info('同步成功:'. json_encode($ticket));
        }catch(\Exception $e){
            $logger->error('同步失败:'. $e->getMessage());
        }

    }

  • walkor 2023-09-23

    这个你要自己测试了

  • 神奇的海螺 2023-09-23

    好的,感谢🙏

  • 神奇的海螺 2023-09-23

    发现问题了,队列只消费了一次,日志重复记录了,把new Logger放消息订阅里面就好了

年代过于久远,无法发表回答
×
🔝