workerman如何使用worker进程处理繁重的业务?

lt

to:Dear walkor,
我的项目采用workerman的GW/BW模型,设置了4个GW和4个BW进程,BW的onmessage收到客户的命令消息后,要生成一个全局的订单编号,然后生成订单保存到MYSQL数据库。测试的时候,客户端开了4个进程不停地发送订单命令,就会卡住了,其他新的客户端不能登录了,GW进程运行只能正常能收到消息,但BW卡住了。
我现在要开新的worker进程专门处理繁重的业务,包括生成订单和保存订单等,该怎么做?是收到命令的时候实时建立进程吗?

18807 8 11
8个回答

walkor 打赏

可以在本机或者其它服务器甚至服务器集群预先建立一些任务进程处理繁重的业务,任务进程数可以开多一些,例如cpu的10倍。

任务进程服务端

// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function($connection, $task_data)
{
    // 假设发来的是json数据
    $task_data = json_decode($task_data, true);
    // 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
    $task_result = ......
         // 发送结果
         $connection->send(json_encode($task_result));
    };
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

在workerman中调用

use \Workerman\Connection\AsyncTcpConnection;

....

// 与远程task服务建立异步链接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://ip:12345');
// 任务及参数数据
$task_data = array(
    'function' => 'send_mail',
    'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 发送数据
$task_connection->send(json_encode($task_data));
// 异步获得结果
$task_connection->onMessage = function($task_connection, $task_result)
{
    // 结果
    var_dump($task_result);
    // 获得结果后记得关闭链接
    $task_connection->close();
};
// 执行异步链接
$task_connection->connect();

这样,繁重的任务交给本机或者其它服务器的进程去做,任务完成后会异步收到结果,BussinessWorker进程就不会阻塞了

  • alpha 2021-07-21

    你好,我使用$task_connection->connect();报错 Fatal error: Call to a member function add() on null;这个是什么原因

lt

谢谢老大的精彩解答,我明白怎么做了。
还有个小小的疑问:高并发的状态下,大量动态new AsyncTcpConnection('Text://ip:12345');会不会性能影响比较大?

  • 暂无评论
walkor 打赏

没影响

  • 暂无评论
lt

追加个问题:如果业务处理时间比较长,需不需要在异步客户端和处理进程服务端两端加心跳?分布式部署的方式

  • 暂无评论
walkor 打赏

不用

  • 暂无评论
tianmc

任务进程服务端这个程序,假如开了10个task进程,每隔task进程都在忙,是不是其它的请求就会被卡住?

  • 暂无评论
walkor 打赏

任务进程服务端这个程序,假如开了10个task进程,每隔task进程都在忙,是不是其它的请求就会被卡住?

看情况
假设有10个task进程处理耗时任务
假如workerman会收到 聊天发邮件 两种请求,发邮件 是耗时操作,如果在当前进程执行,则会影响当前进程所有的后续请求,这时我们就转到task进程去异步去运行,执行完毕后异步通知客户端。

而转发聊天消息是非常快的,workerman框架本身异步非阻塞IO不会造成任何阻塞或者卡住进程的情况,加上耗时的任务都交给了task进程处理,所以聊天消息请求不会受到任何影响,即使task进程全部在忙于发邮件

再看task进程
如果瞬时来了10个发邮件请求,那么10个task进程同时运行5秒(假设每个发邮件请求执行5秒)才能全部运行完成,那么在运行期间,如果再次收到发邮件请求,则这个请求会在task进程排队,也就是等待约5秒后才能轮到这个发邮件的请求在task进程执行,也就是这个请求从接收到处理完毕耗时约10秒,但是这个是task进程的阻塞,不会影响主流程的运行,比如不影响聊天请求

如果task进程运行的任务繁重(长时间阻塞),并且有不少的繁重任务请求量,则可以开启更多的task进程处理,比如500个task进程差不多可以每秒处理100个耗时5秒的请求,单台服务器无法满足时(通常是cpu 100%),可以增加服务器实现集群处理。

  • 暂无评论
xtype

建议使用消息队列来处理繁重而又不需要实时返回的数据的业务

  • 暂无评论
年代过于久远,无法发表回答
×
🔝