有大佬实现过不阻塞的定时器服务吗

lcmg

问题描述

比如我有很多定时器,有指定时间执行的比如每天0点28分执行【28 0 】,有固定间隔时间段执行的比如每五分钟执行异常【0 /1 】,还可以动态的添加管理这些定时器

为此你搜索到了哪些方案及不适用的原因

有参考过插件市场的yzh52521/webman-task,启动两个服务,一个提供http请求处理添加删除操作,一个启动自定义进程来实现定时任务的执行,在内部http和自定义进程通讯来进行数据交互
但是原生的Crontab定时任务是阻塞执行的,如果有一个执行很久会导致剩下的不能再对应时间执行,阻塞期间http服务与自定义进程之间的通讯还会挂起
然后试过使用swoole做事件循环,在new Crontab的callback中使用go(function(){}),这样确实可以实现不阻塞的执行定时程序,也不会阻塞自定义进程的通讯服务,但是随着定时程序的数量、耗时增多,后面查看日志会发现时不时的exit with status 11、exit with status 65280报错,然后进程自动重启,在runtimes下面也看不到具体的报错日志
截图

后面又试过直接使用swoole开启tcp服务,在workerStart里面初始化定时器,在receive里面接受http的请求来管理定时器执行,因为swoole不支持linux的crontab表达式,webman的实现是生成一个回调函数,拿到下一分钟要执行的定时任务依次Timer::add生成定时器,我把Timer::add替换成\Swoole\Timer::after就可以在swoole中使用crontab表达式,执行的时候也是Co::create()执行的,然后发现定时器可以正常按时间执行,但是定时器执行后tcp服务就被挂起了,无法和http交互
截图

请问各位大佬有类似实现过对应功能吗,怎么解决支持秒级的定时器执行,并且可以进行通讯、定时任务执行期间不阻塞服务的

480 4 0
4个回答

six

架构一般是一个定时进程,一堆http业务进程。
定时进程负责定时触发任务,使用workerman/http-client异步调用,这样定时进程不会有任何阻塞操作。
http业务进程负责处理具体的定时任务,任务本身是否阻塞没有太大影响,只要http进程足够就行。

  • 暂无评论
小Z先生

可以考虑使用队列进行业务逻辑处理,定时器只用于发送队列消息,这样就不需要考虑定时器的阻塞问题了

  • 暂无评论
软饭工程师

使用php协程,参考文档
https://www.workerman.net/a/1723
https://ripple.cloudtay.com/docs/basic/defer/
在定时器里面用协程执行任务,定时器会立即返回,开始执行下一个定时器,并不会阻塞,这样解决了多个定时器互相阻塞的问题

  • latin 9天前

    协程还是当前进程执行,定时任务里如果是阻塞调用,整体进程还是阻塞的。这样就协程没有作用

  • 胡桃 9天前

    Fiber/Revolt 那一套遇到死循环确实只能傻眼,但是 Swoole 有抢占式调度。

  • lcmg 9天前

    后续还是用swoole实现了,swoole开启的tcp服务设置运行协程和一键协程化之后,tcp的通讯服务和定时任务就互不阻塞了。其中最主要的问题还是用swoole作为webman的eventloop之后,看起来是互相冲突导致webman子进程重启,没这个问题的话两个一起用还是挺方便的
    $server = new Server($host, (int)$port, SWOOLE_PROCESS);
    // 设置服务器为异步非阻塞模式
    $server->set([
    'worker_num' => 1,
    'enable_coroutine' => true,
    'max_coroutine' => 3000,
    'daemonize' => true,
    'log_file' => runtime_path() . '/logs/swoole.log',
    'log_date_format' => '%Y-%m-%d %H:%M:%S',
    'log_rotation' => SWOOLE_LOG_ROTATION_DAILY,
    'display_errors' => true,
    'hook_flags' => SWOOLE_HOOK_ALL,
    ]);
    // 当有客户端数据到达时
    $server->on('receive', function (Server $server, $fd, $reactor_id, $data) {
    Co::create(function () use ($server, $fd, $data) {
    // echo "Received data from client {$fd}: {$data}\n";
    try {
    $data = json_decode($data, true);
    $method = $data['method']?:'';
    $args = $data['args']?:[];
    $res = $this->onReceive($method,$args);
    $server->send($fd,$res);
    }catch (\Exception $exception){
    $server->send($fd,json_encode(['code' => 0, 'msg' => $exception->getMessage() ]));
    }
    });
    });

efnic

插件市场:https://www.workerman.net/app/view/cron
最关键的逻辑是使用composer require symfony/process

×
🔝