分享一个基于workerman的rabbitmq客户端生产者、消费者(基于Timer实现事件驱动)

zgh419566

通过综合分析、研究和探索workerman和PhpAmqpLibr相关手册,经过长期的实践,现分享一套基于workerman的rabbitmq客户端生产者和消费者代码,供大家测试,使用。

个人觉得这套代码比workerman官方的代码逻辑更清晰,更便于使用

希望walkor采用,这样的话我也算为开源社区做了一些贡献。

rabbitmq 生产者

rabbitmq_productor.php

<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;

$worker = new Worker();
//开启进程数量
$worker->count = 4;
$worker->name = "rabbitmq_productor";

$date = date("Y-m-d");
Worker::$pidFile = "var/mq_service_productor.pid";
Worker::$logFile = "var/mq_service_productor_logFile.log";
Worker::$stdoutFile = "var/mq_service_productor_stdout.log";

$worker->onWorkerStart = function () {
    global $rabbit_connection, $rabbit_channel ,  $rabbitmq_exchange_name , $rabbitmq_queueName;

    $rabbitmq_exchange_name = "exchange_name";
    $rabbitmq_queueName = "queuePrefix_QueueName";

    // 连接 rabbitmq 服务
    $rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

    // 获取信道
    $rabbit_channel = $rabbit_connection->channel();

    //声明创建交换机
    $rabbit_channel->exchange_declare( $rabbitmq_exchange_name , 'topic', false, true, false);

    // 声明创建队列
    $rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);

    // 绑定队列
    $rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);

    //可以修改时间间隔,如果为0.002秒,则每秒产生500*4=2000条
    Timer::add( 0.002 , function() {
        global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;

        //需要向rabbitmq队列投递消息的内容,通常为数组,经过json转换再发送
        $data_all = array(
            'name' => "张三",
            'time' => time(),
        );
        $data_all_out_json = json_encode($data_all , JSON_UNESCAPED_UNICODE );
        $data_all_out_msg = new AMQPMessage($data_all_out_json, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        //向队列里面写内容
        @$rabbit_channel->basic_publish($data_all_out_msg , $rabbitmq_exchange_name , $rabbitmq_queueName);
    });
};

Worker::runAll();

rabbitmq 消费者

rabbitmq_comsumer.php

<?php

require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;

$worker = new Worker();
//开启进程数量
$worker->count = 10;
$worker->name = "rabbitmq_comsumer";

$date = date("Y-m-d");
Worker::$pidFile = "var/rabbitmq_comsumer.pid";
Worker::$logFile = "var/rabbitmq_comsumer_logFile.log";
Worker::$stdoutFile = "var/rabbitmq_comsumer_stdout.log";

$worker->onWorkerStart = function () {
    global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;

    $rabbitmq_exchange_name = "exchange_name";
    $rabbitmq_queueName = "queuePrefix_QueueName";

    // 连接 rabbitmq 服务
    $rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

    // 获取信道
    $rabbit_channel = $rabbit_connection->channel();

    // 声明队列
    $rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);

    // 绑定队列
    $rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);

    // 消费者订阅队列
    $rabbit_channel->basic_consume($rabbitmq_queueName , '', false, false, false, false,
        function ($msg){
            global $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
            $data_all_str = $msg->body;
            // 消息确认,表明已经收到这条信息
            @$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            //echo "{$data_all_str}\n";

            //这里是业务处理逻辑
            //如果这条消息处理失败,你可以在这里将其再次放回消息队列(最好给消息做个放回去的次数判断,避免无限失败和无限循环)
        });

    //这里是重点,网上很多教程起的使用while死循环容易导致程序异步代码无法执行,这种方法能避免
    //按照每个进程每秒处理1000条来设定定时器,每个进程每秒消费1000条,4个进程每秒消费4000条,经过实际验证,将时间改小也无法提升单个进程的处理速度
    //实际测试,4个进程每秒的消费能力有4000左右,可以满足很多中小型系统的应用,如果想提升系统处理能力,
    //可以增加消费者进程数量来解决,比如我将进程数量提升到10个,每秒处理能力约为1万
    //这个机制,希望能力更强的你来进行优化
    Timer::add( 0.0001 , function() {
        global $rabbit_channel;
        if( count($rabbit_channel->callbacks) > 0 ){
            $rabbit_channel->wait();
        }
    });
};

