关于workerman/rabbitmq项目的建议

chaz6chez

我在使用该项目过程中,曾出现过timer无限制递增的情况,也出现过服务端主动踢出连接无法消费等问题,遂自己重写了一个amqp客户端;
后来我回过头观察分析workerman/rabbitmq源码的时候,发现了一些可以被建议的地方:

  1. 源码在如下位置会反复创建定时器

Client.php 160 - 170 行位置已经创建了一个持续的定时器


    })->then(function () {
            $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true);

            $this->state = ClientStateEnum::CONNECTED;
            return $this;

        });

Client.php 226 - 243 行位置依然在反复创建一次性的定时器

public function onHeartbeat()
    {
        $now = microtime(true);
        $nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];

        if ($now >= $nextHeartbeat) {
            $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
            $this->flushWriteBuffer()->done(function () {
                $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false);
            });

            if (is_callable($this->options['heartbeat_callback'] ?? null)) {
                $this->options['heartbeat_callback']->call($this);
            }
        } else {
            $this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false);
        }
    }

这里我理解是为了判断时间差,更精确的进行触发回调,但是我个人认为本身workerman的定时器是精确到毫秒的,这里没有必要再做冗余的判断,直接调用即可,代码如下:

    public function onHeartbeat(): void
    {
        $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
        $this->flushWriteBuffer()->then(
            function () {
                if (is_callable(
                    isset($this->options['heartbeat_callback'])
                        ? $this->options['heartbeat_callback']
                        : null
                )) {
                // 这里我并没有沿用bunny的回调触发方式,而是自己写了一个,如有需要,将该方法改回bunny方法就好了
                    ($this->options['heartbeat_callback'])($this);
                    // $this->options['heartbeat_callback']->call($this);
                }
            },
            function (\Throwable $throwable){
                if($this->log){
                    $this->log->notice(
                        'OnHeartbeatFailed',
                        [
                            $throwable->getMessage(),
                            $throwable->getCode(),
                            $throwable->getFile(),
                            $throwable->getLine()
                        ]
                    );
                }
                AbstractProcess::kill("OnHeartbeatFailed-{$throwable->getMessage()}");
            });
    }

AbstractProcess::kill()这个方法实际上是一个简单的杀死当前进程的方法,因为bunny的客户端使用的是异步promise的执行方式,在遇到错误的时候会调用then中的onRejected,我认为,在一定情况下如果心跳失败了,会影响当前链接的活性,随之会被服务端踢出,但客户端并没有完善的重连机制,就造成了假死,所以我在这个位置加入杀死当前进程的方法,让workerman的主进程重新拉起一个进程,该进程也会重新连接,重新处理和消费,不会影响工作流,kill方法的代码如下:

    public static function kill(?string $log = null)
    {
        if(self::$_masterPid === ($pid = posix_getpid())){
            self::stopAll(SIGKILL, $log);
        }else{
            self::log("(pid:{$pid}) {$log}");
            posix_kill($pid, SIGKILL);
        }
    }

如上述所说的,在Client.php 182 - 220的disconnect方法中也没有重连的方案,在使用rabbitmq的管理后台将该链接断开后,该进程就始终保持了一个僵尸进程的角色,无法退出也无法消费,所以我建议改进如下:

    public function disconnect($replyCode = 0, $replyText = '') : Promise\PromiseInterface
    {
        if ($this->state === ClientStateEnum::DISCONNECTING) {
            return $this->disconnectPromise;
        }

        if ($this->state !== ClientStateEnum::CONNECTED) {
            return Promise\reject(new ClientException("Client is not connected."));
        }

        $this->state = ClientStateEnum::DISCONNECTING;

        $promises = [];

        if ($replyCode === 0) {
            foreach ($this->channels as $channel) {
                $promises[] = $channel->close($replyCode, $replyText);
            }
        }
        else{
            foreach($this->channels as $channel){
                $this->removeChannel($channel->getChannelId());
            }
        }

        if ($this->heartbeatTimer) {
            Timer::del($this->heartbeatTimer);
            $this->heartbeatTimer = null;
        }

        return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
            if (!empty($this->channels)) {
                throw new \LogicException("All channels have to be closed by now.");
            }
            if($replyCode !== 0){
                return null;
            }
            return $this->connectionClose($replyCode, $replyText, 0, 0);
        })->then(function () use ($replyCode, $replyText){
            $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
            $this->closeStream();
            $this->init();
            if($replyCode !== 0){
            // 杀死当前进程,交主进程重启
                AbstractProcess::kill("{$replyCode}-{$replyText}");
            }
            return $this;
        });
    }

最后,我是非常喜欢workman及相关的生态组件的,本意是想直接使用workerman生态相关的组件,但当时我所处的项目上线非常急迫,所以抛开了workerman/rabbitmq自行写了一套casual/amqp,我个人希望workerman能够越来越好,加油;

casual/amqp项目地址:https://github.com/chaz6chez/simple-amqp

1835 1 1
1个评论

chaz6chez

接上述,如果Workerman能够提供一个子进程重启的命令或者方法的话,是极好的,目前我是用AbstractProcess继承Worker,然后增加的一个kill方法,但是也是使用了KILL的信号,不知道有没有其他更优雅的信号或者方法可以使用呢?

  • walkor 2021-12-30

    非常感谢你的建议!
    如果可以的话,请给 workerman/rabbitmq发个pr。

    业务代码都是运行在子进程,所以不用判断master_pid,workerman重启当前进程直接调用 Worker::stopAll();就行,也可以用 posix_kill(posix_getpid(), SIGINT);

  • chaz6chez 2021-12-30

    好的,随后我本地测试通过后提交一个PR,希望Workerman生态越来越好

  • walkor 2021-12-30

    太太感谢了!!!

  • chaz6chez 2021-12-30

    已提交PR,保险起见,你们还是回归测一下~

  • walkor 2021-12-30

    非常感谢,已经合并!👍

  • chaz6chez 2022-01-01

    我打算来长期帮忙维护workerman/rabbitmq这个项目了,我把rabbitmq引入我的casual/amqp这个项目使用了,目前是在生产环境运行的

  • walkor 2022-01-02

    太好了,真的非常感谢!!

  • chaz6chez 2022-01-04

    老哥,有没有共同维护组件的小组或交流群啊

  • walkor 2022-01-04

    直接在webman交流群里交流就行

年代过于久远,无法发表评论

chaz6chez

4314
积分
0
获赞数
0
粉丝数
2018-11-16 加入
🔝