rabbitmq 异步(workerman)和同步的生产者和消费者

2.0.x-dev 版本
2025-01-09 版本更新时间
132 安装
4 star

简介

rabbitmq 是一个异步(workerman)和同步的PHP客户端,用于异步(workerman)和同步的生产者和消费者。

封装了workerman/rabbitmq, 有类似webman/redis-queue的分组消费者, 也有按文件的单个worker

安装

composer require roiwk/rabbitmq

使用

Config Demo

// 配置格式
$config = [
    'host' => '127.0.0.1',
    'port' => 5672,
    'vhost' => '/',
    'mechanism' => 'AMQPLAIN',
    'user' => 'username',
    'password' => 'password',
    'timeout' => 10,
    'heartbeat' => 60,
    'heartbeat_callback' => function(){},
    'error_callback'     => null,
];

一. webman中自定义进程--消费者

1.process.php

'hello-rabbitmq' => [
    'handler' => app\queue\rabbitmq\Hello::class,
    'count'   => 1,
    'constructor' => [
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2.app\queue\rabbitmq\Hello.php

namespace app\queue\rabbitmq;

use Roiwk\Rabbitmq\AbstractConsumer;
use Roiwk\Rabbitmq\Producer;
use Bunny\Channel;
use Bunny\Message;
use Bunny\AbstractClient;

class Hello extends AbstractConsumer
{
    protected bool $async = true;

    protected string $queue = 'hello';

    protected array $consume = [
        'noAck' => true,
    ];

    public function consume(Message $message, Channel $channel, AbstractClient $client)
    {
        echo " [x] Received ", $message->content, "\n";
    }
}

二.webman中自定义进程--分组消费者

类似webman-queue插件, 分组将消费者放同一个文件夹下, 使用同一个worker, 多个进程数处理
1.process.php

'hello-rabbitmq' => [
    'handler' => Roiwk\Rabbitmq\GroupConsumers::class,
    'count'   => 2,
    'constructor' => [
        'consumer_dir' => app_path().'/queue/rabbimq',
        'rabbitmqConfig' => $config,
        //'logger' => Log::channel('hello'),
    ],
]

2.在 app_path().'/queue/rabbimq' 目录下创建php文件, 继承Roiwk\Rabbitmq\AbstractConsumer即可, 同上app\queue\rabbitmq\Hello.php

赞助商