目前需求是 第三方接口有请求限制
接口请求频率限制:200次/秒。
消息条数限制:12000条/分钟。按接收消息数量计算条数,若一次发送给500个用户,计作500条。每次最多500用户
目前使用方案是动态双维度限流方案,
当投递数据交小时 是不会出现错误的,较大就会
目前是4个进程
初步怀疑是不是一直投递,然后队列消费不满足规则后,又投递,导致redis响应体过大超出缓存区导致的
模拟投递
// 每次500用户)
// 预期结果:
// - 前24次请求成功(24×500=12000)
// - 后续请求自动延迟到下一分钟
// - 每秒请求数不超过200次
for ($i = 1; $i <= 100; $i++) {
$users = range(1, 500); // 每次500用户
SmartBatchProducer::push($users, $i);
}
投递代码
<?php
namespace mineout\timTool;
class SmartBatchProducer
{
const MINUTE_LIMIT = 12000; // 每分钟最大消息数
const SECOND_LIMIT = 200; // 每秒最大请求数
const MAX_BATCH_SIZE = 500; // 每批最大用户数
public static function push(array $userIds, string $message)
{
$optimalBatch = self::calculateOptimalBatch();
$batches = array_chunk($userIds, min($optimalBatch, self::MAX_BATCH_SIZE));
foreach ($batches as $key => $batch) {
\Webman\RedisQueue\Redis::connection('batchPush')->send('queue_tim_batch_push', [
'users' => $batch,
'message' => $message . '-' . $key,
]);
}
}
private static function calculateOptimalBatch()
{
$currentMinute = (int)(time() / 60);
$used = self::getMinRedis($currentMinute);
$remaining = 12000 - $used;
$elapsed = time() % 60;
$timeLeft = max(60 - $elapsed, 1); // 防除零
// QPS限制下的最大批量(每批500用户消耗24次请求配额)
$maxByQps = floor(200 * $timeLeft * 500 / 12000);
// 配额限制下的最大批量
$maxByQuota = floor($remaining * 500 / (200 * $timeLeft));
// 动态调整系数(根据时间衰减)
$decayFactor = 1 - ($elapsed / 60) * 0.5; // 时间越晚越保守
$finalBatch = min(500, max(10, min($maxByQps, $maxByQuota) * $decayFactor));
return (int)$finalBatch;
}
private static function getMinRedis($currentMinute)
{
return \Webman\RedisQueue\Redis::connection('batchPush')->get("im:min:{$currentMinute}") ?: 0;
}
}
队列消费者
<?php
namespace app\queue\timBatchPush;
use support\Log;
use Webman\RedisQueue\Consumer;
/**
* 创建批量推送队列
* @package app\queue\timGroup
*/
class TimBatchPush implements Consumer
{
public $queue = 'queue_tim_batch_push';
public $connection = 'batchPush';
const MINUTE_LIMIT = 12000; // 每分钟最大消息数
const SECOND_LIMIT = 200; // 每秒最大请求数
const MAX_BATCH_SIZE = 500; // 每批最大用户数
// Lua脚本(原子化双维度检查)
const LUA_SCRIPT = <<<LUA
--[[
腾讯IM双维度动态限流脚本(生产级优化版)
优化点:
1. 动态计算有效分片范围,避免无效查询
2. 确保dynamic_limit最小值为1
3. 精确计算最早可用时间
4. 增强边界条件处理
--]]
-- KEYS[1]: 分钟级计数器键前缀(示例:im:min)
-- KEYS[2]: 秒级分片键前缀(示例:im:win)
-- ARGV[1]: 当前时间戳(毫秒)
-- ARGV[2]: 本批次用户数量
-- ARGV[3]: 最大消息数(12000)
-- ARGV[4]: 最大请求次数(200)
-- ARGV[5]: 时间窗口(毫秒,60000)
local WINDOW_SPLIT = 200 -- 分片粒度200ms
local current_time = tonumber(ARGV[1])
local window_start = current_time - ARGV[5]
local MAX_PARTS = math.ceil(ARGV[5]/WINDOW_SPLIT) -- 计算总窗口分片数
-- ===== 分钟级检查 =====
local current_minute = math.floor(current_time / 60000)
local min_key = KEYS[1]..":"..current_minute
local min_count = tonumber(redis.call('GET', min_key) or 0)
local remaining = tonumber(ARGV[3]) - min_count
if remaining <= 0 then
return {0, 'minute',
60 - (current_time % 60000)/1000, -- 剩余时间(秒)
0, -- 当前窗口请求数
0 -- 动态限制值
}
end
-- ===== 动态窗口计算 =====
local current_part = math.floor(current_time / WINDOW_SPLIT)
local start_part = math.floor(window_start / WINDOW_SPLIT)
-- 生成有效分片范围(仅包含时间窗口内的分片)
local active_parts = {}
for i=0, MAX_PARTS-1 do
local part_id = current_part - i
if part_id * WINDOW_SPLIT >= window_start then
table.insert(active_parts, part_id)
else
break -- 超出窗口范围的分片无需处理
end
end
-- 构建分片键集合
local keys = {}
for _, part_id in ipairs(active_parts) do
table.insert(keys, KEYS[2]..":"..part_id) -- 计数键
table.insert(keys, KEYS[2]..":ts:"..part_id) -- 时间戳键
end
-- 批量获取分片数据
local responses = {}
if #keys > 0 then
responses = redis.call('MGET', unpack(keys))
end
-- 统计有效请求数
local total_reqs = 0
local oldest_valid_ts = current_time
for i=1, #responses, 2 do
local count = tonumber(responses[i]) or 0
local ts = tonumber(responses[i+1]) or 0
-- 精确时间窗口校验
if ts >= window_start then
total_reqs = total_reqs + count
if ts < oldest_valid_ts then
oldest_valid_ts = ts -- 记录最早有效分片时间
end
end
end
-- ===== 动态速率计算 =====
local time_elapsed = (current_time % 60000) / 1000
local time_left = math.max(60 - time_elapsed, 0.1)
local raw_limit = remaining / (time_left * tonumber(ARGV[2]))
local dynamic_limit = math.max(1, math.min(
tonumber(ARGV[4]),
math.floor(raw_limit + 0.5) -- 四舍五入且最小值1
))
-- ===== 限流判断 =====
if total_reqs >= dynamic_limit then
local retry_after = (oldest_valid_ts + ARGV[5] - current_time) / 1000
retry_after = math.max(retry_after, 0.1) -- 最小延迟0.1秒
return {0, 'second',
retry_after, -- 精确重试时间(秒)
total_reqs, -- 当前窗口请求数
dynamic_limit -- 动态限制值
}
end
-- ===== 通过检查,更新数据 =====
-- 更新当前分片
local current_win_key = KEYS[2]..":"..current_part
redis.call('INCRBY', current_win_key, 1)
redis.call('PEXPIRE', current_win_key, ARGV[5] + 2000)
-- 记录分片时间戳
local ts_key = KEYS[2]..":ts:"..current_part
redis.call('SET', ts_key, current_time, 'PX', ARGV[5] + 2000)
-- 更新分钟级计数
redis.call('INCRBY', min_key, tonumber(ARGV[2]))
redis.call('PEXPIRE', min_key, 61000)
return {1,
dynamic_limit,
math.floor(remaining / time_left),
total_reqs + 1,
#active_parts
}
LUA;
public function consume($data)
{
$logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
// 参数验证
if (empty($data['users']) || !is_array($data['users'])) {
throw new \InvalidArgumentException("无效的用户数据");
}
$users = $data['users'];
$userCount = count($users);
$currentTimestamp = microtime(true);
// 执行原子检查
$result = $this->getRedis()->eval(
self::LUA_SCRIPT,
[
$this->minuteKey(), // KEYS[1]
$this->secondKey(), // KEYS[2]
$currentTimestamp, // ARGV[1]
$userCount, // ARGV[2]
self::MINUTE_LIMIT, // ARGV[3]
self::SECOND_LIMIT, // ARGV[4]
60000,
],
2
);
if ($result[0] === 1) {
$this->sendToTencentIM($users, $data['message']);
$logs .= "[INFO]\t 发送成功, allowed_qps=> {$result[1]} 次/秒" . PHP_EOL;
$this->writeLog($logs);
} else {
$this->handleRateLimit($data, $result, $logs);
}
}
private function getRedis()
{
return \Webman\RedisQueue\Redis::connection('batchPush');
}
private function minuteKey(): string
{
return 'im:min:' . (int)(time() / 60);
}
private function secondKey(): string
{
$currentSecond = (int)time();
$shard = crc32((string)$currentSecond) % 16;
return "im:sec:{$currentSecond}:{$shard}";
}
private function sendToTencentIM(array $users, string $message)
{
// 实际请求代码
}
private function handleRateLimit($data, array $result, $logs)
{
$retryData = $data;
// 计算精确延迟
$delay = match ($result[1]) {
'minute' => 60 - (time() % 60) + 1,
'second' => max(ceil($result[2] - microtime(true)), 0.1),
default => 1
};
// 添加随机抖动(±5%)
$delay *= mt_rand(950, 1050) / 1000;
$delaySeconds = ceil($delay);
\Webman\RedisQueue\Redis::connection('batchPush')->send($this->queue, $retryData, $delaySeconds);
$allowed_qps = $result[3] ?? 0;
$logs .= "[WARNING]\t 触发限流, type=> {$result[1]}, allowed_qps=> {$allowed_qps} ,delay=> {$delaySeconds}" . PHP_EOL;
$this->writeLog($logs);
}
public function onConsumeFailure(\Throwable $e, $package)
{
$data = $package['data'];
$attempts = $package['attempts'];
$logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
if ($attempts == 5) {
$logs .= "[ERROR]\t 发送失败, 超过最大重试次数,不在重试" . PHP_EOL;
$logs .= "[INFO]\t 错误信息:" . $e->getMessage() . PHP_EOL;
$this->writeLog($logs);
} else {
$delay = $package['max_attempts'] * ($attempts + 1);
$logs .= "[ERROR]\t 发送失败,{$delay}秒后重试" . PHP_EOL;
$logs .= "[INFO]\t 错误信息:" . $e->getMessage() . PHP_EOL;
$this->writeLog($logs);
}
}
private function writeLog($log)
{
Log::channel('tim_batch_push')->log('info', $log);
}
}
ValueError: strpos(): Argument #3 ($offset) must be contained in argument #1 ($haystack) in /webman-v2/vendor/workerman/redis/src/Protocols/Rephp:53
RuntimeException: Protocol Workerman\Redis\Protocols\Redis Error package. package_length=-1 in webman-v2/vendor/workerman/workerman/Connection/TcpConnection.php:724
Stack trace:
系统:macos
vendor/webman/redis-queue/src/Redis.php

加三行代码
试下