package. package_length 在使用Channel的时候,出险这样的错误提示,请问,是什么原因,如何解决

YangWJ

错误提示:
error package. package_length=1195725856
使用workerman与TP5.0的整合方法,分别建立了webscoket服务与Channel的服务端、http服务,启动客户端连接。
两个问题:
1、Channel客户端与服务端连接失败;
2、error package. package_length=1195725856。
原计划尝试将这个功能嵌入到tp5.0里面
https://www.kancloud.cn/walkor/workerman/315202
个人修改后代码,(刚入门菜鸟,轻喷。)
贴上原代码:
tp5.0 :文件路径1:vendor/topthink/think-worker/src Worker.php

namespace think\worker;

use Workerman\Worker;
use Channel\Client;
use Channel\Server as Channelserver;
/**
 * Worker控制器扩展类
 * Channel相关文件:Client.php和.Server.php,安装在tp根目录:extend/Channel下
 */
abstract class Server
{
    /**
     * -----------------------------------------------------------------------------------------------------------
     * 
     */
    protected $worker;
    protected $socket     = '';
    protected $protocol   = 'http';
    protected $host       = '0.0.0.0';
    protected $port       = '2346';
    protected $processes  = 4;  //进程个数
    protected $name       = "ApiWebSocket"; //进程名称
    protected $daemonize  = false;  //表示是否以daemon(守护进程)方式运行
    protected $stdoutFile = LOG_PATH."workman_putout.log";  //所有的打印输出全部保存在日志文件workman_putout.log文件中
    protected $logFile    = LOG_PATH."workman_start_stop.log";  //此文件记录了workerman自身相关的日志,包括启动、停止等

    //Channel服务扩展
    protected $channelServer    = false;  //是否开启Channel服务,默认false,不开启服务
    protected $channelHost      = '0.0.0.0';  //是否开启Channel服务,默认false,不开启服务
    protected $channelPort      = 2206;  //是否开启Channel服务,默认false,不开启服务

    /**
     * 架构函数
     * @access public
     */
    public function __construct()
    {
        //是否启动Channel服务,默认不开启
        if($this->channelServer){
            $channel_server = new Channelserver($this->channelHost, $this->channelPort);
        }
        // 实例化 Websocket 服务
        $this->worker = new Worker($this->socket ?: $this->protocol . '://' . $this->host . ':' . $this->port);
        // 设置进程数
        $this->worker->count = $this->processes;
        $this->worker->name = $this->name;      
        // 初始化
        $this->init();

        // 设置回调
        foreach ( as $event) {
            if (method_exists($this, $event)) {
                $this->worker->$event = ;
            }
        }
        // Run worker
        Worker::runAll();
    }

    protected function init()
    {
        //输出日志文件
        if (!file_exists($this->stdoutFile)) { // 如果不存在则创建
            file_put_contents($this->stdoutFile, '');
            // 检测是否有权限操作
            if (!is_writeable($this->stdoutFile)) chmod($this->stdoutFile, 0777); // 如果无权限,则修改为0777最大权限         
        };
        //开启、关闭日志文件
        if (!file_exists($this->logFile)) { // 如果不存在则创建
            file_put_contents($this->logFile, '');
            // 检测是否有权限操作
            if (!is_writeable($this->logFile)) chmod($this->logFile, 0777); // 如果无权限,则修改为0777最大权限    
        };

        Worker::$daemonize  = $this->daemonize;
        Worker::$stdoutFile = $this->stdoutFile;
        Worker::$logFile    = $this->logFile;
    }

}

文件2:websocket 服务端 启动Channel服务端 路径:application/push/controller/Worker.php

<?php

namespace app\push\controller;

use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;

class Worker extends Server
{
    protected $socket        = 'websocket://api.3-gd.com:2886';
    protected $processes     = 2;
    protected $name          = "pusher";    //进程名称

    protected $channelServer = true; //是否开启Channel服务,默认false,不开启服务
    protected $channelHost   = 'api.3-gd.com';  //是否开启Channel服务,默认false,不开启服务
    protected $channelPort   = 2207;  //是否开启Channel服务,默认false,不开启服务