Worker::runAll();

附上我的代码运行情况

2883 6 9
6个回答

walkor

感谢分享

  • doit 2022-12-06

    大佬,有没有准备集成到webman框架中?

  • chaz6chez 2022-12-06

    webman插件有rabbitmq的消费者插件:
    https://www.workerman.net/plugin/67

  • chaz6chez 2022-12-06

    插件包含生产者、消费者;同步、异步;延迟队列、普通队列;

  • doit 2022-12-06

    看见了的,在研究的,已在生产环境使用了?

  • chaz6chez 2022-12-06

    用了很久了

  • doit 2022-12-06

    @chaz6chez,安装测试了一下,发现每创建一个队列,默认创建了一个交换机,请教一下同一类型为何不使用同一个交换机呢?对于已经创建好的队列,如何指定交换机和队列名称呢。还有就是消息消费失败重试如何操作比较好呢?

  • chaz6chez 2022-12-06

    @doit FastBuilder是一个实现点对点消费模式的消费队列Builder,在这个模型里就是“N个生产者对应一个业务对应一个交换机对应一个队列对应N个消费者”;简单讲就是一个业务对应一条队列。这样的好处是所有业务是隔离的,业务对于rabbitMQ是透明的,它仅仅只做队列的分发而已。
    FastBuilder默认以ClassName为名,也有默认的消费失败重试方式,通常来说有两种重试机制,队头阻塞重试,回到队尾重试;
    如果要实现其他消费模式的话,可以继承Builder,自行实现即可

evilk

666,感谢分享,赞一个,有机会试用一下
另外,对于其他方面,说一下个人愚见(跟楼主这个没有太大关系)
很多时候,当遇到的确需要使用MQ的情况,中小型公司,真的不会花额外的成本去用专业的MQ,最多用redis
我知道,可能很多人会说,这样很操蛋,但事实上,有很多公司,就是这样,成本能低就低
但redis目前有一个很大的问题,目前的webman队列,是使用的普通的list结构,这个是没有ack机制的
这对于某些对消息可靠性有比较高的要求的情况,就很尴尬
我们目前使用webman的redis队列插件,都是用在对消息可靠性要求不是特别高的场景,比如通知等
我有个想法,基于redis stream 写一套类似的消息队列插件
这样就可以真正满足实际需求了
有时间一定写一个出来,相信这个可能会更加实用

  • 暂无评论
monty

大佬,require_once ("./Lib_global.php");这个是什么文件,用到了吗,能发一下吗?

  • zgh419566 2022-10-25

    一些预定义的变量,设置PHP运行环境等

    <?php
    define('START_TIME', microtime(true));
    define('START_MEM', memory_get_usage());
    define('DS', DIRECTORY_SEPARATOR);

    //defined('ROOT_PATH') or define('ROOT_PATH', dirname(realpath(APP_PATH)) . DS);
    defined('ROOT_PATH') or define('ROOT_PATH', dirname($_SERVER['SCRIPT_FILENAME']) . DS);
    defined('LOG_PATH') or define('LOG_PATH', ROOT_PATH . 'Log' . DS);
    // 环境常量
    define('IS_CLI', PHP_SAPI == 'cli' ? true : false);
    define('IS_WIN', strpos(PHP_OS, 'WIN') !== false);

    $config = array();
    $global = array();

    // do NOT run this script through a web browser
    if (!isset($_SERVER['argv'][0]) || isset($_SERVER['REQUEST_METHOD']) || isset($_SERVER['REMOTE_ADDR'])) {
    die('<br><strong>This script is only meant to run at the command line.</strong>');
    }

    / let PHP run just as long as it has to /
    ini_set('max_execution_time', '0');

    //这个必须有
    ini_set('memory_limit','320M');

    //error_reporting('E_ALL');

    / this should be auto-detected, set it manually if needed /
    $config["server_os"] = (strstr(PHP_OS, "WIN")) ? "win32" : "unix";

    $config['root_path'] = ROOT_PATH;

    $config['logPath'] = LOG_PATH;

    chdir($config['root_path']);

  • monty 2022-10-25

    谢谢啦,测了一下没这些也能跑,哈哈

