gateway-worker运行逻辑分析

sunwenzheng

https://www.workerman.net/doc/gateway-worker/principle.html

  1. Gateway 注册到Register上
  2. BusinessWorker注册到Register上
  3. Register 将 Gateway 的内部通讯地址列表广播给 BusinessWorker
  4. BusinessWorker依次连接所有Gateway
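  5. Gateway 将连接自己的 BusinessWorker 存到 _workerConnections 数组里面;当客户端有数据上来时,通过路由函数(默认 routerBind)随机选择一个 BusinessWorker 并与该客户端连接绑定,之后该客户端的数据都转发给同一个 BusinessWorker 处理(除非该 BusinessWorker 的连接已不存在,此时会重新随机选择)。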
  6. BusinessWorker 使用 属性 eventHandler 对应类的静态方法来处理收到数据的业务逻辑

下面是进一步的代码片段说明

Register

    /**
     * 设置消息回调
     *
     * @param \Workerman\Connection\ConnectionInterface $connection
     * @param string                                    $buffer
     * @return void
     */
    public function onMessage($connection, $buffer)
    {
        // Entry point for every message a peer sends to the Register: the message is
        // validated, the peer is recorded as a gateway or a worker, and the gateway
        // address list is pushed out through broadcastAddresses().

        // Delete the timer whose id was stored on the connection as timeout_timerid.
        Timer::del($connection->timeout_timerid);

        // json_decode() yields null on malformed input; empty() below copes with that
        // without raising a notice.
        $data = json_decode($buffer, true);
        if (empty($data['event'])) {
            $error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://doc2.workerman.net/register-auth-timeout.html";
            Worker::log($error);
            return $connection->close($error);
        }
        $event      = $data['event'];
        $secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';

        switch ($event) {

            // A gateway announces its internal address.
            case 'gateway_connect':
                if (empty($data['address'])) {
                    // Logged (not just echoed) so this rejection is recorded like the others.
                    Worker::log("Register: address not found in gateway_connect request. IP: ".$connection->getRemoteIp());
                    return $connection->close();
                }
                if ($secret_key !== $this->secretKey) {
                    // Neither key is written to the log: the configured key is a credential.
                    Worker::log("Register: Key does not match. Event:$event IP: ".$connection->getRemoteIp());
                    return $connection->close();
                }
                // Remember the address under the connection id, then notify every worker.
                $this->_gatewayConnections[$connection->id] = $data['address'];
                $this->broadcastAddresses();
                break;

            // A BusinessWorker registers itself.
            case 'worker_connect':
                if ($secret_key !== $this->secretKey) {
                    // Neither key is written to the log: the configured key is a credential.
                    Worker::log("Register: Key does not match. Event:$event IP: ".$connection->getRemoteIp());
                    return $connection->close();
                }
                // Keep the connection and send the current address list to this worker only.
                $this->_workerConnections[$connection->id] = $connection;
                $this->broadcastAddresses($connection);
                break;

            // Heartbeat: nothing to do.
            case 'ping':
                break;

            default:
                Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://doc2.workerman.net/register-auth-timeout.html");
                $connection->close();
        }
    }
 /**
     * Send the list of gateway internal addresses to BusinessWorker connections.
     *
     * Without an argument the list is sent to every connection stored in
     * $this->_workerConnections; when a connection is passed, only that
     * connection receives it.
     *
     * @param \Workerman\Connection\ConnectionInterface|null $connection
     * @return void
     */
    public function broadcastAddresses($connection = null)
    {
        // array_unique() preserves keys, so re-index AFTER de-duplicating. Otherwise a
        // removed duplicate leaves a gap in the keys and json_encode() emits a JSON
        // object ({"0":..,"2":..}) instead of a JSON list.
        $addresses = array_values(array_unique($this->_gatewayConnections));

        $data   = array(
            'event'     => 'broadcast_addresses',
            'addresses' => $addresses,
        );
        $buffer = json_encode($data);

        // Targeted send: only the given connection is updated.
        if ($connection) {
            $connection->send($buffer);
            return;
        }

        // Broadcast to every registered worker connection.
        foreach ($this->_workerConnections as $con) {
            $con->send($buffer);
        }
    }

Gateway :

   /**
     * Constructor.
     *
     * @param string $socket_name    Socket name handed to the parent constructor; the text
     *                               after its last ':' is stored as the gateway port.
     * @param array  $context_option Passed through unchanged to the parent constructor.
     */
    public function __construct($socket_name, $context_option = array())
    {
        parent::__construct($socket_name, $context_option);

        // strrchr() returns the tail starting at the last ':'; drop that ':' itself.
        $colon_and_port     = strrchr($socket_name, ':');
        $this->_gatewayPort = substr($colon_and_port, 1);

        // Default routing callback: the static routerBind method.
        $this->router = array("\\GatewayWorker\\Gateway", 'routerBind');

        // Autoload root: directory of the file recorded in the first backtrace frame.
        $frames                  = debug_backtrace();
        $first_frame             = $frames[0];
        $this->_autoloadRootPath = dirname($first_frame['file']);
    }
    /**
     * Pick the worker connection for a client and keep the client bound to it.
     *
     * The chosen key is remembered on the client connection as
     * businessworker_address; it is re-drawn at random whenever it is unset or
     * no longer present in $worker_connections.
     *
     * @param array         $worker_connections Worker connections indexed by key
     * @param TcpConnection $client_connection  Client connection being routed
     * @param int           $cmd                Unused by this router
     * @param mixed         $buffer             Unused by this router
     * @return TcpConnection The entry of $worker_connections bound to the client
     */
    public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
    {
        // Assume a fresh binding is needed unless a still-valid one is found.
        $needs_binding = true;
        if (isset($client_connection->businessworker_address)) {
            $needs_binding = !isset($worker_connections[$client_connection->businessworker_address]);
        }

        if ($needs_binding) {
            // array_rand() returns a random KEY of the array.
            $client_connection->businessworker_address = array_rand($worker_connections);
        }

        return $worker_connections[$client_connection->businessworker_address];
    }
 /**
     * Forward data to a worker process.
     *
     * @param int           $cmd        Command stored in the packet's 'cmd' field
     * @param TcpConnection $connection Client connection the data belongs to
     * @param mixed         $body       Payload stored in the packet's 'body' field
     * @return bool true when the packet was handed to a worker connection, false otherwise
     */
    protected function sendToWorker($cmd, $connection, $body = '')
    {
        // Start from the connection's gateway header and fill in the per-message fields.
        $packet             = $connection->gatewayHeader;
        $packet['cmd']      = $cmd;
        $packet['body']     = $body;
        $packet['ext_data'] = $connection->session;

        // No worker available.
        if (!$this->_workerConnections) {
            // During the first 1-2 seconds after the gateway starts this failure is normal,
            // because the connections to the workers are not established yet; it is
            // therefore not logged and the client connection is simply closed.
            $grace_seconds = 2;
            if (time() - $this->_startTime >= $grace_seconds) {
                static::log('SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://doc2.workerman.net/send-buffer-to-worker-fail.html');
            }
            $connection->destroy();
            return false;
        }

        // Ask the router callback to choose the worker connection that gets the request.
        /** @var TcpConnection $target */
        $target = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
        if (false !== $target->send($packet)) {
            return true;
        }

        static::log("SendBufferToWorker fail. May be the send buffer are overflow. See http://doc2.workerman.net/send-buffer-overflow.html");
        return false;
    }
 /**
     * Handle a packet sent by a worker.
     *
     * Connections that are not yet authorized may only send CMD_WORKER_CONNECT or
     * CMD_GATEWAY_CLIENT_CONNECT; anything else closes the connection.
     *
     * @param TcpConnection $connection Connection the packet arrived on
     * @param mixed         $data       Decoded packet; 'cmd' and 'body' are read here
     * @throws \Exception
     *
     * @return void
     */
    public function onWorkerMessage($connection, $data)
    {
        $cmd = $data['cmd'];
        if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
            self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
            $connection->close();
            return;
        }
        switch ($cmd) {
            // A BusinessWorker connects to this Gateway.
            case GatewayProtocol::CMD_WORKER_CONNECT:
                $worker_info = json_decode($data['body'], true);
                // A malformed body or a missing key is treated as null so that it simply
                // fails the comparison instead of raising an undefined-index notice.
                $worker_secret = isset($worker_info['secret_key']) ? $worker_info['secret_key'] : null;
                if ($worker_secret !== $this->secretKey) {
                    // The old message called var_export() without its second argument, which
                    // printed the secret to stdout and compared the gateway key with itself.
                    // Log who was rejected instead of writing any key to the log.
                    self::log("Gateway: Worker key does not match. Worker " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
                    $connection->close();
                    return;
                }
                $key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
                // Two workers coming from the same IP must not share a worker_key.
                if (isset($this->_workerConnections[$key])) {
                    self::log("Gateway: Worker->name conflict. Key:{$key}");
                    $connection->close();
                    return;
                }
                // Register the worker connection under its key and mark it authorized.
                $connection->key = $key;
                $this->_workerConnections[$key] = $connection;
                $connection->authorized = true;
                if ($this->onBusinessWorkerConnected) {
                    call_user_func($this->onBusinessWorkerConnected, $connection);
                }
                return;
            // .......... the remaining cases are omitted in this excerpt ..........
            default :
                $err_msg = "gateway inner pack err cmd=$cmd";
                echo $err_msg;
        }
    }
    /**
     * Callback run when the Gateway process starts.
     *
     * @return void
     */
    public function onWorkerStart()
    {
        // Internal communication port: start port offset by this process's id.
        $this->lanPort = $this->startPort + $this->id;

        // Heartbeat timer, only when a ping interval is configured. The timer runs at
        // half the interval when pingNotResponseLimit is positive.
        if ($this->pingInterval > 0) {
            if ($this->pingNotResponseLimit > 0) {
                $tick = $this->pingInterval / 2;
            } else {
                $tick = $this->pingInterval;
            }
            Timer::add($tick, array($this, 'ping'));
        }

        // When lanIp is not 127.0.0.1, periodically run pingBusinessWorker as well.
        if ('127.0.0.1' !== $this->lanIp) {
            Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
        }

        if (!class_exists('\Protocols\GatewayProtocol')) {
            class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
        }

        // A lanIp outside the private and reserved ranges is replaced by 0.0.0.0 for
        // listening; otherwise lanIp itself is used.
        $is_public_ip = filter_var($this->lanIp, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE);
        if ($is_public_ip) {
            $bind_ip = '0.0.0.0';
        } else {
            $bind_ip = $this->lanIp;
        }

        // Inner listener that accepts worker connections and receives their data.
        $this->_innerTcpWorker = new Worker("GatewayProtocol://{$bind_ip}:{$this->lanPort}");
        $inner_worker          = $this->_innerTcpWorker;
        $inner_worker->reusePort = false;
        $inner_worker->listen();
        $inner_worker->name = 'GatewayInnerWorker';

        // Reset the autoload root directory.
        Autoloader::setRootPath($this->_autoloadRootPath);

        // Callbacks of the inner listener.
        $inner_worker->onMessage = array($this, 'onWorkerMessage');
        $inner_worker->onConnect = array($this, 'onWorkerConnect');
        $inner_worker->onClose   = array($this, 'onWorkerClose');

        // Publish the inner address so workers can connect to it and keep a long-lived
        // TCP connection with this gateway.
        $this->registerAddress();

        if ($this->_onWorkerStart) {
            call_user_func($this->_onWorkerStart, $this);
        }
    }
 /**
     * Announce this Gateway's internal address (lanIp:lanPort) to every Register.
     *
     * One asynchronous connection is opened per entry of $this->registerAddress.
     * Each connection sends a gateway_connect message once connected and
     * reconnects after one second whenever it is closed.
     *
     * @return void
     */
    public function registerAddress()
    {
        $address = $this->lanIp . ':' . $this->lanPort;

        // Build the message with json_encode() rather than by string concatenation, so a
        // secret key containing quotes, backslashes or newlines cannot corrupt the JSON.
        // The key is cast to string because the concatenated form always sent a string.
        $connect_payload = json_encode(array(
            'event'      => 'gateway_connect',
            'address'    => $address,
            'secret_key' => (string) $this->secretKey,
        ));

        // The cast lets a single address configured as a plain string work as well.
        foreach ((array) $this->registerAddress as $register_address) {
            $register_connection = new AsyncTcpConnection("text://{$register_address}");
            $register_connection->onConnect = function($register_connection) use ($connect_payload, $register_address){
                $register_connection->send($connect_payload);
                // When the register address does not start with 127.0.0.1, keep the
                // connection alive with periodic ping messages.
                if (strpos($register_address, '127.0.0.1') !== 0) {
                    $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
                        $register_connection->send('{"event":"ping"}');
                    });
                }
            };
            $register_connection->onClose = function ($register_connection) {
                // Stop the heartbeat of the closed connection before reconnecting.
                if(!empty($register_connection->ping_timer)) {
                    Timer::del($register_connection->ping_timer);
                }
                $register_connection->reconnect(1);
            };
            $register_connection->connect();
        }
    }

BusinessWorker

/**
     * Connect to every Register listed in $this->registerAddress.
     *
     * Each connection sends a worker_connect message once connected, passes
     * incoming messages to onRegisterConnectionMessage() and reconnects after
     * one second whenever it is closed.
     *
     * @return void
     */
    public function connectToRegister()
    {
        // Build the message with json_encode() rather than by string concatenation, so a
        // secret key containing quotes, backslashes or newlines cannot corrupt the JSON.
        // The key is cast to string because the concatenated form always sent a string.
        $connect_payload = json_encode(array(
            'event'      => 'worker_connect',
            'secret_key' => (string) $this->secretKey,
        ));

        foreach ($this->registerAddress as $register_address) {
            $register_connection = new AsyncTcpConnection("text://{$register_address}");
            // The connection is taken as the callback argument, like in onClose below.
            $register_connection->onConnect = function ($register_connection) use ($connect_payload, $register_address) {
                $register_connection->send($connect_payload);
                // When the register address does not start with 127.0.0.1, keep the
                // connection alive with periodic ping messages.
                if (strpos($register_address, '127.0.0.1') !== 0) {
                    $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
                        $register_connection->send('{"event":"ping"}');
                    });
                }
            };
            $register_connection->onClose = function ($register_connection) {
                // Stop the heartbeat of the closed connection before reconnecting.
                if(!empty($register_connection->ping_timer)) {
                    Timer::del($register_connection->ping_timer);
                }
                $register_connection->reconnect(1);
            };
            $register_connection->onMessage = array($this, 'onRegisterConnectionMessage');
            $register_connection->connect();
        }
    }
   /**
     * Handle a message sent by a Register.
     *
     * Only the broadcast_addresses event is understood: its address list replaces
     * $this->_gatewayAddresses and is then passed to checkGatewayConnections().
     *
     * @param mixed  $register_connection Connection the message arrived on (not used here)
     * @param string $data                JSON text of the message
     * @return void
     */
    public function onRegisterConnectionMessage($register_connection, $data)
    {
        $data = json_decode($data, true);
        if (!isset($data['event'])) {
            echo "Received bad data from Register\n";
            return;
        }
        $event = $data['event'];
        switch ($event) {
            case 'broadcast_addresses':
                // isset() first: a message without 'addresses' is rejected the same way,
                // but without raising an undefined-index notice/warning.
                if (!isset($data['addresses']) || !is_array($data['addresses'])) {
                    echo "Received bad data from Register. Addresses empty\n";
                    return;
                }
                $addresses               = $data['addresses'];
                $this->_gatewayAddresses = array();
                // Store the gateway list, indexed by address, before connecting.
                foreach ($addresses as $addr) {
                    $this->_gatewayAddresses[$addr] = $addr;
                }
                $this->checkGatewayConnections($addresses);
                break;
            default:
                echo "Receive bad event:$event from Register.\n";
        }
    }
    /**
     * Make sure a connection attempt is made for each listed gateway address.
     *
     * Addresses present in $this->_waitingConnectGatewayAddresses are skipped;
     * every other address is passed to tryToConnectGateway().
     *
     * @param array $addresses_list
     */
    public function checkGatewayConnections($addresses_list)
    {
        if (!empty($addresses_list)) {
            foreach ($addresses_list as $gateway_address) {
                if (isset($this->_waitingConnectGatewayAddresses[$gateway_address])) {
                    continue;
                }
                $this->tryToConnectGateway($gateway_address);
            }
        }
    }
    /**
     * Try to connect to a Gateway's internal communication address.
     *
     * A connection is only opened when the address is in $this->_gatewayAddresses
     * and is neither in $this->gatewayConnections nor in
     * $this->_connectingGatewayAddresses. In every case the address is removed
     * from $this->_waitingConnectGatewayAddresses at the end.
     *
     * @param string $addr
     */
    public function tryToConnectGateway($addr)
    {
        $has_connection = isset($this->gatewayConnections[$addr]);
        $is_connecting  = isset($this->_connectingGatewayAddresses[$addr]);
        $is_listed      = isset($this->_gatewayAddresses[$addr]);

        if (!$has_connection && !$is_connecting && $is_listed) {
            $gateway = new AsyncTcpConnection("GatewayProtocol://$addr");
            $gateway->remoteAddress = $addr;
            $gateway->onConnect     = array($this, 'onConnectGateway');
            $gateway->onMessage     = array($this, 'onGatewayMessage');
            $gateway->onClose       = array($this, 'onGatewayClose');
            $gateway->onError       = array($this, 'onGatewayError');

            // Use the configured buffer size; when it equals the TcpConnection default,
            // raise it to 50 MB.
            $gateway->maxSendBufferSize = $this->sendToGatewayBufferSize;
            if (TcpConnection::$defaultMaxSendBufferSize == $gateway->maxSendBufferSize) {
                $gateway->maxSendBufferSize = 50 * 1024 * 1024;
            }

            // First packet: CMD_WORKER_CONNECT carrying this worker's key and the secret key.
            $handshake         = GatewayProtocol::$empty;
            $handshake['cmd']  = GatewayProtocol::CMD_WORKER_CONNECT;
            $handshake['body'] = json_encode(array(
                'worker_key' =>"{$this->name}:{$this->id}",
                'secret_key' => $this->secretKey,
            ));

            // send() is called before connect(), exactly as in the original order.
            $gateway->send($handshake);
            $gateway->connect();
            $this->_connectingGatewayAddresses[$addr] = $addr;
        }

        unset($this->_waitingConnectGatewayAddresses[$addr]);
    }
  /**
     * Initialisation performed when the worker process starts.
     *
     * @return void
     */
    protected function onWorkerStart()
    {
        if (function_exists('opcache_reset')) {
            opcache_reset();
        }

        if (!class_exists('\Protocols\GatewayProtocol')) {
            class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
        }

        // Normalise a single register address into a one-element array, then connect.
        if (!is_array($this->registerAddress)) {
            $this->registerAddress = array($this->registerAddress);
        }
        $this->connectToRegister();

        \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
        \GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;

        // User-supplied start callback first, then the event handler's own onWorkerStart.
        if ($this->_onWorkerStart) {
            call_user_func($this->_onWorkerStart, $this);
        }

        $start_hook = $this->eventHandler . '::onWorkerStart';
        if (is_callable($start_hook)) {
            call_user_func($start_hook, $this);
        }

        if (function_exists('pcntl_signal')) {
            // Signal handler for business-logic timeouts.
            pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);
        } else {
            $this->processTimeout = 0;
        }

        // Wire up the business callbacks. They are static methods of the class named by
        // eventHandler, which may be a user-defined class.
        $handler_class = $this->eventHandler;

        $on_connect = $handler_class . '::onConnect';
        if (is_callable($on_connect)) {
            $this->_eventOnConnect = $on_connect;
        }

        $on_message = $handler_class . '::onMessage';
        if (is_callable($on_message)) {
            $this->_eventOnMessage = $on_message;
        } else {
            echo "Waring: {$handler_class}::onMessage is not callable\n";
        }

        $on_close = $handler_class . '::onClose';
        if (is_callable($on_close)) {
            $this->_eventOnClose = $on_close;
        }

        $on_ws_connect = $handler_class . '::onWebSocketConnect';
        if (is_callable($on_ws_connect)) {
            $this->_eventOnWebSocketConnect = $on_ws_connect;
        }

    }
1498 1 4
1个评论

Tinywan

感谢分享

  • 暂无评论
年代过于久远,无法发表评论

sunwenzheng

960
积分
0
获赞数
0
粉丝数
2022-08-02 加入
×
🔝