如何使用workerman实现多进程主动轮询外部服务器?

typing
   首先感谢walkor大大创造了一个如此优秀的php框架. 其中的多进程优势、异步IO、定时器和libevent事件轮询库、支持高并发等特性,让我眼前为之一亮.

   我看了手册和demo,在做服务器方面已经提供了很好地实例和说明,可我目前遇到一个需求:将php服务器模拟客户端对外部服务器进行主动轮询.

如示意图:
[attach]67[/attach]

1.创建一个主进程(守护进程),一旦启动长时间运行在后台,即使关掉浏览器页面.

主线程定时查询数据库(MySQL),一旦发现有符合条件的URL(可能多条),即创建对应的数量的子进程.
子进程也需要长时间存在,定时轮询URL对应的服务器取回数据.
子进程一旦取回所需要的数据,将结果保存到数据库,自我结束(或被主进程关闭).

目前的想法是在worker类里增加一个轮询方法,但是感觉这样破坏了框架结构.
难点:
1.如何创建子线程?

如何实现定时轮询?
3.子线程如何自我关闭?

ps:
楼主从事iOS客户端开发,刚接触PHP几天,正在努力学习中,无奈项目期限太紧,苦思无果,前来宝地求助,希望能帮忙提供思路或给出简单demo.

再次感谢walkor大大和热心的朋友们.

8234 5 0
5个回答

walkor 打赏

首先赞一个,提问的非常有条理。

说说我的看法,
1、不能每个url一个进程,如果url数量控制不好,会造成创建太多进程导致服务器内存资源耗尽
2、子进程不必自我结束,进程能复用就尽量复用
3、业务比较简单,可以只开一个进程,并使用IO复用(workerman的异步IO或者curl_multi等),性能比多进程多线程更高

下面是一个定时器例子,只使用一个worker进程,定时查询数据库获得url,并异步批量请求这些url。

文件名 :Applications/HttpPoll/start.php

<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;

$worker = new Worker();

$worker->onWorkerStart = function(){
    Timer::add(5, 'http_poll');
};

function http_poll()
{
    $url_array = get_url_array_from_db();
    // 这里使用的是workerman的异步IO AsyncTcpConnection。也可以使用curl,那样对http支持更好一些
    foreach($url_array as $url)
    {
         // 建立异步链接
         $connection = new AsyncTcpConnection('tcp://'.$url.':80');
         // $connection 是个对象,可以把一些数据以属性的形式存储进去,使用的时候再读取。这里把url存起来
         $connection->url = $url;
         // 链接失败时的处理
         $connection->onError = function($connection, $err_no, $err_msg)
         {
            echo "\n!!!!!!!!!!!!{$connection->url}!!!!!!!!!!!!!!!\n","fail $err_no $err_msg";
         };
         // 一旦链接上服务端,则发起http请求
         $connection->onConnect = function($connection)
         {
            $connection->send("GET / HTTP/1.1\r\nConnection: close\r\nHost: {$connection->url}\r\nAccept: text/html\r\nUser-Agent: Mozilla/5.0\r\n\r\n");
         };
         // 一旦收到数据打印。注意这里AsyncTcpConnection指定的是tcp,没有处理协议,这里接收的数据是分段的http协议数据
         $connection->onMessage = function($connection, $http_buffer)
         {
            echo "\n----------$connection->url---------\n{$http_buffer}\n";
            $connection->close();
         };
    }
}

// 从数据库中读取url
function get_url_array_from_db()
{
    return array('www.baidu.com', 'www.163.com', 'www.sina.com');
}

注意上面 http_poll 函数中使用的是workerman自带的异步IO,AsycTcpConnection,由于我没有实现客户端的http协议(实现方法参见手册协议订制部分),这里仅使用了tcp,没有分包,可能会导致onMessage中的$http_buffer是分段发来的。

http_poll中可用curl_multi_*函数替换workerman的异步IO,也可以批量获取url,并且对http协议支持的更好。使用方法及例子见: http://php.net/manual/en/function.curl-multi-exec.php

  • 暂无评论
typing

感谢walkor大大的耐心回复.

可是实际的需求比示意图要复杂:
1.从数据库中取出的不是一个简单地URL,而是一个taskId,要根据taskId去读取另一张表,要依次定时轮询表中的URL.

