关于webman启动流程的简单分析

jeyfang

一、写在前面

21年的时候有机会第一次接触了webman,初次接触这种模式的框架时,感觉还是挺新颖的。目前已经使用了1年多,感觉挺不错的。之前在论坛看到了第三方项目中对PHPForker的介绍,于是有机会学习了一下,并重新回过头来看webman的启动流程。

二、说明

以下流程图是个人理解,有不对的地方请指出来,我将修正它。
同时声明下该流程的几个点

  1. 该流程核心关注接收并响应请求的流程,针对定时器部分暂未列出
  2. 默认处理流程涉及的系统为Linux
  3. 涉及信号的部分未列出
  4. 监听类型为tcp

三、流程图

如果图片查看不完整的话,看这里img

webman启动流程图

四、简单的server服务

<?php
declare(strict_types=1);

namespace Stream;

class Timer
{
    /**
     * @var SplPriorityQueue
     */
    private static SplPriorityQueue $queue;

    public function __construct()
    {
        // 初始化优先队列
        self::$queue = new SplPriorityQueue();
        // 定义extra
        self::$queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
    }

    public function init(): void
    {
        // 注册信号
        pcntl_signal(SIGALRM, [$this, 'handler'], false);
    }

    /**
     * @param callable $func 回调函数
     * @param float $interval
     * @return void
     */
    public function addTimer(callable $func, float $interval): void
    {
        // 如果不存在任务,则创建一个信号
        if (self::$queue->count() === 0) {
            pcntl_alarm(1);
        }
        $now = hrtime(true) / 1e-9;
        $nextRunTime = $now + $interval;
        self::$queue->insert(
            [
                'interval' => $interval,
                'func' => $func
            ],
            -$nextRunTime
        );
        $count = self::$queue->count();
        printf('queue count:%d' . PHP_EOL, $count);
    }

    public function handler(): void
    {
        pcntl_alarm(1);
        $this->tick();
    }

    public function tick(): void
    {
        $count = self::$queue->count();
        $now = hrtime(true) / 1e-9;
        while ($count--) {
            $data = self::$queue->top();
            $runTime = -$data['priority'];
            if ($runTime <= $now) {
                self::$queue->extract();
                call_user_func($data['data']['func']);
                $this->addTimer($data['data']['func'], $data['data']['interval']);
            }
        }
    }

}

class Server
{
    private string $server = 'tcp://127.0.0.1:5501';

    public function listenSelect(): void
    {
        $mainSocket = stream_socket_server($this->server, $errorCode, $errorMsg);

        stream_set_blocking($mainSocket, false);

        $read = [];
        $write = $except = null;

        $read[$this->server] = $mainSocket;

        while (true) {
            // 调用等待信号的处理器,Timer部分
            pcntl_signal_dispatch();

            $tmpRead = $read;
            $tmpWrite = $write;

            try {
                $select = stream_select($tmpRead, $tmpWrite, $except, 1, 0);
            } catch (\Throwable $e) {
                printf($e->getMessage() . PHP_EOL);
                continue;
            }

            if ($select === false) {
                continue;
            }

            foreach ($tmpRead as $sid => $socket) {
                if ($socket === $mainSocket) {
                    // 说明有新的链接进入
                    $newSocket = stream_socket_accept($socket, 0, $newSocketPeer);
                    if ($newSocket === false) {
                        print '接受connection失败:' . $newSocketPeer . PHP_EOL;
                        continue;
                    }
                    $socketAddress = 'tcp://' . stream_socket_get_name($newSocket, true);
                    $read[$socketAddress] = $newSocket;
                    print '接受connection成功:' . $newSocketPeer . PHP_EOL;
                }
                else {
                    // 从客户端读取数据,如何确定是当前这个client发来的数据?
                    // 因为select拿到数据,说明一定是有新的数据被读到(返回的read是有新数据到达的socket)
                    $msg = fread($socket, 65535);
                    if ($msg === '' || $msg === false) {
                        // 移除read
                        foreach ($read as $k => $v) {
                            if ($socket === $v) {
                                unset($read[$k]);
                            }
                        }
                        fclose($socket);
                        print '关闭connection:' . $sid . PHP_EOL;
                    } else {
                        // 打印信息,并写回
                        print '收到connection消息:' . $msg . PHP_EOL;
                        if (in_array($socket, $read, true)) {
                            fwrite($socket, '已收到信息,' . date('H:i:s') . PHP_EOL);
                        }
                    }
                }
            }

//            usleep(100000);
        }

    }
}

$timer = new Timer();
$timer->addTimer(
    static function() {
        $time = time();
        printf('this is timer: hello world: %d' . PHP_EOL, $time);
    },
    1
);

$timer->init();
(new Server())->listenSelect();

四、中途遇到的几个问题

4.1、关于stream_select一直返回

起初在本地测试的过程中,针对stream_select(&$read, &$write, &$except, $tv_sec, $tv_usec),我学着的PHPForker的做法,将新连接进来的socket不仅放入$read中,同时也放入$write中,这样一旦启动之后新连接进入后,原本的stream_select应该阻塞直到超时的,却并没有被阻塞。通过debug调试发现,这时每次select都有write返回,也就是该新加进去的socket。

