各位吴彦祖们,求限制QPS思路

MarkGo

背景:
对方提供了个产品信息接口,只支持拉操作,限制QPS为20,产品规格和价格都可能实时变化,现在设计思路就是通过Timer把该接口的产品每隔5分钟丢去队列中,再通过队列数量来进行拉取。

问题:
队列数量设置为10;curl中配合usleep通过记录上次请求时间毫秒数来计算延迟请求,控制每次CURL相隔500毫秒。
但是对方监控中发现,实际请求QPS会达到30多或40,后来通过减少队列数量到5,QPS才保持在20以内,
请问问各位吴彦祖对于这类限制QPS查询频率的有没什么好的方法实现?

补充
感谢各位吴彦祖的回答,我现在实现方法类似:

Task.php

<?php

namespace process;

use support\Db;
use support\Redis;
use Webman\RedisQueue\Client;
use Workerman\Timer;

class Task
{
    public function onWorkerStart()
    {
        Timer::add(30 * 60,function(){
            $rs = Db::table('xxxxx')
            ->where(......)
            ->get();
            foreach ($rs as $row){
                /*防止上一轮任务没跑完又重新添加进来了
                */
                $key = 'apiName_'.$row->key;
                $isInQueue = Redis::get($key);
                if($isInQueue) continue;
                Redis::set($key,1);

                Client::send('apiSync',['key'=>$row->key]);
            }
        });
    }
}

Queue.php

<?php

namespace app\queue\redis;

use app\lib\helper;
use support\Redis;
use Webman\RedisQueue\Consumer;

class apiSync implements Consumer
{
    public $queue = 'apiSync';
    public $connection = 'default';

    public function consume($data)
    {
        $key = $data['key'];

        for($i = 0;$i<=15;$i++){
            $checkInDate = date('Y-m-d',strtotime("+ ${i} day"));
            helper::updatePrice($key);
        }
        Redis::del('apiName_'.$key);
        return true;
    }
}

helper:

<?php
namespace app\lib;

use Exception;

class apisync
{
    const url = 'xxxxx';
    private static $lstReqTime = 0;

    public static function updatePrice($key){
        return self::req('/xxxxxx',['key'=>$key],1000);
    }

    public static function req(string $uri,array $body=[],$qpsLimit=false): response
    {
        $reqTime = msectime();
        if($qpsLimit !== false){
            $lessTime = $reqTime - self::$lstReqTime*1;
            if($lessTime < $qpsLimit){
                usleep(($qpsLimit-$lessTime)*1000);
            }
        }
        self::$lstReqTime = msectime();

        $jsonStr = json_encode($body,JSON_UNESCAPED_UNICODE);
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_URL, self::url . $uri);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $jsonStr);
        curl_setopt($ch, CURLOPT_TIMEOUT, 5);
        curl_setopt($ch, CURLOPT_ENCODING, 'gzip,deflate');
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, array(
            'Content-Type: application/json; charset=utf-8',
            'Content-Length: ' . strlen($jsonStr)
        )
                   );
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        return new response($httpCode,$response);
    }
}

实现过程:
通过Timer设定30分钟,全量查一次需要拉取的产品,丢入redis队列中,其中防止重复入列,所以增加了个判断任务是否存在于队列中。
队列发起请求的时候,会计算本次请求毫秒数 - 上次请求毫秒数,如果大于1秒,则执行查询,否则通过usleep来延迟本次查询。

但是现在实际出现情况是:
开5个redis消费,设定查询间隔是1000毫秒,实际产生QPS会达到10~20,理论应该QPS是3~8吧?毕竟usleep并不是那么精准。
另外就是redis队列确实会堆积,还有通过php start.php status -d 会看到队列一堆busy,几分钟后status抛出错误退出了。

1447 2 0
2个回答

chen
$key = date('YmdHis');
$redis->incr($key);
if ($redis->get($key) >= 20) {
  // 延迟1s执行
}
  • MarkGo 2022-03-31

    你这......

  • damao 2022-03-31

    不要延迟,那样会堆积请求,超过20次就直接返回上一次结果。

onetobig

也是用 redis,稍微调整下

$key = 'lock:queue';
while($r = \support\Redis::zIncrBy($key, 1, time())) {
    if ($r > 20) {
        sleep(1);
        continue; // 请求未发出,继续循环
    }
    // do something
    break; // 请求已发出,结束循环
}
  • 暂无评论
年代过于久远,无法发表回答
×
🔝