Task A 中的URL_A_1 与 Task B中的URL_B_1 可能需要同时发起(比如在晚上9点整同时请求n台服务器的数据).
该工程会在多台服务器上分布式部署,实现集群效果.

所以如果"可以只开一个进程,并使用IO复用"的话,虽然也可以实现,但是我担心在读写数据库时候会有阻塞,导致任务是依次执行的,而达不到并发的要求.另外复用一个进程,也可能会增加任务调度的逻辑复杂度.

如果我控制好worker进程的数量,解决服务器内存资源耗尽的问题后(如果2G内存,每个worker进程占用5m,那么我控制最多创建 400个进程),是否还有其它的隐患?

我在您的demo基础上改了一下,您看写得对不对?

<?php
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;

$main_worker = new Worker();

$main_worker->onWorkerStart = function(){
    Timer::add(5, 'sub_worker_create_poll');
};

$sub_worker_array = array();

function sub_worker_create_poll(){
    $task_array = get_taskId_array_from_db();
    foreach($task_array as $task_id)
    {
        if(empty($sub_worker_array)){
            $sub_worker = new Worker();
            $sub_worker->onWorkerStart = function(){
                Timer::add(1, 'http_poll');
            };

            $sub_worker_array = $sub_worker;
        }else{
            echo "任务".$task_id."已创建进程";
        }
    }
}

function http_poll()
{
    $task_id = ? //如何知道当前的函数是被哪个worker调用的? 然后取到$sub_worker_array对应的task_id?
    $url_array = get_url_array_from_db_by_taskId($task_id);
// 这里使用的是workerman的异步IO AsyncTcpConnection。也可以使用curl,那样对http支持更好一些
    foreach($url_array as $url)
    {
        // 建立异步链接
        $connection = new AsyncTcpConnection('tcp://'.$url.':80');
        // $connection 是个对象,可以把一些数据以属性的形式存储进去,使用的时候再读取。这里把url存起来
        $connection->url = $url;
        // 链接失败时的处理
        $connection->onError = function($connection, $err_no, $err_msg)
        {
            echo "\n!!!!!!!!!!!!{$connection->url}!!!!!!!!!!!!!!!\n","fail $err_no $err_msg";
        };
        // 一旦链接上服务端,则发起http请求
        $connection->onConnect = function($connection)
        {
            $connection->send("GET / HTTP/1.1\r\nConnection: close\r\nHost: {$connection->url}\r\nAccept: text/html\r\nUser-Agent: Mozilla/5.0\r\n\r\n");
        };
        // 一旦收到数据打印。注意这里AsyncTcpConnection指定的是tcp,没有处理协议,这里接收的数据是分段的http协议数据
        $connection->onMessage = function($connection, $http_buffer)
        {
            echo "\n----------$connection->url---------\n{$http_buffer}\n";
            $connection->close();
        };
    }
}

// 从数据库中读取task_id
function get_taskId_array_from_db()
{
    return array('task_id_1', 'task_id_2', 'task_id_3');
}

// 根据task_id从数据库中读取对应的URLs
function get_url_array_from_db_by_taskId($task_id)
{
    return array('www.baidu.com?param='.$task_id, 'www.163.com?param='.$task_id, 'www.sina.com?param='.$task_id);
}
  • 暂无评论
walkor 打赏

@Typing
不能在子进程中再创建子进程,进程越来越多会导致不可控

一个简单的方法是创建多个worker进程,每个进程定时从数据库中领取一个taskid去执行,有点想消息队列。这样也可以方便分布式部署。代码也最简单

  • 暂无评论
typing

@walkor
但是数据库的taskid数量是不确定的,会动态变化,不知道该创建多少个?
是不是还得有个进程来管理这些子进程?

  • 暂无评论
walkor 打赏

使用workerman,new 一个Worker,设置成固定的进程数(count属性),然后去轮询数据库即可。
进程管理根本不用担心,workerman会自己管理

taskid数量变化也没啥问题,比如有1000个taskid,100个进程轮询处理,简单的任务可能1秒就全部做完了

  • 暂无评论
年代过于久远,无法发表回答
×
🔝