    /**
     * 数据传输内容格式要求
     * a、必须包含用户ID、用户信息、请求路径或请求路径、请求或传输的参数、返回数据的处理方法
     * b、参数全部为json字符串,接收后再处理为数组对象或json对象
     * 接收数据后,此页面仅做分发和接收处理,其余数据由对应的模块功能进行处理
     * 
     * 返回数据内容
     * "send_msg" :返回的提示主要信息
     * "send_desc":返回的主要描述信息
     * "type"     :返回的数据处理方式,如"message":一般提示信息、"alarm":重要提示信息、"data":接口返回数据
     * "fun"      :建议的数据处理方法及函数
     * 
     */

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
        $connection->lastMessageTime = time();
        // 处理接收到的数据
        $input_info = json_decode( $data, true );
        // 判断接收的数据是否为合法数据
        if(!$input_info||!$input_info||!$input_info||!$input_info||!$input_info){
            $arr = ;                    
            $connection->send( json_encode($arr) );
        }
        // 将判断后的数据交给对应的控制器去处理
        else{
            //选择控制器并获取操作结果
            $user_id   = $input_info;
            $user_info = $input_info;
            $user_mvc  = $input_info;
            $user_data = $input_info;
            $user_fun  = $input_info;
            //将数据交给其他模型处理
            $do_action = model('push/PushComm');
            $res = $do_action->sendDataToMVC($user_id,$user_info,$user_mvc,$user_data);
            $arr=;  
            //将结果返回给客户端
            $connection->send( json_encode($arr) );
        }
    }

    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
    {

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {

    }

    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }

    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
        // 心跳间隔60秒
        define('HEARTBEAT_TIME', 60);
        // 链接保持提醒10秒
        define('ALARM_TIME', 10);
        Timer::add(1, function()use($worker){
            $time_now = time();
            foreach($worker->connections as $connection) {
                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;                   
                    continue;
                }
                //$connection->send(time());
                // 链接保持提醒时间
                if ($time_now - $connection->lastMessageTime == HEARTBEAT_TIME - ALARM_TIME) {
                    //发送命令消息,提示应该重新发送消息,保持链接
                    $arr = ;
                    $connection->send( json_encode($arr) );
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                    $connection->close();
                }
            }
        });

        //连接Channel服务
        // Channel客户端连接到Channel服务端
        Client::connect( $this->channelHost, $this->channelPort);
        // 以自己的进程id为事件名称
        $event_name = $worker->id;
        // 订阅worker->id事件并注册事件处理函数
        Client::on($event_name, function($event_data)use($worker){
            $to_connection_id = $event_data;
            $message = $event_data;
            if(!isset($worker->connections))
            {
                echo "connection not exists\n";
                return;
            }
            $to_connection = $worker->connections;
            $to_connection->send($message);
        });

        // 订阅广播事件
        $event_name = '广播';
        // 收到广播事件后向当前进程内所有客户端连接发送广播数据
        Client::on($event_name, function($event_data)use($worker){
            $message = $event_data;
            foreach($worker->connections as $connection)
            {
                $connection->send($message);
            }
        });
    }
}

文件3:http 服务端 启动Channel客户端 路径:application/push/controller/Workerserver.php

<?php

namespace app\push\controller;

use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;

/**
 * 基于Worker的多进程(分布式集群)推送系统 
 * 可以实现服务端后台任务向用户推送数据
 * 后台用户定期执行某操作等
 * 实现双向通讯
 * -------------------------------特别说明---------------------------------
 * Channel相关文件:Client.php和.Server.php,安装在tp根目录:extend/Channel下
 * 
 * 用来处理http请求,向任意客户端推送数据,需要传workerID和connectionID
 * 
 * 依赖WorkMan
 */
class Workerserver extends Server
{
    protected $socket        = 'http://api.3-gd.com:2887';
    protected $processes     = 1;
    protected $name          = "publisher"; //进程名称

    protected $channelServer = false; //是否开启Channel服务,默认false,不开启服务
    protected $channelHost   = 'api.3-gd.com';  //是否开启Channel服务,默认false,不开启服务
    protected $channelPort   = 2207;  //是否开启Channel服务,默认false,不开启服务

    // 用来处理http请求,向任意客户端推送数据,需要传workerID和connectionID
    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {
        //连接Channel服务
        // Channel客户端连接到Channel服务端
        Client::connect( $this->channelHost, $this->channelPort);
    }

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        $connection->send('ok');
        if(empty($_GET)) return;
        // 是向某个worker进程中某个连接推送数据
        if(isset($_GET) && isset($_GET))
        {
            $event_name = $_GET;
            $to_connection_id = $_GET;
            $content = $_GET;
            Client::publish($event_name, array(
               'to_connection_id' => $to_connection_id,
               'content'          => $content
            ));
        }
        // 是全局广播数据
        else
        {
            $event_name = '广播';
            $content = $_GET;
            Client::publish($event_name, array(
               'content'          => $content
            ));
        }
    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }
}

求各位大神指点一下,谢谢

6674 2 0
2个回答

walkor 打赏

error package. package_length=1195725856 是有程序连错端口了,客户端协议与服务端协议不一致,解析出来的包长为1195725856。

手册中demo是ok的,你的具体哪里连错了得自己看了,代码太多了。

  • 暂无评论
YangWJ

谢谢,我再查查。

  • 暂无评论
年代过于久远,无法发表回答
×
🔝