我在onconnect的时候订阅个事件 但是总有connection会收不到

wrkingcs
$ws_worker->onConnect  = function ($connection){
    global $ws_worker;
    $key = 'connection_'.SERVER_ID.'_'.$ws_worker->id.'_'.$connection->id;

//111111111111111111111111111111111111111111111111
    Channel\Client::on($key, function($event_data)use($connection){
        //222222222222222222222222
                Channel\Client::on('aaaaaa', function($multi_data)use($connection){

                    $connection->send('订阅的aaa事件');

                });    

            $connection->send(‘111111111处订阅的事件’);
    });
};

在onstart中 给每个connection订阅$key事件 $key中是唯一的 在以后中在222222处订阅群发aaaaa事件 但是每次只有最后一个连接可以接受到aaaaaa事件 为什么

3313 3 0
3个回答

walkor 打赏

aaaaaa事件被反复覆盖,最后只剩下最后一个是有效的

  • 暂无评论
wrkingcs
$ws_worker->onConnect  = function ($connection){
    global $ws_worker;
    $key = 'connection_'.'_'.$ws_worker->id.'_'.$connection->id;
    var_dump($key);
    $connection->lastMessageTime = time();
    Channel\Client::on($key, function($event_data)use(&$connection){
            $connection->send('这个是单发'.date('Y-m-d H:i:s'));
    });
    Channel\Client::on('aaa', function($multi_data)use(&$connection){
        echo "\n2\n";
        $connection->send($multi_data);
    });
};

我这样在 onconnet中订阅aaa事件 不是每个connection都可以接受到这个aaa事件吗 目前我测试貌似只有2个可以收到 再多的链接就收不到aaa

  • 暂无评论
walkor 打赏

你的代码是在onconnet中让Channel\Client订阅aaa事件,而不是每个connection订阅aaa事件。Channel\Client订阅aaa事件是一个全局事件,代码只不过是不停的更改Channel\Client订阅aaa事件的回调函数,但只有最后一个回调有效。

另外不停的给每个connection添加'connection_'.'_'.$ws_worker->id.'_'.$connection->id事件,链接关闭时又不解除事件监听,会导致对应事件所占内存一直被占用,内存越来越大,形成内存泄露。

群发做法类似下面做法:

require_once __DIR__ . '/../Workerman/Autoloader.php';
require_once __DIR__ . '/Channel/src/Server.php';
require_once __DIR__ . '/Channel/src/Client.php';
use Workerman\Worker;

$channel_server = new Channel\Server('0.0.0.0', 2206);

$worker = new Worker('websocket://0.0.0.0:1234');
// 全局群组到链接的映射数组
$group_con_map = array();
$worker->onWorkerStart = function(){
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('127.0.0.1', 2206);

    // 监听全局分组发送消息事件
    Channel\Client::on('send_to_group', function($event_data){
        $group_id = $event_data;
        $message = $event_data;
        global $group_con_map;
        var_dump(array_keys($group_con_map));
        if (isset($group_con_map)) {
            foreach ($group_con_map as $con) {
                $con->send($message);
            }
        }
    });
};
$worker->onMessage = function($con, $data){
    // 加入群组消息{"cmd":"add_group", "group_id":"123"}
    // 或者 群发消息{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}
    $data = json_decode($data, true);
    var_dump($data);
    $cmd = $data;
    $group_id = $data;
    switch($cmd) {
        // 链接加入群组
        case "add_group":
            global $group_con_map;
            // 将链接加入到对应的群组数组里
            $group_con_map = $con;
            // 记录这个链接加入了哪些群组,方便在onclose的时候清理group_con_map对应群组的数据
            $con->group_id = isset($con->group_id) ? $con->group_id : array();
            $con->group_id = $group_id;
            break;
        // 群发消息给群组
        case "send_to_group":
            // Channel\Client给所有服务器的所有进程广播分组发送消息事件
            Channel\Client::publish('send_to_group', array(
                'group_id'=>$group_id,
                'message'=>$data
            ));
            break;
    }
};
// 这里很重要,链接关闭时把链接从全局群组数据中删除,避免内存泄漏
$worker->onClose = function($con){
    global $group_con_map;
    // 遍历链接加入的所有群组,从group_con_map删除对应的数据
    if (isset($con->group_id)) {
        foreach ($con->group_id as $group_id) {
            unset($group_con_map);
        }
        if (empty($group_con_map)) {
            unset($group_con_map);
        }
    }
};

Worker::runAll();

workerman手册加了这个例子 https://www.kancloud.cn/walkor/workerman/346075

其实群发最好用GatewayWorker框架,这些都是有现成的接口,直接调用就好了。

  • wrkingcs 2017-06-25

    好的 谢谢 我改用GatewayWorker了,但是Gateway怎么异步处理呢 就是GatewayWorker只管通信 我异步处理完了通知Gateway

年代过于久远,无法发表回答
×
🔝