webman-redis-queue
是为 Webman 框架设计的高效、灵活的 Redis
队列插件。利用 Redis Stream 的强大特性,该插件专注于提供可靠和高性能的消息队列解决方案,适合处理大规模的数据流和复杂的队列操作。
通过 Composer 安装 webman-redis-queue
:
composer require solarseahorse/webman-redis-queue:^1.0.1
删除延时消息:
新增 removeDelayedMessage
方法,允许移除一条延时消息。
批量删除延时消息:
新增 removeDelayedMessages
方法,允许一次性移除多个指定的延时消息。
检查延时消息存在性:
新增 hasDelayedMessageExists
方法,用于检查延时消息是否存在。
批量检查延时消息存在性:
新增 hasDelayedMessagesExist
方法,用于批量检查多个延时消息是否存在。
DelayedMessageRemoveException
和 DelayedMessageCheckException
异常类型。我们非常欢迎并鼓励您在测试环境中尝试这个插件,并且分享您的使用体验。您的反馈对我们改进插件、修复潜在的问题以及发布未来的稳定版本非常重要。如果您在使用过程中遇到任何问题或有任何建议,请通过 GitHub Issues
与我联系。
如果您对改进 webman-redis-queue 有兴趣,欢迎任何形式的贡献,包括但不限于:提交问题、提供反馈、或直接向代码库提交改进。您的贡献将帮助我们更快地推出稳定、功能丰富的正式版本。
配置文件自动生成在 config/plugin/solarseahorse/webman-redis-queue目录下。
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密码,字符串类型,可选参数
'db' => 0, // 数据库
'prefix' => 'webman_redis_queue_', // key 前缀
'timeout' => 2, // Timeout
'ping' => 55, // Ping
'reconnect' => true, // 断线重连
'max_retries' => 5, // 最大重连次数
'retry_interval' => 5 , // 重连间隔 s
]
],
];
在webman集群下,每个节点需要连接同一个redis。
注意:开启此选项能增加队列运行稳定性,但如果队列进程过多,redis恢复后可能造成突发大量连接数,因为每个进程都有一个redis连接。
默认开启,当Redis发生重载
,重启
等情况会尝试重连,超过最大重试次数后会报错并重启进程(webman默认行为)。
推荐为插件配置单独日志通道,参考链接 webman日志
<?php
return [
'enable' => true, // 启用日志
'handlers' => support\Log::channel('default') // 默认通道 default
];
在队列消费业务逻辑中可以这样使用日志,使用方法和官方的Log
类使用方法一致。
LogUtility::warning('Error:', [
'data' => $consumerMessage->getData(),
'errorMessage' => $e->getMessage()
]);
<?php
return [
'send-email' => [
'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
'count' => 20, // 在目录模式中,目录下所有队列是共用进程
'constructor' => [
// 支持目录和类 推荐使用类名
'consumer_source' => \App\queue\test\Email::class
]
]
];
插件对消费类对位置没有固定要求,符合加载规范即可。
教程以app/queue/SendEmail.php
举例,目录和文件需自行创建。
继承 SolarSeahorse\WebmanRedisQueue\Consumer
,配置连接标识,并实现抽象方法consume
, 一个最基础的消费类就创建好了。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
}
}
编写完成后需要在队列配置文件
process.php
中新增队列配置。
通过 php webman solar:make:consumer
命令可快速创建一个消费类。
示例操作:
webman % php webman solar:make:consumer
Please enter the name of the queue: sendCode
Please enter the number of processes (default 1): 1
Please enter the path to create the class in [app/queue]: app/queue/test
最终将会在 app/queue/test
目录中创建 SendCode.php
文件。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendCode extends Consumer
{
// 连接标识,对应config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
// 消费
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
var_dump($messageId);
}
}
队列配置文件process.php
也会自动更新。
<?php
return array (
'sendCode' =>
array (
'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
'count' => 1,
'constructor' =>
array (
'consumer_source' => 'app\\queue\\test\\SendCode',
),
),
);
protected string $connection = 'default';
protected string $queueName = '';
protected string $groupName = '';
protected string $streamKey = '';
protected int $prefetchCount = 1;
protected int $blockTime = 5000;
protected float $consumerTimerInterval = 0.5;
protected int $maxAttempts = 5;
protected int $retrySeconds = 60;
protected bool $autoAck = true;
ack
方法。protected bool $autoDel = true;
protected int $delayedQueueOnceHandlerCount = 128;
protected int $delayedMessagesTimerInterval = 1;
protected int $delayedMessagesMaxWorkerCount = 1;
protected string $delayedTaskSetKey = '';
protected string $delayedDataHashKey = '';
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;
PENDING_PROCESSING_RETRY
或 PENDING_PROCESSING_IGNORE
。PENDING_PROCESSING_RETRY
当消息挂起超时会进行异常重试。PENDING_PROCESSING_IGNORE
当消息挂起超时时,触发死信处理
方便排查错误,除此之外只清理pending
列表,不做其他处理。PENDING_PROCESSING_RETRY
, 根据队列场景选择合适的处理策略,比如发送短信验证码
Redis Stream
特性,未ack的消息会在pending
列表中不会丢失,这类场景就适合配置PENDING_PROCESSING_IGNORE
protected int $pendingTimout = 300;
pending
列表。protected int $checkPendingTimerInterval = 60;
protected int $onceCheckPendingCount = 50;
通过pushMessage
方法可快速向队列投递一条消息。
/**
* @param mixed|QueueMessageInterface $data
* @return string|bool
* @throws QueueMessagePushException
*/
public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;
// 消息内容,无需序列化
$message = [
'dummy' => 'ok'
];
// 生产者工厂方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
->pushMessage($message);
// 通过消费类工厂方法 创建一个生产者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
// 投递QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 或者通过QueueMessageFactory创建一条消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);
// 修改队列数据
$message->setData(['dummy' => 'no']);
// 设置错误次数
$message->setFailCount(3);
// 通过上方两种方法投递均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
var_export($messageId); // 返回stream的字符串ID 或 false
有时候我们需要一次投递大量队列时,可以通过pushMessages
方法,批量投递消息,此方法会开启Redis
的pipeline
管道投递,提高与redis
的交互性能。
/**
* @param array|QueueMessageInterface[] $dataArray
* @return array|false
* @throws QueueMessagePushException
*/
public function pushMessages(array $dataArray): array|bool;
// 投递5w条消息
$dataArr = array_fill(0, 50000, null);
for ($i = 0; $i < 50000; $i++) {
$dataArr[$i] = ['dummy' => uniqid()];
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
// QueueMessage方式
for ($i = 0; $i < 50000; $i++) {
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);
//$message->setData(json_encode(['123']));
//$message->setFailCount(1);
// ....
$dataArr[$i] = $message;
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
var_export($messageIds); // 返回Stream消息ID列表 或 false
数组投递实际是通过数组创建一个
QueueMessage
对象
延时消息的作用:
定时任务:
延时消息可以用来实现定时任务。例如,你可能想在未来的某个时间点执行特定操作,如发送提醒、更新状态等。
延迟处理:
在某些情况下,立即处理消息并不理想或可能。延时消息允许应用程序延迟处理,直到最合适的时机。
限流:
延时消息可以帮助对系统内部的请求进行限流,防止在短时间内因大量请求而过载。
解耦和异步处理:
在复杂的系统中,延时消息可以用来解耦不同组件间的直接交互,提高系统的可扩展性和维护性。
通过 scheduleDelayedMessage
方法快速投递一条延时消息。
/**
* @param mixed|QueueMessageInterface $data
* @param int $delay
* @param string $identifier
* @return bool
* @throws ScheduleDelayedMessageException
*/
public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;
// 消息内容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);
// QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 设置延时
$message->setDelay(60);
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 使用第二个参数会替换之前对象的延时设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);
如果我们想避免消息被重复发送等情况,通过延时队列的特性可以很简单实现。通过scheduleDelayedMessage
方法的第三个参数identifier
传递一个自定义的延时消息ID,同样的消息ID,消息将会被替换,延时时间从修改开始重新计算。
如果消息已经进入stream队列将无法实现替换,必须在延时时间内,类似实现一个“防抖”效果,消息在时间段内发送多次最终只处理一次。
// 消息内容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 通过type,to参数生成一个唯一ID
$identifier = md5(serialize([
'type' => 'warning',
'to' => 'xxxx@email.com',
]));
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);
// QueueMessage对象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 设置延时
$message->setDelay(60);
// 设置identifier
$message->setIdentifier($identifier);
// 投递一条延时消息 60秒后处理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 传递参数会替换对象之前的延时和ID设置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);
当一次需要投递大量延时消息时,可以通过scheduleDelayedMessages
方法发送。
// 投递10w条延时消息
$dataArr = array_fill(0, 100000, null);
for ($i = 0; $i < 100000; $i++) {
$dataArr[$i] = [
'delay' => 2, // 延时时间
'data' => ['dummy' => uniqid()], // 队列数据
'identifier' => '' // 自定义ID
];
}
// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
// QueueMessage对象
for ($i = 0; $i < 100000; $i++) {
$message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);
// 设置延时
$message->setDelay(60);
// 设置identifier
$message->setIdentifier('');
$dataArr[$i] = $message;
}
// 批量投递
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
多redis只需要在队列配置
connection
连接标识,投递方式没有任何变化。
新功能 (v1.0.1)
以下功能在插件的
v1.0.1
版本中新增。
有时候我们想移除某个或多个延时队列时,可以使用removeDelayedMessage
和removeDelayedMessages
方法实现,使用hasDelayedMessageExists
和hasDelayedMessagesExist
判断一条或多条延时消息是否存在。
只有任务还存在延时队列中才能移除,如果已经进入
Stream
队列中将无法移除。
/**
* @param string $identifier
* @return bool
*/
public function removeDelayedMessage(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function removeDelayedMessages(array $identifiers): array|bool;
/**
* @param string $identifier
* @return bool
*/
public function hasDelayedMessageExists(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function hasDelayedMessagesExist(array $identifiers): array|bool;
代码示例:
$consumer = new app\queue\test\SendEmail();
$queueProducer = $consumer::createQueueProducer();
// 添加一条延时消息 通过业务数据生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');
// 通过QueueMessage对象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);
$queueMessage->setDelay(60);
// 自定义消息ID 不设置将默认生成 通过getIdentifier()获取
$queueMessage->setIdentifier('test_id');
// 获取消息ID
$id = $queueMessage->getIdentifier();
// 判断消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier')); // true or false
// 移除一条延时消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false
// 判断多条消息是否存在 返回一个数组
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));
//.array (
// 0 => 1706383223.0,
// 1 => 1706383223.0,
// 2 => false
//).
// 移除多条延时消息 返回一个数组
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));
//
//.array (
// 0 => 1,
// 1 => 1,
// 2 => 0
//).
消费消息时会调用消费类的consume
方法,并传递一个实现ConsumerMessageInterface
接口对象。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 禁用错误重试 如果消费失败将不会异常重试
$consumerMessage->disableFailRetry();
// 手动触发错误重试,此方法会调用disableFailRetry方法,所以后续报错不会再触发异常重试。
// 没有禁用错误重试的情况下,消费异常默认会调用此方法。
$consumerMessage->triggerError(new \Exception('triggerError'));
// 监听消费异常事件
$consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
// 这里可以处理消费异常逻辑
// 禁用错误重试
$consumerMessage->disableFailRetry();
// 添加日志等等
// 如果在消费方法中自行捕获 Throwable 此事件不会触发
});
// 业务逻辑执行完毕,ack确认消息 默认自动ack,但通常建议在业务逻辑中显式调用,比如ack失败进行事务回滚等等。
$isAcked = $consumerMessage->ack();
if (!$isAcked) {
}
// 或通过getAckStatus方法获取结果
if (!$consumerMessage->getAckStatus()) {
}
// 获取原始队列消息 QueueMessage对象
$queueMessage = $consumerMessage->getQueueMessage();
// 获取消息错误次数...
$failCount = $queueMessage->getFailCount();
// 更多...
}
}
上方示例主要演示可调用的方法,下面使用一个更加贴合实际的demo,更快了解消费业务逻辑的编写。
场景特点:获取验证码的操作一般由用户手动触发,在这类场景中,错误重试应用户在前端UI倒计时结束后重新手动发起,如果业务出现崩溃,再次上线后重新发送验证码给用户已经没有意义了。我们可以通过配置适应这类场景,代码示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
// 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 监听异常
$consumerMessage->onError(function (\Throwable $e){
// 记录邮件发送失败日志
});
// 禁用重试
$consumerMessage->disableFailRetry();
// 发送一封邮件 ....
// 确认消息
$consumerMessage->ack();
}
}
消费类继承的抽象类Consumer
默认实现了handlerFailRetry
方法,在触发异常重试时,会调用此方法,如果您想自定义错误重试逻辑,或加入更多自定义的处理,在本插件中可以轻松实现,并且每个队列都支持自定义配置。
/**
* 处理错误重试
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return bool
* @throws ScheduleDelayedMessageException
* @throws RedisException
* @throws Throwable
*/
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
$queueMessage = $consumerMessage->getQueueMessage();
// 检查是否超过最大重试次数
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
$queueMessage->incrementFailCount(); // Fail count + 1
// 计算下次重试时间
$retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;
// 更新下次重试时间
$queueMessage->updateNextRetry($retrySeconds);
// 设置消息延时
$queueMessage->setDelay($retrySeconds);
// 设置消息ID 避免重复任务
$queueMessage->setIdentifier($messageId);
// 重新发布至延时队列
return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
}
默认实现的代码如上,我们只需要重写此方法就可以自定义错误处理的业务逻辑。
代码示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;
class SendEmail extends Consumer
{
// 连接标识,对应redis.php的配置 默认default
protected string $connection = 'default';
// 将pending处理策略调整为PENDING_PROCESSING_IGNORE 消息挂起超时将不会进行重试
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 获取消息ID
$messageId = $consumerMessage->getMessageId();
// 获取队列数据
$data = $consumerMessage->getData();
// 监听异常
$consumerMessage->onError(function (\Throwable $e){
// 记录邮件发送失败日志
});
// 禁用重试
$consumerMessage->disableFailRetry();
// 发送一封邮件 ....
// 确认消息
$consumerMessage->ack();
}
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
// 不改动原本的错误处理 也可以完全自定义实现。
parent::handlerFailRetry($messageId, $consumerMessage, $e);
// 如果队列在业务数据库中还有一个tasks表进行调度,在这里可以更新task数据 比如 错误次数+1
}
}
在handlerFailRetry
方法中,默认有这一段:
// 检查是否超过最大重试次数
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
那么,我们如果需要自定义死信处理或加入额外的业务逻辑可以通过重写handlerDeadLetterQueue
方法实现。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
当我们设置pending处理策略为PENDING_PROCESSING_IGNORE
时,消息如果挂起超时,将不会触发异常重试,而是直接调用死信处理。默认情况下,死信处理会新增一条日志,方便排查问题。
默认情况下需要配置有效的日志(log.php) 默认行为才有效。也可以通过重写方法完全自行实现,记录在业务的数据库中,这也是推荐的做法,可以针对业务实现更加灵活的异常处理。
/**
* 处理死信 超过最大重试次数或pending超时PENDING_PROCESSING_IGNORE策略 会调用此方法
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return void
*/
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
$queueMessage = $consumerMessage->getQueueMessage();
// 添加日志
LogUtility::warning('dead_letter_queue: ', [
'messageId' => $messageId,
'message' => $queueMessage->toArray(),
'failCount' => $queueMessage->getFailCount(),
'errorMsg' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
// 更多...
}
代码示例:
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
// 保持默认行为
parent::handlerDeadLetterQueue($messageId, $consumerMessage, $e); // TODO: Change the autogenerated stub
// 如果队列在业务数据库中还有一个tasks表进行调度,在这里可以更新task数据
}
抽象类Consumer
中,默认定义了handlerPendingTimeoutMessages
方法,用于处理pending超时的消息。
消费者读取了一条消息后,消息会进入pending
列表,不会被当前和其他消费者再次读取,当业务逻辑没有执行完毕,服务出现掉线,崩溃时,消息并没有ack
,消息会一直保存在pending
列表中,pending
列表只能通过ack
移除,如果长期不处理,可能造成pending
列表堆积,造成大量内存占用,当持续时间大于$pendingTimout
属性的时间(默认300秒),会调用此方法进行处理。
默认情况下,在
PENDING_PROCESSING_IGNORE
策略中,我们认为pending超时消息是死信,不会再次处理,PENDING_PROCESSING_RETRY
会进行异常重试。
/**
* 处理消息挂起超时 当pending列表中有超时未ack的消息会触发此方法
* @param string $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param string $consumerName
* @param int $elapsedTime
* @param int $deliveryCount
* @return void
* @throws RedisException
* @throws ScheduleDelayedMessageException
* @throws Throwable
*/
public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
{
switch ($this->getPendingProcessingStrategy()) {
case self::PENDING_PROCESSING_IGNORE: // 忽略pending超时
// 确认消息
$consumerMessage->ack();
// 触发死信处理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
'PENDING_PROCESSING_IGNORE: Message pending timeout.'
));
break;
case self::PENDING_PROCESSING_RETRY: // pending超时重试
// 触发死信处理
if ($deliveryCount + 1 > $this->getMaxAttempts()) {
// ack消息
$consumerMessage->ack();
$this->handlerDeadLetterQueue(
$messageId,
$consumerMessage,
new Exception(
'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
));
return;
}
// 处理重试
$handlerStatus = $this->handlerFailRetry(
$messageId,
$consumerMessage,
new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
);
if ($handlerStatus) {
$consumerMessage->ack();
}
break;
}
}
有时候我们需要操作或维护队列时,可以直接获取队列的Redis连接进行操作,比如编写自定义脚本等。
// 获取队列的Redis连接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis扩展一致
$redisConnection->xLen();
$redisConnection->sAdd();
// 在消费类中可以直接使用$this->getRedisConnection();
....更多
php webman solar:make:consumer
php webman solar:remove:consumer
php webman solar:clean:redis:data
php webman solar:consumer:list
Key
标识Handler
进程类Count
进程数Consumer
消费者类名Stream Length
当前队列总长度(不包含Pending列表中的数量)Delay Set Length
当前延时队列任务数Pending List Length
当前Pending列表长度+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| Key | Handler | Count | Consumer | Stream Length | Delay Set Length | Pending List Length |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| SendCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendCode | 1996 | 950 | >=500 |
| SendEmail | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendEmail | 0 | 0 | 0 |
| SendSmsCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 1 | app\queue\SendSmsCode | 0 | 0 | 0 |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
使用场景:
stream
中,一般少量数据时我们无需在意,但如果堆积数量过大可能造成内存占用和性能问题。当
autoDel
属性为true
时,消息会自动删除,无法对历史数据进行处理和分析,如果业务需要对历史队列消息进行回溯请设置为false
代码示例:
这里我们使用了webman
中自定义脚本
的编写,可以将脚本加入定时任务中,定期清理或处理历史消息。
下方代码只是示例,请确保在测试环境充分测试。
<?php
use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';
// 获取队列的Redis连接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis扩展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示从 Stream 的最开始读取
$end = '+'; // 表示读取到 Stream 的最末尾
$count = 100; // 指定要读取的消息数量
// 读取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);
$deleteMessageIds = [];
foreach ($messages as $messageId => $message) {
// 解析原始消息内容
$messageArr = QueueMessage::parseRawMessage($message);
if (!$messageArr) { // 未知消息
$deleteMessageIds[] = $messageId;
continue;
}
// 转换为QueueMessage方便操作
$queueMessage = QueueMessage::createFromArray($messageArr);
// 通过获取消息时间戳,如果消息已经存在超过1个小时 标记删除。
if (time() - $queueMessage->getTimestamp() > 3600) {
$deleteMessageIds[] = $messageId;
}
}
// 批量删除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);
目前插件没有实现在其他项目投递的标准实现,可通过业务需求开发队列提交接口实现。
我想使用这个插件,不知道这个功能完善不?有知道的朋友简单说两句的吗?
感觉好麻烦呀 看都看了半天 这用起来 感觉得学习一段时间呀
我本意是想使用专业一点的消息队列,但又不想用【rabbitmq、Kafka、rocketmq等】,就想用redis的stream实现。就不知道本文提到的这个插件【webman-redis-queue】咋样,坑多不多?
我以前都是简单使用redis的list实现消息队列的
说实话 他这种 应该没有经过实际检验 我建议先别用 等成熟再用 mq 还是 rabbitmq kafka 等专业 处理数据不会丢失 不会有坑的,要是数据量不大 用群主的 redis list 的 插件队列 redis设置成每次更新数据都持久化 丢失数据的概率极低 我们每天处理几百万的数据 都不丢失 性能也够用 rabbitmq 啥的你得自己实现消费者代码
好,那还是用walkor大佬的,谢谢
不用客气,webman你们生产使用了嘛
正在使用,也在其他同事朋友,知乎掘金等平台向别人安利
现在又遇到定时任务重复执行的问题,在踩坑填坑
哦哦,挺好用的 我们生产用了两年多了 ,嘎嘎稳定 性能没得说
简单+ 高性能,必须点赞webman
这个我们解决了 ,你用的crontab嘛 redis分布式锁 可以实现 咋说呢 网络稳定 是没任何问题的
好的。我是用的workerman/crontab。我这感觉是前2天配置的定时任务,没有在重启webman进程中杀掉,一直在某个地方运行,我再看看
我们是在 config 里面增加一个task.php 配置文件 <?php
//0 1 2 3 4 5
//| | | | | |
//| | | | | +------ day of week (0 - 6) (Sunday=0)
//| | | | +------ month (1 - 12)
//| | | +-------- day of month (1 - 31)
//| | +---------- hour (0 - 23)
//| +------------ min (0 - 59)
//+-------------- sec (0-59)[可省略,如果没有0位,则最小时间粒度是分钟]
return [
//定时日志打包
'logPack' => [
'enable' => true, //开关
'rule' => '/1 ', //任务执行表达式
'parameter' => [
3,//保留天数
['log'],//打包的文件后缀
['workerman.log'],//排除日志文件
'holdLog.zip'//打包的包名
], //调用任务参数
'class' => \lib\Task::class, //使用的类
'method' => 'logPack', //方法名
'queue' => false,//是否使用队列处理
'only_ones' => true,//同一时刻是否只执行一次
],
];
然后 在process进程里面 Task.php <?php
//处理定时任务进程
namespace process;
use Workerman\Crontab\Crontab;
class Task
{
protected $queueName = 'task';//处理任务的队列名称
public function onWorkerStart()
{
//获取任务列表
$taskList = config('task', []);
foreach ($taskList as $key => $item) {
if(!$item['enable']){
continue;
}
new Crontab($item['rule'], function () use ($key, $item) {
$onlyOnes = $item['only_ones'] ?? false;
if($onlyOnes){
$ttl = 1000;
$arr = explode(' ', $item['rule']);
if(count($arr) == 6 && $arr[0] == '*/1'){
$ttl = 600;
}
if(!\lib\RateLimiting::getLock($key, $ttl)){
return false;
}
}
$queue = $item['queue'] ?? false;
//如果开启队列处理
if($queue){
queueSend($this->queueName, $item);
}else{
task($this->queueName, $item);
}
});
}
}
}
最后在执行对应的方法实现的
你这是魔改了,我就是用的官网上基础的操作。。。好的,我再看下,谢谢
哈哈 也不是魔改 就是简单封装了一下,我们之前是多个机器部署 每个任务 都不一样 有的需要 每个机器都执行 有的就需要有一个机器执行就行,而且还要防止 有机器挂了 任务还得跑
你的业务复杂些,需要封装处理,我这边还是比较简单直接的