rabbitmq想实现延迟队列,使用的 workerman/rabbitmq扩展

jk12097

问题描述

项目需要实现延迟执行,发布消息后延迟 5~10 秒执行,
使用的 workerman/rabbitmq扩展,不要说哪个扩展能实现,已经用workerman/rabbitmq扩展写了很多多列了,想再换扩展有点费劲

程序代码

$client = Client::factory([
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
    ])->connect();

    $channel = $client->channel();
    $exchange_name = 'delayed_demo_exchange';
    $queue_name = 'demo_delay_queue';
    // 创建延迟交换机,交换机类型为 direct
    $channel->exchangeDeclare($exchange_name, 'x-delayed-message', false, true, false, false, [
        'x-delayed-type' => 'direct',
    ]);

    // 创建延迟队列
    $queueName = 'delayed_queue';
    $channel->queueDeclare($queue_name, false, true, false, false);
    $channel->queueBind($queue_name, $exchange_name);

    // 发送消息
    $message = 'Delayed Message at ' . time();
    $channel->publish($message, ['delivery-mode' => 2,'x-delay' => 5000], '', $queue_name);
    // 将消息发布到延迟交换机
    $channel->publish($msg, $exchangeName, $queueName);

报错信息

Bunny\Exception\ClientException: PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange
type in /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php:1280<br />
Stack trace:<br />
#0 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php(1240): Bunny\AbstractClient->awaitQueueDeclareOk(1)<br />
#1 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ChannelMethods.php(111): Bunny\AbstractClient->queueDeclare(1, 'demo_delay_queu...', false, true, false, false, false, Array)<br />
#2 /Users/winds/Project/PHP/b_new/app/comm/service/pub/PubMq.php(506): Bunny\Channel->queueDeclare('demo_delay_queu...', false, true, false, false)<br />
#3 /Users/winds/Project/PHP/b_new/app/api/controller/CommonController.php(27): app\comm\service\pub\PubMq::demoPub(123456)<br />
#4 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(335): app\api\controller\CommonController->login(Object(support\Request))<br />
#5 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(358): Webman\App::Webman\{closure}(Object(support\Request))<br />
#6 /Users/winds/Project/PHP/b_new/app/middleware/GlobalMiddleware.php(15): Webman\App::Webman\{closure}(Object(support\Request))<br />
#7 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): app\middleware\GlobalMiddleware->process(Object(support\Request), Object(Closure))<br />
#8 /Users/winds/Project/PHP/b_new/vendor/webman/log/src/Middleware.php(96): Webman\App::Webman\{closure}(Object(support\Request))<br />
#9 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): Webman\Log\Middleware->process(Object(support\Request), Object(Closure))<br />
#10 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(662): Webman\App::Webman\{closure}(Object(support\Request))<br />
#11 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(156): Webman\App::findRoute(Object(Workerman\Connection\TcpConnection), '/api/v1/Common/...', 'POST/api/v1/Com...', Object(support\Request), 200)<br />
#12 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Connection/TcpConnection.php(749): Webman\App->onMessage(Object(Workerman\Connection\TcpConnection), Object(support\Request))<br />
#13 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Events/Select.php(400): Workerman\Connection\TcpConnection->baseRead(Resource id #349)<br />
#14 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1735): Workerman\Events\Select->run()<br />
#15 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1537): Workerman\Worker::forkOneWorkerForLinux(Object(Workerman\Worker))<br />
#16 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1517): Workerman\Worker::forkWorkersForLinux()<br />
#17 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(585): Workerman\Worker::forkWorkers()<br />
#18 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/support/App.php(152): Workerman\Worker::runAll()<br />
#19 /Users/winds/Project/PHP/b_new/start.php(5): support\App::run()<br />
#20 {main}

操作系统及workerman/webman等框架组件具体版本

系统:macos 15.2
webman版本:1.6.8
workerman/rabbitmq版本:2.0.0

158 2 0
2个回答

SillyDog

延迟交换机插件启用了嘛?

SillyDog

消费

<?php

declare(strict_types=1);

namespace app\command;

use Bunny\Channel;
use Bunny\Message;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\Events\Fiber;
use Workerman\Events\Revolt;
use Workerman\RabbitMQ\Client;
use Workerman\Worker;

class DelayReceive extends Command
{
    protected static $defaultName = 'delayReceive';
    protected static $defaultDescription = 'delayReceive';

    /**
     * @return void
     */
    protected function configure()
    {
        $this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
    }

    /**
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $worker = new Worker();
        $worker->eventLoop = Revolt::class;
        $worker->onWorkerStart = function () {
            $exchange_name = 'delayed_demo_exchange';
            $queue_name = 'demo_delay_queue';
            $routing_key = 'delayed_key';
            $client = Client::factory([
                'host' => 'rabbitmq',
                'port' => 5672,
                'user' => 'guest',
                'password' => 'guest',
                'vhost' => '/',
                'heartbeat' => 60,
            ])->connect();
            $channel = $client->channel();

            // 创建延迟队列
            $channel->queueDeclare($queue_name, false, true, false, false, false, [
                [
                    'x-delayed-type' => 'direct',
                ]
            ]);
            // Consumer
            $channel->consume(
                function (Message $message, Channel $channel, \Bunny\AbstractClient $client) {
                    echo " [>] Received ", $message->content, "Now:",date("Y-m-d H:i:s"), "\n";

                },
                $queue_name,
                $routing_key,
                false,
                true,
                false,
                false,
                [
                    'x-delayed-type' => 'direct',
                ]
            );

            $client->run();

            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        };
        Worker::runAll();
        return self::SUCCESS;
    }

}

生产

<?php

namespace app\command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\RabbitMQ\Client;

class Delay extends Command
{
protected static $defaultName = 'delay';
protected static $defaultDescription = 'delay';

/**
 * @return void
 */
protected function configure()
{
    // $this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
}

/**
 * @param InputInterface $input
 * @param OutputInterface $output
 * @return int
 */
protected function execute(InputInterface $input, OutputInterface $output): int
{
    // $name = $input->getArgument('name');
    // $output->writeln('Hello delay');

    $client = Client::factory([
        'host' => 'rabbitmq',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
    ])->connect();

    $channel = $client->channel();
    $exchange_name = 'delayed_demo_exchange';
    $queue_name = 'demo_delay_queue';
    $routing_key = 'delayed_key';
    // 创建延迟交换机,交换机类型为 direct
    $channel->exchangeDeclare(
        $exchange_name,
        'x-delayed-message',
        false,
        true,
        false,
        false,
        true,
        [
            'x-delayed-type' => 'direct',
        ]
    );

    // 创建延迟队列
    $channel->queueDeclare($queue_name, false, true, false, false);
    $channel->queueBind($queue_name, $exchange_name, $routing_key);
    while (true) {
        // 发送消息
        $message = 'Delayed Message at ' . time();
        $channel->publish($message, ['delivery-mode' => 2,'x-delay' => 5000], $exchange_name, $routing_key);
        sleep(1); // 1s 循环一次
    }
    return self::SUCCESS;
}

}

写了个demo 你先对照一下
  • jk12097 2天前

    直接贴您的代码运行通过,感谢大神!

×
🔝