使用GatewayWorker同时做客户端和服务端。
做客户端使用异步请求一个ws连接,不断接受消息,将其发送到Channel通道中。
做服务端订阅Channel通道,将消息转发给普通用户连接。
我的问题不是如何实现,而是如何在服务器配置有限的情况下,尽最大可能提升并发量。
服务器配置是4核4G高性能云服务器,目前不使用阶梯式递增压测,最高并发可维持在1w左右。
超过1w,就会exit_process进程,杀掉一些连接。
我想知道的是,请问大家还有什么方案来实现吗?
我目前还没有加上数据处理部分,我想加上数据处理部分并发又会下降不少。
需要实时转发的数据处理部分是否可以通过task来实现?
使用协程可以来处理不是需要实时转发的数据,比如把接收到的数据存储到数据库这些阻塞操作?
开启4个Gateway进程,12个Business进程,1个Channel进程,1个Register进程。
目前有四个方案:
第一个是单一订阅频道&直接sendToAll消息。
第二个是在onConnect中加入一个连接分组策略,多个订阅频道&sendToGroup消息。
第三个是在onWorkerStart加入一个进程分组策略,即异步连接只产生在一个进程A中,通过心跳来判断进程A是否一直正常处理,不正常时打开第二个进程B来处理消息,如此反复。其他同第二个方案一样。
第四个是把客户端和服务端进行分布式部署。异步请求以及发送消息给通道的操作 与 用户连接订阅通道消息并发送消息给用户 完全分开部署。
第一个方案代码如下:
public static function onWorkerStart($businessWorker)
{
// 异步建立连接
if ($businessWorker->id === 0) {
// 连接通道
Channel\Client::connect('0.0.0.0', 2206);
// 创建异步连接
$connection = new AsyncTcpConnection('ws://101.82.23.49:9765');
// 设置ssl
$connection->transport = 'ssl';
// 设置成功连接时
$connection->onConnect = function ($con) {
// 发送消息
$con->send('{"op": "subscribe","args":[]}');
};
// 设置成功接收到消息时
$connection->onMessage = function ($connection, $data) {
// 转发消息给通道接受
Channel\Client::publish('data_channel', $data);
};
// 执行异步连接
$connection->connect()
}
// 订阅心跳信号
Channel\Client::on('data_channel', function($data) {
Gateway::sendToAll($data);
});
}
第二个方案代码如下:
use \Workerman\Redis\Connection as RedisConnection;
public static function onConnect($client_id)
{
$redis = new RedisConnection('localhost', 6379);
$group_ans = $redis->get('group_ans') ?: 0;
$increment = $redis->get('increment') ?: 1;
Label:
if ($group_ans > self::MAX_GROUP - 1) {
// group
$group_ans = 0;
// 增量
$increment++;
$redis->set('increment', $increment);
}
if (Gateway::getClientCountByGroup('group_' . $group_ans) > (int)(self::MIN_USERS * $increment)) {
// group
$group_ans++;
$redis->set('group_ans', $group_ans);
// callback
goto Label;
}
Gateway::joinGroup($client_id, 'group_' . $group_ans);
}
第三个方案代码如下:
use \Workerman\Redis\Connection as RedisConnection;
public static function onWorkerStart($businessWorker)
{
// 连接Redis
self::$redis = new Redis();
self::$redis->connect('localhost', 6379);
// 连接通道
Channel\Client::connect('0.0.0.0', 2206);
// 如果是0号进程,作为主进程
if ($businessWorker->id === 0) {
// 获取数据的代码...
self::getData($businessWorker);
}
// 如果是备份进程
if ($businessWorker->id >= 1 && $businessWorker->id <= self::MAX_GROUP - 1) {
// 设置
self::$last_heartbeat_time[$businessWorker->id] = time();
// 订阅心跳信号
Channel\Client::on('heart_channel', function($data) use ($businessWorker) {
self::$last_heartbeat_time[$businessWorker->id] = time();
});
// 定期检查心跳信号
Timer::add(3, function() use ($businessWorker) {
// 如果1秒钟没有收到上一个进程的心跳信号,并且成功获取了锁
if (time() - self::$last_heartbeat_time[$businessWorker->id] > 1 && self::$redis->set('lock', 1, ['nx', 'ex' => 3])) {
// 开始获取数据的代码...
self::getData($businessWorker);
}
});
}
// 订阅通道数据
Channel\Client::on('data_channel', function($data) use ($businessWorker) {
// 发送
Gateway::sendToGroup('group_' . $businessWorker->id, $data);
});
}
需要更改系统策略,比如超过一定用户链接数之后,使用读取缓存的方式获取消息,例如:使用redis缓存一下积累的消息,使用http方式获取一下,当然还有其他方式,如果只是单纯的去转发消息,很容易引发消息风暴,导致带宽跑满
有这么多热心大佬,
WK
一定会越来越好,用的都有信心。