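I am using the Redis publish/subscribe pattern to implement real-time communication. Because a process blocks once it subscribes to a Redis channel, an asynchronous Redis component is required. With it I can subscribe to a channel successfully, but after a while the Redis connection is dropped.
The code is as follows: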
public static function onMessage($client_id, $message)
{
    echo 'Message received before subscribing: ', $client_id, PHP_EOL;
    self::$factory->createClient('redis://127.0.0.1:6379')->then(
        function (Client $client) use ($client_id) {
            $client->subscribe($client_id);
            $client->on('message', function ($channel, $message) {
                echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
            });
        },
        function ($exception) {
            echo $exception->getMessage();
        }
    );
    echo 'Message received after subscribing: ', $client_id, PHP_EOL;
    self::$loop->run();
}

public static function onWorkerStart($worker)
{
    self::$loop = Worker::getEventLoop();
    self::$factory = new Factory(self::$loop);
}
The error message is as follows:
process_timeout:
#1 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(226): pcntl_signal_dispatch()
#2 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(205): React\EventLoop\StreamSelectLoop->waitForStreamActivity(NULL)
#3 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(252): React\EventLoop\StreamSelectLoop->run()
#4 /Volumes/Data/Code/WorkerFastCGI/server/Events.php(55): Workerman\Events\React\Base->run()
#5 : Server\Events::onMessage('7f0000011b58000...', '{"msg_type":500...')
#6 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/gateway-worker/src/BusinessWorker.php(395): call_user_func('Server\\Events::...', '7f0000011b58000...', '{"msg_type":500...')
#7 : GatewayWorker\BusinessWorker->onGatewayMessage(Object(Workerman\Connection\AsyncTcpConnection), Array)
#8 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Connection/TcpConnection.php(645): call_user_func(Array, Object(Workerman\Connection\AsyncTcpConnection), Array)
#9 : Workerman\Connection\TcpConnection->baseRead(Resource id #71)
#10 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(238): call_user_func(Array, Resource id #71)
#11 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(205): React\EventLoop\StreamSelectLoop->waitForStreamActivity(NULL)
#12 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(252): React\EventLoop\StreamSelectLoop->run()
#13 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(135): Workerman\Events\React\Base->run()
#14 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(2226): Workerman\Events\React\Base->loop()
#15 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/gateway-worker/src/BusinessWorker.php(197): Workerman\Worker->run()
#16 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1352): GatewayWorker\BusinessWorker->run()
#17 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1184): Workerman\Worker::forkOneWorkerForLinux(Object(GatewayWorker\BusinessWorker))
#18 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1158): Workerman\Worker::forkWorkersForLinux()
#19 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(478): Workerman\Worker::forkWorkers()
#20 /Volumes/Data/Code/WorkerFastCGI/server.php(37): Workerman\Worker::runAll()
#21 {main}
According to the manual, you should not call self::$loop->run(); yourself. Try removing it.
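For reference, here is a minimal sketch of the handler with the nested run() call removed. As far as I can tell from the trace, Workerman is already running the loop (frames #12 to #15), so calling run() again inside onMessage starts a second loop and onMessage never returns, which is consistent with the process_timeout entry. The un-namespaced class name Events and the use lines are assumptions; they presume the Factory and Client in the question come from clue/redis-react.

```php
<?php
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Workerman\Worker;

class Events
{
    /** Redis client factory, created once per worker process. */
    public static $factory;

    public static function onWorkerStart($worker)
    {
        // Reuse the event loop that Workerman itself is already running.
        self::$factory = new Factory(Worker::getEventLoop());
    }

    public static function onMessage($client_id, $message)
    {
        self::$factory->createClient('redis://127.0.0.1:6379')->then(
            function (Client $client) use ($client_id) {
                $client->subscribe($client_id);
                $client->on('message', function ($channel, $payload) {
                    echo 'Message on ' . $channel . ': ' . $payload . PHP_EOL;
                });
            },
            function ($exception) {
                echo $exception->getMessage() . PHP_EOL;
            }
        );
        // No self::$loop->run() here: returning from onMessage hands
        // control back to the loop that Workerman already drives.
    }
}
```

Note that this still opens one Redis connection per incoming message, exactly as the original code does; only the blocking call is gone.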
Thanks, boss. I had mostly followed the demo on GitHub and did not notice this point. Removing self::$loop->run(); really did fix it!
For a Redis subscription, it is enough to subscribe once in onWorkerStart; there is no need to subscribe again on every onMessage call.
See https://github.com/walkor/phpsocket.io/issues/62
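A rough sketch of that suggestion, assuming clue/redis-react as in the question. The helper name subscribeOnce and the channel name broadcast are hypothetical and only there for illustration; the point is that the Redis connection and the subscription are set up once per worker process, and onMessage does no Redis work at all.

```php
<?php
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Workerman\Worker;

class Events
{
    public static function onWorkerStart($worker)
    {
        // One factory, one connection, one subscription per worker process.
        $factory = new Factory(Worker::getEventLoop());
        self::subscribeOnce($factory, 'broadcast'); // hypothetical channel name
    }

    /**
     * Hypothetical helper: connect and subscribe a single time.
     * $factory is expected to behave like Clue\React\Redis\Factory.
     */
    public static function subscribeOnce($factory, $channel)
    {
        $factory->createClient('redis://127.0.0.1:6379')->then(
            function (Client $client) use ($channel) {
                $client->subscribe($channel);
                $client->on('message', function ($ch, $payload) {
                    echo 'Message on ' . $ch . ': ' . $payload . PHP_EOL;
                });
            },
            function ($exception) {
                echo $exception->getMessage() . PHP_EOL;
            }
        );
    }

    public static function onMessage($client_id, $message)
    {
        // Intentionally no subscribe call here; the subscription made in
        // onWorkerStart keeps delivering messages for the worker's lifetime.
    }
}
```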
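Thank you very much for the pointers. My idea was to subscribe and unsubscribe dynamically according to client requests, but after thinking it over, Gateway's sendToGroup can achieve the same thing. I am not sure which of the two approaches is better. Would using Redis publish/subscribe for end-to-end many-to-many communication cause any problems?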
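For comparison, the sendToGroup route could look roughly like the sketch below. It uses joinGroup, leaveGroup and sendToGroup from GatewayWorker\Lib\Gateway; the JSON fields action, group and body are hypothetical and not taken from the original message format. No extra Redis connection is involved in this variant.

```php
<?php
use GatewayWorker\Lib\Gateway;

class Events
{
    public static function onMessage($client_id, $message)
    {
        // Hypothetical message shape: {"action": "...", "group": "...", "body": "..."}
        $data = json_decode($message, true);
        if (!is_array($data) || !isset($data['action'], $data['group'])) {
            return; // ignore anything that does not match the assumed shape
        }

        switch ($data['action']) {
            case 'join':    // plays the role of "subscribe"
                Gateway::joinGroup($client_id, $data['group']);
                break;
            case 'leave':   // plays the role of "unsubscribe"
                Gateway::leaveGroup($client_id, $data['group']);
                break;
            case 'publish': // deliver to every client currently in the group
                Gateway::sendToGroup($data['group'], json_encode([
                    'from' => $client_id,
                    'body' => $data['body'] ?? '',
                ]));
                break;
        }
    }
}
```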