请问下各位神仙哥哥姐姐,这段代码中的定时器为什么没有正确的读取到客户端链接信息?
帮忙检阅看看,跪拜
日志:
连接ID: 2
收到客户端消息: {"type":"sub","params":"market_ti"}
订阅主题: market_ti
定时任务:广播最新数据
当前活跃客户端数: 2
检查客户端 ID: 1
客户端 1 未在连接列表中
// Redis 配置
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 创建 WebSocket Worker
$ws_worker = new Worker("websocket://0.0.0.0:2346", $context);
$ws_worker->transport = 'ssl';
$ws_worker->count = 1;
// 客户端连接逻辑
$ws_worker->onConnect = function ($connection) use ($redis, $httpClient, $ws_worker) {
echo "新连接加入\n";
echo "new connection from ip " . $connection->getRemoteIp() . "\n";
echo "连接ID: {$connection->id}\n";
// 在连接时记录连接ID
$redis->sAdd("client:{$connection->id}:connected", true); // 确保客户端连接的标识被保存
$ws_worker->connections[$connection->id] = $connection;
// 消息处理
$connection->onMessage = function ($connection, $message) use ($redis, $httpClient) {
echo "收到客户端消息: $message\n";
$msg = json_decode($message, true);
if ($msg && isset($msg['type'], $msg['params']) && $msg['type'] === 'sub') {
$theme = $msg['params'];
echo "订阅主题: $theme\n";
// 将客户端连接和主题存入 Redis
$redis->sAdd("themes:$theme:clients", $connection->id);
$redis->sAdd("client:$connection->id:themes", $theme);
// 返回历史数据
$historyData = ($theme === 'market_ti')
? fetchMergedallData($httpClient)
: fetchKlineData($theme, $httpClient, 2);
$connection->send(json_encode([
'type' => 'history',
'data' => $historyData,
]));
}
};
// 断开连接处理
$ws_worker->onClose = function ($connection) use ($redis, $ws_worker) {
echo "连接断开\n";
// 从 Redis 清除相关数据
$themes = $redis->sMembers("client:$connection->id:themes");
foreach ($themes as $theme) {
$redis->sRem("themes:$theme:clients", $connection->id);
}
$redis->del("client:$connection->id:themes");
$redis->del("client:$connection->id:connected");
// 从 WebSocket worker 的连接列表中删除该连接
unset($ws_worker->connections[$connection->id]);
};
};
// 定时任务:每 10 秒广播数据
Timer::add(10, function () use ($redis, $httpClient, $ws_worker) {
echo "定时任务:广播最新数据\n";
// 获取所有主题的客户端连接
$themes = $redis->keys("themes:*:clients");
foreach ($themes as $themeKey) {
$theme = str_replace(['themes:', ':clients'], '', $themeKey);
if ($theme === 'market_ti') {
$data = fetchMergedallData($httpClient);
} else {
$data = fetchKlineData($theme, $httpClient, 1);
$depthData = fetchdepthData($theme, $httpClient);
}
// 获取主题下的所有客户端
$clients = $redis->sMembers($themeKey);
echo "当前活跃客户端数: " . count($clients) . "\n"; // 输出当前主题下的客户端数
foreach ($clients as $clientId) {
echo "检查客户端 ID: $clientId\n";
// 从 WebSocket 连接列表中获取客户端连接
if (isset($ws_worker->connections[$clientId])) {
$connection = $ws_worker->connections[$clientId];
if ($connection->isOpened()) {
echo "找到客户端 $clientId\n";
$connection->send(json_encode([
'type' => 'update',
'kline' => $data,
'depth' => $depthData,
]));
} else {
echo "客户端 $clientId 不在线,无法发送数据\n"; // 客户端不在线
}
} else {
echo "客户端 $clientId 未在连接列表中\n"; // 客户端未找到
}
}
}
});