按给的文档里的 $loop 使用 Worker::getEventLoop(); 获取,然后使用 Client 实例的 end() 后,没有任何反应,但改为使用 React\EventLoop\Factory::create() 获取到的 $loop 再按 react 的文档里调用 $loop->run() 才正常,on('response') 和 on('error') 才能正确收到数据。 另外为什么不改进下 AsyncTcpConnection 直接支持 http 协议呢?
哦,我知道原因了,因为发请求是在收到 beanstalk 队列后请求的,发送请求后 beanstalkd 立即 reserve 了等待新队列消息,这时候进程阻塞了,所以不执行 http 的请求了。。。看来这个 reserve 也得使用异步的方式来做才行。
我已经基于 AsyncTcpConnection 实现了一个全异步的协程式 Beanstalkd 客户端,目前测试可以正常运行,在 reserve 阶段不会阻塞 Worker 进程,等完善之后发布源码到 Github。 大概使用示例(以下代码在 onWorkerStart 中):
$client = new \MyTestBeanstalkdClient($config, $config); $client->start(function() use ($client) { $ret = yield $client->connect(); echo 'after connect'; var_dump($ret); $ret = yield $client->listTubes(); echo 'after listtubes'; var_dump($ret); while (true) { $ret = yield $client->reserve(); $lines = explode("\r\n", $ret); list($status, $jobId, $length) = explode(" ", $lines); array_shift($lines); $data = join('', $lines); echo "get job: $jobId\n"; echo "get data: $data"; $ret = yield $client->delete($jobId); echo $ret; } });
之前的用法,是普通的 socket 连接,当 reserve 时,Worker 进程会阻塞,导致其它异步请求无法发送,而且使用 status 命令也无法检查进程状态,显示为 busy。 改用这种异步协程方式后,reserve 状态下,工作进程就完全是 idle 状态,其它异步请求也完全没有任何影响,并且是全同步式写法,不会难理解。
牛逼
这个目前暂停了,因为用纯原生的 PHP 代码去实现协程碰到了各种问题,比如协程嵌套,协程之间的各种协调,处理起来非常麻烦,上面演示的是单层协程,但考虑到实现 start() 内部还可能出现其它程序的协程、客户端内部也可能出现断线重连等各种需要考虑协程配合的场景,我还没想到较好的办法来做这个,所以目前暂停了。
哦,我知道原因了,因为发请求是在收到 beanstalk 队列后请求的,发送请求后 beanstalkd 立即 reserve 了等待新队列消息,这时候进程阻塞了,所以不执行 http 的请求了。。。看来这个 reserve 也得使用异步的方式来做才行。
我已经基于 AsyncTcpConnection 实现了一个全异步的协程式 Beanstalkd 客户端,目前测试可以正常运行,在 reserve 阶段不会阻塞 Worker 进程,等完善之后发布源码到 Github。
大概使用示例(以下代码在 onWorkerStart 中):
之前的用法,是普通的 socket 连接,当 reserve 时,Worker 进程会阻塞,导致其它异步请求无法发送,而且使用 status 命令也无法检查进程状态,显示为 busy。
改用这种异步协程方式后,reserve 状态下,工作进程就完全是 idle 状态,其它异步请求也完全没有任何影响,并且是全同步式写法,不会难理解。
牛逼
这个目前暂停了,因为用纯原生的 PHP 代码去实现协程碰到了各种问题,比如协程嵌套,协程之间的各种协调,处理起来非常麻烦,上面演示的是单层协程,但考虑到实现 start() 内部还可能出现其它程序的协程、客户端内部也可能出现断线重连等各种需要考虑协程配合的场景,我还没想到较好的办法来做这个,所以目前暂停了。