powerbowen

  • 暂无评论
zgh419566

分享一个进一步优化的方法,不使用Timer驱动,使用Swoole协程方式

//全局里面加个:
Worker::$eventLoopClass = 'Workerman\Events\Swoole';    //将事件引擎替换为Swoole,前提是安装了Swoole,我安装的是4.8.12
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);   //将sleep一键协程化
Swoole\Coroutine::set(['enable_deadlock_check' => false]);    //取消时间死锁检查机制,避免报错

将原来的以下代码进行替换
/*
    //按照每个进程每秒处理1万条来设定定时器
    Timer::add( 0.001 , function() {
        global $rabbit_channel;
        if( count($rabbit_channel->callbacks) > 0 ){
            $rabbit_channel->wait();
        }
    });
*/

//替换为:
// 开始消费
while (count($rabbit_channel->callbacks)) {
        $rabbit_channel->wait();
        usleep(1000);
}
  • zgh419566 2022-12-21

    分享一下我写的rabbitmq客户端类库,支持定时与服务器握手,支持发生错误时进行生重连

    <?php
    /**

    • Created by PhpStorm.
    • User: Administrator
    • Date: 2019/6/13 0013
    • Time: 2:04
      */

    //composer require php-amqplib/php-amqplib

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPWriter;
    use PhpAmqpLib\Wire\AMQPReader;

    class rabbitmq_client {
    public $connection = "";
    public $channel = "";
    public $is_connected = false ;
    public $exchange_name = "" ;
    public $queen_name = "";
    public $comsume_callback = null;
    public $config_option = array(
    'host' => "127.0.0.1",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'exchange_name' => 'default_exchange_name',
    'queen_name' => 'default_queen_name',
    );

    public function __construct($option = array()){
        $this->config_option['host'] = $option['host'];
        $this->config_option['port'] = $option['port'];
        $this->config_option['user'] = $option['user'];
        $this->config_option['password'] = $option['password'];
        $this->exchange_name = $option['exchange_name'];
        $this->queen_name = $option['queen_name'];
    }
    
    function app_log($log){
        //将日志信息发送给日志服务器
        $ts = round(microtime(true) - time() , 6);
        @list($ts1 , $ts2) = @explode("." , $ts);
    
        $logData = "{$ts1}.{$ts2} {$log}";
    
        //logToFile($logData);
        if(function_exists("logToScreen") == true){
            logToScreen($logData , true);
        }else{
            echo $logData."\n";
        }
    }
    
    public function connect(){
        try{
            $this->connection = new AMQPStreamConnection(
                $this->config_option['host'] ,
                $this->config_option['port'] ,
                $this->config_option['user'] ,
                $this->config_option['password'] ,
                '/' ,
                false ,
                'AMQPLAIN' ,
                null,
                'en_US' ,
                3.0 ,
                3.0 ,
                null ,
                true ,
                60
            );
    
            if( $this->connection ->isConnected() == true){
    
                $this->channel = $this->connection->channel();
    
                //声明交换机
                $this->channel->exchange_declare( $this->exchange_name , 'topic', false, true, false);
    
                // 声明队列
                $this->channel->queue_declare( $this->queen_name , false, true, false, false);
    
                // 绑定队列
                $this->channel->queue_bind($this->queen_name , $this->exchange_name , $this->queen_name );
    
                $this->is_connected = true;
                $this->app_log("rabbitmq connected");
                return true;
            }
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
        }
        return false;
    }
    
    public function reconnect(){
        if( $this->is_connected == false ){
            if( $this->connect() == true ){
                //重新连接到服务器
                $this->is_connected = true;
                return true;
            }
        }
        return false;
    }
    
    /**
     * @return void
     *
     * 向服务器发送
     */
    function write_heartbeat(){
        if($this->is_connected == true){
            try{
                //app_log("heartbeat");
                $pkt = new AMQPWriter();
                $pkt->write_octet(8);
                $pkt->write_short(0);
                $pkt->write_long(0);
                $pkt->write_octet(0xCE);
                $this->connection->write($pkt->getvalue());
            }catch (Exception $e) {
                $this->app_log("error catched :".$e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
        }else{
            // false
            if( $this->connect() == true ){
                //尝试连接到服务器
                $this->is_connected = true;
            }
        }
    }
    
    /**
     * @param $data
     * @param $queen_name
     * @param $is_persistent
     * @param $is_debug
     * @return void
     */
    function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
        $delivery_mod = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        if($is_persistent == false){
            $delivery_mod = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;
        }
    
        $exchange_name = "";
        if(strlen($exchange_name_input) > 0){
            $exchange_name = $exchange_name_input;
        }else{
            $exchange_name = $this->exchange_name;
        }
    
        $queen_name = "";
        if(strlen($queen_name_input) > 0){
            $queen_name = $queen_name_input;
        }else{
            $queen_name = $this->exchange_name;
        }
    
        $rabbit_msg = new AMQPMessage($data , ['content_type'=>'text/plain','delivery_mode'=>$delivery_mod]); //定义消息
        try{
            // 发送消息
            $this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
            if( $this->reconnect() == true ){
                $this->is_connected = true;
            }
        }
    }
    
    //在做消费时,对流量进行控制,防止出现丢数据
    function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
        $this->channel->basic_qos( $prefetch_size , $prefetch_count ,false);   //当有消息在处理时不要发过来
    }
    
    /*
    function comsume_callback($msg){
        //收到MQ消息
        $message_body = $msg->body;
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        //echo "[x] Received ", $message_body, "\n";
        //redis_add_statistic( $redis , "rabbitmq:qos_test_consumption"  , 0.1);
    }
    */
    
    /*
    Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
    Swoole\Coroutine::set(['enable_deadlock_check' => false]);
     * */
    function comsume_swoole_go( $queen_name_input = ""){
        $queen_name = "";
        if(strlen($queen_name_input) > 0){
            $queen_name = $queen_name_input;
        }else{
            $queen_name = $this->queen_name;
        }
        if($this->is_connected == true) {
            go(function () {
                // 消费者订阅队列
                try {
                    if( !$this->comsume_callback ){
                        $this->app_log("function comsume_callback must be set");
                        return false;
                    }
                    $this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
                } catch (Exception $e) {
                    $this->app_log("error catched :" . $e->getMessage());
                    $this->is_connected = false;
                    $this->reconnect();
                }
    
                // 开始消费
                try {
                    /*
                    while ( count($this->channel->callbacks) ) {
                        $this->channel->wait();
                        usleep(1);
                    }*/
                    while ( $this->channel->is_consuming() ) {
                        $this->channel->wait();
                        usleep(1);
                    }
                } catch (Exception $e) {
                    app_log("error catched when consuming:" . $e->getMessage());
                    $this->is_connected = false;
                    $this->reconnect();
                }
            });
        }else{
            // false
            $this->connect();
        }
    }
    
    function comsume( $queen_name_input = ""){
        $queen_name = "";
        if(strlen($queen_name_input) > 0){
            $queen_name = $queen_name_input;
        }else{
            $queen_name = $this->queen_name;
        }
    
        if($this->is_connected == true) {
            // 消费者订阅队列
            try {
                if( !$this->comsume_callback ){
                    $this->app_log("function comsume_callback must be set");
                    return false;
                }
                $this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
            } catch (Exception $e) {
                $this->app_log("error catched :" . $e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
    
            // 开始消费
            try {
                /*
                while ( count($this->channel->callbacks) ) {
                    $this->channel->wait();
                    usleep(1);
                }*/
                while ( $this->channel->is_consuming() ) {
                    $this->channel->wait();
                }
            } catch (Exception $e) {
                app_log("error catched when consuming:" . $e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
        }else{
            // false
            $this->connect();
        }
    }

    }

    生产者:

    chdir(dirname($_SERVER['SCRIPT_FILENAME']));

    include_once __DIR__ . '/vendor/autoload.php';
    include_once("./Lib_global.php");
    include_once("./Lib_functions_rabbitmq.php");

    use Workerman\Worker;
    use Workerman\Lib\Timer;
    use Workerman\Connection\AsyncTcpConnection;
    use Workerman\Connection\TcpConnection;
    use Swoole\Coroutine;

    $worker = new Worker();

    //开启进程数量
    $worker->count = 2;
    $processName = "test_mq_pub";
    $worker->name = $processName;
    $date_ymd = date("Y-m-d");
    Worker::$pidFile = ROOT_PATH."var/{$processName}.pid";
    Worker::$logFile = ROOT_PATH."var/{$processName}_logFile.log";
    Worker::$stdoutFile = ROOT_PATH."var/{$processName}_stdout.log";
    Worker::$eventLoopClass = 'Workerman\Events\Swoole';

    $redis = "";
    $is_debug = false; //全局配置,是否开启调试模式
    $rabbitmq_client = "";

    function app_log($log){
    global $workerId;
    //将日志信息发送给日志服务器
    $ts = round(microtime(true) - time() , 6);
    @list($ts1 , $ts2) = @explode("." , $ts);

    //logToFile(".{$ts2} [$workerId] ".$log);
    logToScreen(".{$ts2} [$workerId] ".$log , true);

    }

    $worker->onWorkerStart = function() {
    global $redis, $worker, $workerId , $rabbitmq_client;

    Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
    Swoole\Coroutine::set(['enable_deadlock_check' => false]);
    
    //根据daemon顺序延时,这是确保系统正常运行的关键
    usleep(1000 * 100 * ($worker->id + 1));
    $workerId = $worker->id ;
    echo date("Y-m-d H:i:s") . " 服务进程{$workerId}已经启动!\n";
    
    //连接到Redis服务器
    $redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
    //定期与redis握手,避免被断掉,该动作每个进程都得执行
    Timer::add( 60 , function (){
        global $redis, $worker;
        $redis_status = $redis->ping();
        if($redis_status == false){
            $redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
        }
    });
    
    $rabbitmq_config_option = array();
    $rabbitmq_config_option['host'] = RABBITMQ_SERVER_IP;
    $rabbitmq_config_option['port'] = RABBITMQ_SERVER_PORT;
    $rabbitmq_config_option['user'] = RABBITMQ_USERNAME;
    $rabbitmq_config_option['password'] = RABBITMQ_PASSWORD;
    $rabbitmq_config_option['exchange_name'] = "4b_ads_CLASS_mqTest";
    $rabbitmq_config_option['queen_name'] = "4b_ads_CLASS_mqTest";
    $rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
    
    //不断尝试与Rabbitmq服务器建立连接
    while( $rabbitmq_client->is_connected == false ){
        if($rabbitmq_client->connect() == true){
            break;
        }
        $rabbitmq_client->app_log("rabbitmq server connect failed");
        sleep(1);
    }
    //执行定时握手任务
    Timer::add( 55 , function ()use($rabbitmq_client) {
        // 发送心跳数据
        $rabbitmq_client->write_heartbeat();
    });
    
    Timer::add( 0.001 , function ()use($rabbitmq_client) {
        if($rabbitmq_client->is_connected == true){
            $data_json = GetRandStr(128);
            $rabbitmq_msg = json_encode($data_json);
            $rabbitmq_client->publish($rabbitmq_msg);
        }
    });

    };

    Worker::runAll();

    消费者:

    chdir(dirname($_SERVER['SCRIPT_FILENAME']));

    include_once __DIR__ . '/vendor/autoload.php';
    include_once("./Lib_global.php");
    include_once("./Lib_functions_rabbitmq.php");

    use Workerman\Worker;
    use Workerman\Lib\Timer;
    use Workerman\Connection\AsyncTcpConnection;
    use Workerman\Connection\TcpConnection;
    use Swoole\Coroutine;

    $worker = new Worker();

    //开启进程数量
    $worker->count = 2;
    $processName = "test_mq_sub";
    $worker->name = $processName;
    $date_ymd = date("Y-m-d");
    Worker::$pidFile = ROOT_PATH . "var/{$processName}.pid";
    Worker::$logFile = ROOT_PATH . "var/{$processName}_logFile.log";
    Worker::$stdoutFile = ROOT_PATH . "var/{$processName}_stdout.log";
    Worker::$eventLoopClass = 'Workerman\Events\Swoole';

    $redis = "";
    $is_debug = false; //全局配置,是否开启调试模式
    $rabbitmq_client = "";

    function app_log($log)
    {
    global $workerId;
    //将日志信息发送给日志服务器
    $ts = round(microtime(true) - time(), 6);
    @list($ts1, $ts2) = @explode(".", $ts);

    //logToFile(".{$ts2} [$workerId] ".$log);
    logToScreen(".{$ts2} [$workerId] " . $log, true);

    }

    $worker->onWorkerStart = function () {
    global $redis, $worker, $workerId, $rabbitmq_client;

    Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
    Swoole\Coroutine::set(['enable_deadlock_check' => false]);
    
    //根据daemon顺序延时,这是确保系统正常运行的关键
    usleep(1000 * 100 * ($worker->id + 1));
    echo date("Y-m-d H:i:s") . " 服务进程{$worker->id}已经启动!\n";
    $workerId = $worker->id ;
    
    //连接到Redis服务器
    $redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
    //定期与redis握手,避免被断掉,该动作每个进程都得执行
    Timer::add( 60 , function (){
        global $redis, $worker;
        $redis_status = $redis->ping();
        if($redis_status == false){
            $redis = redis_connect(REDIS_SERVER , REDIS_PORT , REDIS_PASSWORD , REDIS_DBNAME);
        }
    });
    
    //初始化Rabbitmq连接
    $rabbitmq_config_option = array();
    $rabbitmq_config_option['host'] = RABBITMQ_SERVER_IP;
    $rabbitmq_config_option['port'] = RABBITMQ_SERVER_PORT;
    $rabbitmq_config_option['user'] = RABBITMQ_USERNAME;
    $rabbitmq_config_option['password'] = RABBITMQ_PASSWORD;
    $rabbitmq_config_option['exchange_name'] = "4b_ads_CLASS_mqTest";
    $rabbitmq_config_option['queen_name'] = "4b_ads_CLASS_mqTest";
    $rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
    
    //不断尝试与Rabbitmq服务器建立连接
    while( $rabbitmq_client->is_connected == false ){
        if($rabbitmq_client->connect() == true){
            break;
        }
        $rabbitmq_client->app_log("rabbitmq server connect failed");
        sleep(1);
    }
    //执行定时握手任务
    Timer::add( 55 , function ()use($rabbitmq_client) {
        // 发送心跳数据
        $rabbitmq_client->write_heartbeat();
    });
    
    $rabbitmq_client->comsume_callback = function ($msg){
        //收到MQ消息
        $message_body = $msg->body;
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        //echo "[x] Received ", $message_body, "\n";
    };
    
    //$rabbitmq_client->comsume_swoole_go();   //采用协程方式处理
    $rabbitmq_client->comsume();               //传统方式

    };

    Worker::runAll();

zgh419566

在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内的基于定时任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。
经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好使用一些。

https://www.workerman.net/a/1485

  • 暂无评论
年代过于久远,无法发表回答
🔝