我在使用该项目过程中,曾出现过timer无限制递增的情况,也出现过服务端主动踢出连接无法消费等问题,遂自己重写了一个amqp客户端;
后来我回过头观察分析workerman/rabbitmq源码的时候,发现了一些可以被建议的地方:
Client.php 160 - 170 行位置已经创建了一个持续的定时器
})->then(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true);
$this->state = ClientStateEnum::CONNECTED;
return $this;
});
Client.php 226 - 243 行位置依然在反复创建一次性的定时器
public function onHeartbeat()
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
if ($now >= $nextHeartbeat) {
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->done(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false);
});
if (is_callable($this->options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback']->call($this);
}
} else {
$this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false);
}
}
这里我理解是为了判断时间差,更精确的进行触发回调,但是我个人认为本身workerman的定时器是精确到毫秒的,这里没有必要再做冗余的判断,直接调用即可,代码如下:
public function onHeartbeat(): void
{
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->then(
function () {
if (is_callable(
isset($this->options['heartbeat_callback'])
? $this->options['heartbeat_callback']
: null
)) {
// 这里我并没有沿用bunny的回调触发方式,而是自己写了一个,如有需要,将该方法改回bunny方法就好了
($this->options['heartbeat_callback'])($this);
// $this->options['heartbeat_callback']->call($this);
}
},
function (\Throwable $throwable){
if($this->log){
$this->log->notice(
'OnHeartbeatFailed',
[
$throwable->getMessage(),
$throwable->getCode(),
$throwable->getFile(),
$throwable->getLine()
]
);
}
AbstractProcess::kill("OnHeartbeatFailed-{$throwable->getMessage()}");
});
}
AbstractProcess::kill()这个方法实际上是一个简单的杀死当前进程的方法,因为bunny的客户端使用的是异步promise的执行方式,在遇到错误的时候会调用then中的onRejected,我认为,在一定情况下如果心跳失败了,会影响当前链接的活性,随之会被服务端踢出,但客户端并没有完善的重连机制,就造成了假死,所以我在这个位置加入杀死当前进程的方法,让workerman的主进程重新拉起一个进程,该进程也会重新连接,重新处理和消费,不会影响工作流,kill方法的代码如下:
public static function kill(?string $log = null)
{
if(self::$_masterPid === ($pid = posix_getpid())){
self::stopAll(SIGKILL, $log);
}else{
self::log("(pid:{$pid}) {$log}");
posix_kill($pid, SIGKILL);
}
}
如上述所说的,在Client.php 182 - 220的disconnect方法中也没有重连的方案,在使用rabbitmq的管理后台将该链接断开后,该进程就始终保持了一个僵尸进程的角色,无法退出也无法消费,所以我建议改进如下:
public function disconnect($replyCode = 0, $replyText = '') : Promise\PromiseInterface
{
if ($this->state === ClientStateEnum::DISCONNECTING) {
return $this->disconnectPromise;
}
if ($this->state !== ClientStateEnum::CONNECTED) {
return Promise\reject(new ClientException("Client is not connected."));
}
$this->state = ClientStateEnum::DISCONNECTING;
$promises = [];
if ($replyCode === 0) {
foreach ($this->channels as $channel) {
$promises[] = $channel->close($replyCode, $replyText);
}
}
else{
foreach($this->channels as $channel){
$this->removeChannel($channel->getChannelId());
}
}
if ($this->heartbeatTimer) {
Timer::del($this->heartbeatTimer);
$this->heartbeatTimer = null;
}
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
// 杀死当前进程,交主进程重启
AbstractProcess::kill("{$replyCode}-{$replyText}");
}
return $this;
});
}
最后,我是非常喜欢workman及相关的生态组件的,本意是想直接使用workerman生态相关的组件,但当时我所处的项目上线非常急迫,所以抛开了workerman/rabbitmq自行写了一套casual/amqp,我个人希望workerman能够越来越好,加油;
casual/amqp项目地址:https://github.com/chaz6chez/simple-amqp
接上述,如果Workerman能够提供一个子进程重启的命令或者方法的话,是极好的,目前我是用AbstractProcess继承Worker,然后增加的一个kill方法,但是也是使用了KILL的信号,不知道有没有其他更优雅的信号或者方法可以使用呢?
非常感谢你的建议!
如果可以的话,请给
workerman/rabbitmq
发个pr。业务代码都是运行在子进程,所以不用判断master_pid,workerman重启当前进程直接调用
Worker::stopAll();
就行,也可以用posix_kill(posix_getpid(), SIGINT);
好的,随后我本地测试通过后提交一个PR,希望Workerman生态越来越好
太太感谢了!!!
已提交PR,保险起见,你们还是回归测一下~
非常感谢,已经合并!👍
我打算来长期帮忙维护workerman/rabbitmq这个项目了,我把rabbitmq引入我的casual/amqp这个项目使用了,目前是在生产环境运行的
太好了,真的非常感谢!!
老哥,有没有共同维护组件的小组或交流群啊
直接在webman交流群里交流就行