定时器为什么没读取到connect的数据呢

jimi2025

问题描述

请问下各位神仙哥哥姐姐,这段代码中的定时器为什么没有正确的读取到客户端链接信息?
帮忙检阅看看,跪拜
日志:
连接ID: 2
收到客户端消息: {"type":"sub","params":"market_ti"}
订阅主题: market_ti
定时任务:广播最新数据
当前活跃客户端数: 2
检查客户端 ID: 1
客户端 1 未在连接列表中

// Redis 配置
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 创建 WebSocket Worker
$ws_worker = new Worker("websocket://0.0.0.0:2346", $context);
$ws_worker->transport = 'ssl';
$ws_worker->count = 1;

// 客户端连接逻辑
$ws_worker->onConnect = function ($connection) use ($redis, $httpClient, $ws_worker) {
    echo "新连接加入\n";
    echo "new connection from ip " . $connection->getRemoteIp() . "\n";
    echo "连接ID: {$connection->id}\n";

    // 在连接时记录连接ID
    $redis->sAdd("client:{$connection->id}:connected", true);  // 确保客户端连接的标识被保存
    $ws_worker->connections[$connection->id] = $connection;

    // 消息处理
    $connection->onMessage = function ($connection, $message) use ($redis, $httpClient) {
        echo "收到客户端消息: $message\n";
        $msg = json_decode($message, true);
        if ($msg && isset($msg['type'], $msg['params']) && $msg['type'] === 'sub') {
            $theme = $msg['params'];
            echo "订阅主题: $theme\n";

            // 将客户端连接和主题存入 Redis
            $redis->sAdd("themes:$theme:clients", $connection->id);
            $redis->sAdd("client:$connection->id:themes", $theme);

            // 返回历史数据
            $historyData = ($theme === 'market_ti') 
                ? fetchMergedallData($httpClient)
                : fetchKlineData($theme, $httpClient, 2);

            $connection->send(json_encode([
                'type' => 'history',
                'data' => $historyData,
            ]));
        }
    };

    // 断开连接处理
    $ws_worker->onClose = function ($connection) use ($redis, $ws_worker) {
        echo "连接断开\n";

        // 从 Redis 清除相关数据
        $themes = $redis->sMembers("client:$connection->id:themes");
        foreach ($themes as $theme) {
            $redis->sRem("themes:$theme:clients", $connection->id);
        }
        $redis->del("client:$connection->id:themes");
        $redis->del("client:$connection->id:connected");

        // 从 WebSocket worker 的连接列表中删除该连接
        unset($ws_worker->connections[$connection->id]);
    };
};

// 定时任务:每 10 秒广播数据
Timer::add(10, function () use ($redis, $httpClient, $ws_worker) {
    echo "定时任务:广播最新数据\n";

    // 获取所有主题的客户端连接
    $themes = $redis->keys("themes:*:clients");

    foreach ($themes as $themeKey) {
        $theme = str_replace(['themes:', ':clients'], '', $themeKey);
        if ($theme === 'market_ti') {
            $data = fetchMergedallData($httpClient);
        } else {
            $data = fetchKlineData($theme, $httpClient, 1);
            $depthData = fetchdepthData($theme, $httpClient);
        }

        // 获取主题下的所有客户端
        $clients = $redis->sMembers($themeKey);
        echo "当前活跃客户端数: " . count($clients) . "\n";  // 输出当前主题下的客户端数

        foreach ($clients as $clientId) {
            echo "检查客户端 ID: $clientId\n";

            // 从 WebSocket 连接列表中获取客户端连接
            if (isset($ws_worker->connections[$clientId])) {
                $connection = $ws_worker->connections[$clientId];
                if ($connection->isOpened()) {
                    echo "找到客户端 $clientId\n";
                    $connection->send(json_encode([
                        'type' => 'update',
                        'kline' => $data,
                        'depth' => $depthData,
                    ]));
                } else {
                    echo "客户端 $clientId 不在线,无法发送数据\n";  // 客户端不在线
                }
            } else {
                echo "客户端 $clientId 未在连接列表中\n";  // 客户端未找到
            }
        }
    }
});
58 0 0
0个回答

×
🔝