在gatewaywork的bussiness中使用redis-queue,onconnect时间中,给队列发送数据
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{
// 向当前client_id发送数据
// Gateway::sendToClient($client_id, "当前在线".Gateway::getAllClientIdCount()." \r\n");
// 向所有人发送
try {
$client = new Client('redis://127.0.0.1:6379');
$client->send('user-1', ['demo', 'data']);
echo "aaa",PHP_EOL;
} catch (\Throwable $th) {
print_r($th);
}
另外在start_queue.php中消费
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
use \Workerman\Worker;
use \Workerman\Timer;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use Workerman\RedisQueue\Client;
use \Workerman\Autoloader;
// 自动加载类
require_once __DIR__ . '/../../vendor/autoload.php';
// bussinessWorker 进程
$worker = new Worker();
// worker名称
$worker->name = 'queue';
// bussinessWorker进程数量
$worker->count = 1;
// 服务注册地址
// $worker->registerAddress = '127.0.0.1:1238';
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 订阅
$client->subscribe('user-1', function($data){
echo "user-1\n";
// var_export($data);
file_put_contents('a.txt','bbb');
});
// $client->send('user-1', ['some', 'data']);
};
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
经过测试,100次连接,只消费不到一半,有的时候 只有10次不到。单独在start_queue中用定时器,定时推送消费,就没什么问题。而且,似乎还有粘包的问题,因为发现有几行输出是多个aaa连着的,没有换行
不好意思,是我搞错了,我在application下直接执行了一个消费队列进程,导致在目录下面stop的时候,没结束这个消费进程
我还看出个问题,每个客户端连接时都会初始化一个Client ,这会导致Client实例越来越多,最终会超出系统限制报错。
你应该将初始化Client放在onWorkerStart里,像你消费那样,只初始化一次。
嗯,这是写个demo,redisqueue已经处理了断开连接的问题,但是超时这种不知道有没有处理