用户提交了一万条左右的数据更新操作,用了一个A异步队列将数据放进去,在A队列中,将一万条数据循环放入B异步队列,奇怪的事情发生了,B异步可能只有几十条任务,死活这一万条数据加不进B队列。
尝试去掉A队列,直接循环将一万条数据丢入B队列,结果成功了。
//redis-queue/redis
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密码,字符串类型,可选参数
'db' => 2, // 数据库
'prefix' => '', // key 前缀
'max_attempts' => 3, // 消费失败后,重试次数
'retry_seconds' => 5, // 重试间隔,单位秒
'wait_timeout'=>8640000,
'connect_timeout'=>86400,
]
],
];
//redis-queue/process
return [
'consumer' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 1, // 可以设置多进程同时消费
'constructor' => [
// 消费者类目录
'consumer_dir' => app_path() . '/queue/redis'
],
'user' => 'www',
'group' => 'www',
],
];
// A队列
public $queue = 'keyword-queue';
// 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
public $connection = 'default';
// 消费
public function consume($data)
{
foreach ($data['keywords'] as $key=>$keyword) {
// 队列名
$queue = 'zhishu-queue';
// 数据,可以直接传数组,无需序列化
dump($keyword.' '.$key);
// 投递消息
Client::send($queue, ['keyword' => $keyword, 'type' => $data['type'], 'id' => $data['id'],'status'=>$data['status']]);
}
}
//B队列
// 要消费的队列名
public $queue = 'zhishu-queue';
// 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
public $connection = 'default';
// 消费
public function consume($data)
{
$lock = Cache::lock(KeywordsService::LOCK_BY_ID . $data['id'], 60);
$process_task_key = snowflake_id();
try {
$lock->block(60);
// 等待最多 5 秒后获得的锁...
$keyword = KeywordsService::getById($data['id']);
if ($keyword->status == 1 && $data['status'] == $keyword->status) {
$keyword->current += 1;
if ($keyword->current >= $keyword->total) {
$keyword->status = 2;
}
$keyword->save();
KeywordsService::saved($keyword, $process_task_key);
}
$lock?->release();
delete_process_key($process_task_key);
} catch (\Throwable $exception) {
$lock?->release();
delete_process_keys($process_task_key);
throw $exception;
}
"workerman/webman-framework": "^1.5.0"
文档有说异步投送原理,异步投递是先把消息放在本地内存里,等进程空闲时发送给redis。
如果数据还没投送完进程退出就会导致数据丢失。
还有如果你的进程有lock相关的操作,我想也会影响投递,因为进程被你lock的时间段里,内存中的消息无法投递到redis。
感觉你应该用同步投递,文档说重要数据应该用同步投递。
嗯 你说的有道理 我后面想了一下 上万条数据放内存里可能也受PHP内存参数控制,目前已经改成同步方式放入