RabbitMQ 断开后,主动 Worker::stopAll 是出于什么考虑呢?
我想实现服务自动恢复,如果RabbitMQ断开之后,会定时重连。我的服务在onWorkerStart时比较耗时。
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
输出为:
RabbitMQ client disconnected: [506] Connection closed by server unexpectedly
子进程stopAll就会退出当前子进程,主进程会重新拉起子进程,在重新拉起子进程的过程中,子进程会对rabbitmq进行重连,这是最简单的重连机制的实现方案;当然还有的做法是可以不用重启进程,在进程内部用代码实现重连,代码复杂度会更高,相对这种方案,维护起来也不方便,两种方案比较起来也没有明显的性能差异,所以选择一种代码维护方便的即可。
@chaz6chez 谢谢回复, 那我只需要重写一下现在的客户端即可。
你想要的不是立即重连,而是定时重连是吗?
另外你使用的哪一个插件或者组件包?版本是多少?
是的,定时重连。 使用的是"workerman/rabbitmq": "^1.0"
之前重启方案是我提交的PR,你也可以按照以下代码进行修改
如果你有空,你可以提交这个pr,如果暂时没空,等会儿我去提交个pr
上面定时器写错了,用EV_TIMER_ONCE不要用EV_TIMER
我自己实现的重连方案没并有使用 worker::stopAll, 我是希望不重启子进程。 我现在想重写Client ,去掉Woker::stopAll(), 之后出现这样的错误:
Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/AbstractClient.php:311
Stack trace:
0 /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/Async/Client.php(300): Bunny\AbstractClient->read()
1 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Events/Select.php(311): Bunny\Async\Client->onDataAvailable()
2 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1635): Workerman\Events\Select->loop()
3 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1426): Workerman\Worker::forkOneWorkerForLinux()
4 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1400): Workerman\Worker::forkWorkersForLinux()
5 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(560): Workerman\Worker::forkWorkers()
6 /home/vagrant/code/wanlay-scheduler/vendor/workerman/webman-framework/src/support/App.php(131): Workerman\Worker::runAll()
7 /home/vagrant/code/wanlay-scheduler/start.php(4): support\App::run()
不建议自行实现重启功能,本身客户端高度依赖eventloop,很多准备工作是在进程启动的时候做的,比如加入定时器、加入事件监听,如果自行实现重启,需要对这些工作都要重新再做一遍
以上支持定时重启的特性,我已经提交PR了,等待walkor老大合并
你上述方式这样写,仅仅只是关闭了流的监听关闭了心跳定时器移除了缓冲区,但是你重连的时候,你还需要把流的监听事件加上,并且为rabbitmq-client的连接进行重新建立,重新获取client-id等等
谢谢大佬 !太给力了, 我最开始的思路是想,如果断开了连接,通过定时器去尝试连接。 连接成功后覆盖之前的连接对象。
目前发送MQ仅仅是服务的一部分,直接重启我觉得影响了我其他的工作。
那你可以参考这个client是如何进行连接的,然后在你的定时器里把连接工作做完,然后重连这里就按你自己写的方式写;当然我个人觉得重启当前进程是最快的做法,丝毫不会影响其他的工作。
这个我再做一个特性吧,就在重连这个地方增加一个注册的回调事件,如果加入了注册回调事件,就执行回调事件,如果没有回调事件,就自动重启
感谢PR 已经合并发了版本
@walkor 老大,根据需求,新增了用户自定义注册回调;我又调整了一个pr,已提交
https://github.com/walkor/rabbitmq/pull/15
@mo 你可以参考https://github.com/walkor/rabbitmq/pull/15的方式,在回调里面得到传入的client实例,然后对client实例进行处理销毁,重新new一个client代替你之前使用的client实例即可
非常感谢!!!