这里起初,我针对stream_select的什么时候返回的理解,除了超时场景下返回外,另外的就是监听的$read、$write、$except的文件描述符有新变化才返回。所以当第一次将把主动监听的socket放进到read时(这里后面统一称为mainSocket),如果此时有客户端连接进来,那么下一次在select监听的时候,就会发现read中的mainSocket有就绪的状态,所以这时我们将受理连接,也就是通过stream_socket_accept去接收,得到一个客户端的socket。如果此时将该socket不仅放入$read去监听,同时也放到$write去监听,那接下来每次select的时候,都会立即返回。为什么呢?

后面我翻到了有其他人也有同样的疑惑https://www.workerman.net/q/1307,看到这个我才大概明白。前面放进去的$write一直处于可写状态,所以每次select都能拿到。

4.2、理解Linux下的select、poll、epoll模型

这里找到了几篇解说挺好的文章,可以参考下,它有一个系列PHP socket初探,PHP Socket初探—-先从一个简单的socket服务器开始

五、推荐文章

1、php socket通信中stream_select方法的理解
2、stream_select ($read, $write, $except, $timeout ); 函数问题
3、PHP Socket初探—-先从一个简单的socket服务器开始

原文见这里

3583 4 14
4个评论

智佳思远

很少有这么底层的好文了

  • 暂无评论
liziyu

收藏了,感谢分享@!

  • 暂无评论
jeyfang

有理解不对的地方,请大家指正

  • 暂无评论
27025011

"PHP Socket初探—-先从一个简单的socket服务器开始":有没完整的php代码?

  • admin 2023-07-04
    <?php
    
    class SocketServer
    {
    
        public $socket;
    
        public $all_sockets = [];
    
        public function __construct(string $address)
        {
            //创建监听socket服务 资源句柄 ,,resource|false
            $this->socket = \stream_socket_server($address, $php_errorcode, $php_errormsg);
    
            if ($this->socket == false) {
                throw new \Exception('创建失败!', $php_errorcode, $php_errormsg);
            }
    
            \stream_set_blocking($this->socket, false);
    
            //当前初始化的第一个socket服务 资源句柄放入 资源池
            $this->all_sockets[intval($this->socket)] = $this->socket;
            echo ('=== 服务器启动|' . $address.PHP_EOL);
        }
    
        public function run()
        {
            //死循环
            while (true) {
                $write=$except=null;
                $allSocket=$this->all_sockets;
                \stream_select($allSocket, $write, $except, 60);
    
                foreach ($allSocket as $index => $socket) {
                    //如果 streamsocket 服务句柄 == 当前socket资源池的id,说明有新连接
                    if ($this->socket === $socket) {
                        // 接受由 stream_socket_server() 创建的套接字连接 timeout/覆盖默认的套接字接受的超时时限。输入的时间需以秒为单位
                        // resource|false
                        $new_conn_socket = \stream_socket_accept($this->socket);
                        if ($new_conn_socket == false) {
                            //可能是建立连接异常
                            continue;
                        }
                        $this->onConn($new_conn_socket);
                        $this->all_sockets[intval($new_conn_socket)] = $new_conn_socket;
                        echo '=== 新连接建立'.(int)($new_conn_socket).PHP_EOL;
                    } else {
                        //如果不是新的连接,则看看有无数据过来,读取长度 65536
                        $buff = fread($socket, 0xFFFF);
                        // 不能是 == ''
                        if ($buff === '' || $buff === false) {
                            //客户端已断开
                            echo '=== 断开连接'.intval($socket).PHP_EOL;
                            $this->onClose($socket);
                            unset($this->all_sockets[intval($socket)]);
                            fclose($socket);
                            continue;
                        }
                        //处理发来的数据
                        $this->onMessage($socket, $buff);
                    }
                }
            }
        }
    
        public function onConn(mixed $socket)
        {
    
        }
    
        public function onClose(mixed $socket)
        {
        }
    
        public function onMessage(mixed $socket, $buff)
        {
            $body = 'hello word';
            $header = [
                'HTTP/1.1 200 OK',
                'Connection: keep-alive',
                'Niubi: test123',
                'Content-length:' . strlen($body)
            ];
            $header_string = \implode(chr(0x0D) . chr(0x0A), $header);
    
            $data = $header_string . chr(0x0D) . chr(0x0A) . chr(0x0D) . chr(0x0A) . $body;
    
    //        echo '用户数据:'.$buff.PHP_EOL;
            fwrite($socket, $data);
    
            unset($this->all_sockets[intval($socket)]);
            fclose($socket);
        }
    
    }
    
    $a = new SocketServer('tcp://0.0.0.0:88');
    $a->run();;
    
年代过于久远,无法发表评论

jeyfang

414
积分
0
获赞数
0
粉丝数
2021-05-12 加入
×
🔝