启 worker,在onWorkerStart 开启定时器 1秒获取 Redis 队列数据。
从log中看,消费了队列数据,偶现会没有执行。
没有用workerman-queue
$asyncJobWorker->onWorkerStart = function () use ($client) {
// 获取配置
foreach (Yii::$app->service->workermanQueue->jobConfig() as $config) {
// 订阅队列名,回调方案使用配置的 handler execute 来处理数据
// 这里的 回调,如果返回 true, 运行后会从 queue 中删除,否则 不删除
$client->subscribe($config['name'], function ($data, $jobId, $ttr, $attempt) use ($config) {
try {
// 可以理解为注入
$handler = Yii::createObject([
'class' => $config['handler'],
'task' => $data
]);
/** @var $handler JobInterface */
return $handler->execute(null);
} catch (\Exception $e) {
Yii::error('workerman queue exception:' . $e->getTraceAsString(), 'error');
return false;
}
});
}
};
/**
* 任务配置
* @return \string[][]
* @desc 每次修改 需要 重启 workerman-queue
*/
public function jobConfig()
{
return [
['name' => self::ASYNC_JOB_QUEUE_NAME, 'handler' => AsyncJob::class],
];
}
AsyncJob excute
public function execute($queue)
{
try {
$task = $this->task;
if (isset($task['type'])) {
// 根据 任务里面的 type 获取执行的方法
$functionName = lcfirst(Inflector::id2camel($task['type'], '_') . 'Execute');
$log = new AsyncJobLog();
$log->execute_function = $functionName;
$log->task = json_encode($task, JSON_UNESCAPED_UNICODE);
$log->save(false);
// 偶现 下面的不执行了
// 难道是 $this->$functionName($task); 要改成 $returnVal = call_user_func([$this, $functionName], $task);?
$returnVal = $this->$functionName($task);
if ($returnVal === true) {
$log->is_finish = 1;
} else {
$log->is_finish = 0;
$log->task_print = $returnVal;
}
$log->save(false);
} else {
Util::errorHandle(new \Exception('Async Job Execute:(' . json_encode($task, JSON_UNESCAPED_UNICODE) . ') type Not set!'));
}
} catch (\Exception $e) {
Util::errorHandle($e);
}
return true;
}
用的yii2 跟 workerman 来结合,处理 队列数据的。