在webman里如何在worker启动时全局初始化rabbitmq连接对象

cqqjj1029

写了一个rabbitmq的工具类,下面贴代码,需要生产消息时直接

\util\Rabbitmq\publishWorkerQueue($queueName, $msg);

需要消费消息时:

\util\Rabbitmq\consumeWorkerQueue($queueName, $callback);

我一直有个疑问,这个rabbit的connection对象,应该是在worker启动时就创建好,然后在需要的地方直接调用就行,否则像现在这样,每生产一次要建立一次连接再销毁,应该会浪费资源吧。
但是我想不明白该在什么地方怎么写这个全局建立连接对象的方法,对于logger对象也是同样的疑问,希望可以得到指点。

具体代码如下:

<?php

namespace util;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use support\Log;

class Rabbitmq
{
    protected static function Connection()
    {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST', '127.0.0.1'),
            env('RABBITMQ_PORT', 5672),
            env('RABBITMQ_USER', 'guest'),
            env('RABBITMQ_PASSWORD', 'guest')
        );
        return $connection;
    }

    /**
     * 发布消息到worker队列,支持一次性发布多个消息
     *
     * @author Aaron <chenqiang@h024.cn>
     *
     * @param string $queueName     队列名称
     * @param string|array $msgData 需要入队的消息,单一消息为字符串类型,多个消息是数组类型
     */
    public static function publishWorkerQueue(string $queueName = '', $msgData)
    {
        $log = Log::channel('producer');
        if ($queueName == '') {
            $queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
        }
        $connection = self::Connection();
        $channel = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        if (!is_array($msgData)) {
            $msgData = array($msgData);
        }
        // 遍历数组,对每一个元素做入队操作
        foreach ($msgData as $dataBody) {
            // 把字符串类型的元素入队,忽略其他类型的元素
            try {
                $dataBody = (string)$dataBody;
            } catch (\Exception $e) {
                $dataBody = json_encode($dataBody);
            }
            $msg = new AMQPMessage(
                $dataBody,
                array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
            );
            $log->debug("{$queueName}入队数据: {$dataBody}");
            $channel->basic_publish($msg, '', $queueName);
        }
        $channel->close();
        $connection->close();
    }

    /**
     * 从一个worker队列里消费一条记录
     *
     * @author Aaron <chenqiang@h024.cn>
     *
     * @param string $queueName     队列名称
     * @param callable $callback    消费的回调函数,接收值$msg了队列中的一条消息
     * @param bool $autoAck         自动确认消费,默认为false,需要在消费回调里手动执行$msg->ack()做消费成功确认
     */
    public static function consumeWorkerQueue(string $queueName, callable $callback, bool $autoAck = false)
    {
        $log = Log::channel('consumer');
        if ($queueName == '') {
            $queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
        }
        $connection = self::Connection();
        $channel = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false); // 默认自动确认
        $channel->basic_qos(null, 1, null); // 一次只消费一条
        $channel->basic_consume($queueName, '', false, $autoAck, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
            pcntl_signal_dispatch();    // 针对exit with status 9
        }
    }
}
1731 2 0
2个回答

chaz6chez

进程与进程之间是资源阻隔的,在当前进程里,global或静态变量都可以承载这个连接资源,如果你想要全局的,所有进程共享,那不行,除非用线程

  • 暂无评论
2548a

文档里有写,地址: https://www.workerman.net/doc/webman/others/bootstrap.html
自己注意,初始化的是当前进程有效的.

  • cqqjj1029 2022-05-07

    这个文档我看了,同时也参考了support\bootstrap\Session和support\bootstrap\LaravelDb,但还是没想明白具体我该怎么写,比如我创建app/bootstrap/Rabbitmq.php文件,在它的start文件法写了$connection = new AMQPStreamConnection(...),然后呢,怎么能让process中的RabbitConsumer.php能够访问到这个$connection呢?

  • chaz6chez 2022-05-07

    数据库连接是怎么实现进程内单例的,你就可以怎么实现,最简单的方式就是class中用static变量保存new AMQPStreamConnection(...),然后下次用的时候都用这个static变量

年代过于久远,无法发表回答
×
🔝