原文:https://github.com/swow/swow/issues/115
快速体验 swow:打开 https://github.com/dixyes/lwmbs/actions/workflows/linux.yml ,选择最新任务,下载 cli_static_8.1_musl_x86_64_xxxx,解压即可使用
无阻塞的定时器
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Events\Swow;
use Swow\Coroutine;
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/swow.php';
$task = new Worker();

// When the worker process starts, register a repeating timer. Every tick
// spawns a fresh coroutine, so the 10 second sleep inside one tick does not
// hold back the following ticks.
$task->onWorkerStart = function ($task) {
    // Tick period, in seconds.
    $intervalSeconds = 2;

    Timer::add($intervalSeconds, function () {
        Coroutine::run(static function () {
            // Builds "<coroutine id> <HH:MM:SS>" followed by a newline.
            $stamp = static function (): string {
                return Coroutine::getCurrent()->getId() . ' ' . date('H:i:s') . PHP_EOL;
            };

            echo $stamp();
            sleep(10);
            echo $stamp();
        });
    });
};

// Select the Swow-backed event loop, then start every worker.
Worker::$eventLoopClass = Swow::class;
Worker::runAll();
效果
root@be1fb440630a:/opt/www# php timer.php start
Workerman[index.php] start in DEBUG mode
----------------------------------------- WORKERMAN ------------------------------------------
Workerman version:4.1.5 PHP version:8.1.1 Event-Loop:Workerman\Events\Swow
------------------------------------------ WORKERS -------------------------------------------
proto user worker listen processes status
tcp root none none 1 [OK]
----------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
11 07:31:55
12 07:31:57
13 07:31:59
14 07:32:01
15 07:32:03
11 07:32:05
无阻塞的http server
<?php
use Workerman\Worker;
use Workerman\Events\Swow;
use Swow\Coroutine;
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/swow.php';
// #### http worker ####
$http_worker = new Worker('http://0.0.0.0:2345');

// Number of worker processes.
$http_worker->count = 4;

// Each received request is answered from its own coroutine, so the 3 second
// sleep of one request does not delay the others handled by the same process.
$http_worker->onMessage = function ($connection, $request) {
    // Accessors listed by the original example for $request:
    // post(), header(), cookie(), session(), uri(), path(), method().
    $respond = static function () use ($connection, $request): void {
        $startedAt = date('Y-m-d H:i:s');
        sleep(3);

        $workerId = $connection->worker->id;
        $coroutineId = Coroutine::getCurrent()->getId();
        $finishedAt = date('Y-m-d H:i:s');

        // Reply with "<worker id> - <coroutine id>, start: <time>, now: <time>".
        $connection->send($workerId . ' - ' . $coroutineId . ', start: ' . $startedAt . ', now: ' . $finishedAt);
    };

    Coroutine::run($respond);
};

// Select the Swow-backed event loop, then start every worker.
Worker::$eventLoopClass = Swow::class;
Worker::runAll();
swow.php
<?php
namespace Workerman\Events;
use RuntimeException;
use Swow\Coroutine;
use Swow\Signal;
use Swow\SignalException;
use Workerman\Worker;
use function getmypid;
use function max;
use function msleep;
use function stream_poll_one;
use function Swow\Sync\waitAll;
use const STREAM_POLLHUP;
use const STREAM_POLLIN;
use const STREAM_POLLNONE;
use const STREAM_POLLOUT;
/**
 * Workerman event loop implemented on top of Swow coroutines.
 *
 * Every timer, stream watcher and signal listener is a coroutine created with
 * Coroutine::run(); the matching delete/off method kills that coroutine.
 */
class Swow implements EventInterface
{
    /**
     * Ids of the coroutines backing the active timers, keyed by the same id.
     * @var array<int, int>
     */
    protected $_eventTimer = [];

    /**
     * Read-watcher coroutines, keyed by the integer cast of the stream.
     * @var array<Coroutine>
     */
    protected $_readEvents = [];

    /**
     * Write-watcher coroutines, keyed by the integer cast of the stream.
     * @var array<Coroutine>
     */
    protected $_writeEvents = [];

    /**
     * Signal-listener coroutines, keyed by signal number.
     * @var array<Coroutine>
     */
    protected $_signalListener = [];

    /**
     * Not read or written by this class; kept so that subclasses which
     * reference it keep working.
     */
    protected $mapId;

    /**
     * Get timer count.
     *
     * @return integer
     */
    public function getTimerCount()
    {
        return \count($this->_eventTimer);
    }

    /**
     * Run $func once after $delay seconds.
     *
     * @param float $delay Delay in seconds; converted to whole milliseconds, minimum 1 ms.
     * @param callable $func Callback to invoke.
     * @param mixed $args Arguments spread into $func (cast to array first).
     * @return int Timer id, which is the id of the coroutine that sleeps.
     */
    public function delay(float $delay, $func, $args)
    {
        $t = max((int) ($delay * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            msleep($t);
            // One-shot timer: deregister before the callback runs.
            unset($this->_eventTimer[Coroutine::getCurrent()->getId()]);
            try {
                $func(...(array) $args);
            } catch (\Throwable $e) {
                Worker::stopAll(250, $e);
            }
        });
        $timer_id = $coroutine->getId();
        $this->_eventTimer[$timer_id] = $timer_id;
        return $timer_id;
    }

    /**
     * Run $func every $interval seconds until the timer is deleted.
     *
     * @param float $interval Period in seconds; converted to whole milliseconds, minimum 1 ms.
     * @param callable $func Callback to invoke on every tick.
     * @param mixed $args Arguments spread into $func (cast to array first).
     * @return int Timer id, which is the id of the coroutine running the loop.
     */
    public function repeat(float $interval, $func, $args)
    {
        $t = max((int) ($interval * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            while (true) {
                msleep($t);
                try {
                    $func(...(array) $args);
                } catch (\Throwable $e) {
                    Worker::stopAll(250, $e);
                }
            }
        });
        $timer_id = $coroutine->getId();
        $this->_eventTimer[$timer_id] = $timer_id;
        return $timer_id;
    }

    /**
     * Delete a timer created by delay() or repeat().
     *
     * @param int $timer_id Id returned by delay() / repeat().
     * @return bool True when the id was registered, false otherwise.
     */
    public function deleteTimer($timer_id)
    {
        if (!isset($this->_eventTimer[$timer_id])) {
            return false;
        }
        try {
            // The id may still be registered although its coroutine is no
            // longer listed by getAll(); in that case there is nothing to kill.
            (Coroutine::getAll()[$timer_id] ?? null)?->kill();
            return true;
        } finally {
            // Always forget the id, even when kill() throws.
            unset($this->_eventTimer[$timer_id]);
        }
    }

    /**
     * Delete every registered timer.
     *
     * @return void
     */
    public function deleteAllTimer()
    {
        foreach ($this->_eventTimer as $timer_id) {
            $this->deleteTimer($timer_id);
        }
    }

    /**
     * Watch $stream for readability and call $func($stream) on every event.
     *
     * An existing read watcher for the same stream is replaced. The watcher
     * stops itself when polling reports anything other than STREAM_POLLIN,
     * or when a RuntimeException is thrown inside the loop.
     *
     * @param resource $stream Stream to poll.
     * @param callable $func Callback receiving the stream.
     * @return bool Always true.
     */
    public function onReadable($stream, $func)
    {
        $fd = (int) $stream;
        if (isset($this->_readEvents[$fd])) {
            $this->offReadable($stream);
        }
        $this->_readEvents[$fd] = Coroutine::run(function () use ($stream, $func): void {
            try {
                while (true) {
                    $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    if ($rEvent !== STREAM_POLLIN) {
                        $this->offReadable($stream, bySelf: true);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offReadable($stream, bySelf: true);
            }
        });
        return true;
    }

    /**
     * Stop watching $stream for readability.
     *
     * @param resource $stream Stream passed to onReadable().
     * @param bool $bySelf True when called by the watcher coroutine itself,
     *                     which is about to finish and must not be killed.
     * @return void
     */
    public function offReadable($stream, bool $bySelf = false)
    {
        $fd = (int) $stream;
        if (!isset($this->_readEvents[$fd])) {
            return;
        }
        $coroutine = $this->_readEvents[$fd];
        // Drop the entry first so it never lingers, whether or not the
        // coroutine is still executing and whatever kill() does.
        unset($this->_readEvents[$fd]);
        if (!$bySelf && $coroutine->isExecuting()) {
            $coroutine->kill();
        }
    }

    /**
     * Watch $stream for writability and call $func($stream) on every event.
     *
     * An existing write watcher for the same stream is replaced. The watcher
     * stops itself when polling reports anything other than STREAM_POLLOUT,
     * or when a RuntimeException is thrown inside the loop.
     *
     * @param resource $stream Stream to poll.
     * @param callable $func Callback receiving the stream.
     * @return bool Always true.
     */
    public function onWritable($stream, $func)
    {
        $fd = (int) $stream;
        if (isset($this->_writeEvents[$fd])) {
            $this->offWritable($stream);
        }
        $this->_writeEvents[$fd] = Coroutine::run(function () use ($stream, $func): void {
            try {
                while (true) {
                    $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    if ($rEvent !== STREAM_POLLOUT) {
                        $this->offWritable($stream, bySelf: true);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offWritable($stream, bySelf: true);
            }
        });
        return true;
    }

    /**
     * Stop watching $stream for writability.
     *
     * @param resource $stream Stream passed to onWritable().
     * @param bool $bySelf True when called by the watcher coroutine itself,
     *                     which is about to finish and must not be killed.
     * @return void
     */
    public function offWritable($stream, bool $bySelf = false)
    {
        $fd = (int) $stream;
        if (!isset($this->_writeEvents[$fd])) {
            return;
        }
        $coroutine = $this->_writeEvents[$fd];
        // Drop the entry first so it never lingers, whether or not the
        // coroutine is still executing and whatever kill() does.
        unset($this->_writeEvents[$fd]);
        if (!$bySelf && $coroutine->isExecuting()) {
            $coroutine->kill();
        }
    }

    /**
     * Install $func as the handler for $signal.
     *
     * @param int $signal Signal number.
     * @param callable $func Callback receiving the signal number.
     * @return bool False when a listener for $signal is already registered, true otherwise.
     */
    public function onSignal($signal, $func)
    {
        if (isset($this->_signalListener[$signal])) {
            return false;
        }
        $coroutine = Coroutine::run(static function () use ($signal, $func): void {
            try {
                // Wait again after every delivery so the handler keeps firing
                // instead of handling only the first signal.
                while (true) {
                    Signal::wait($signal);
                    $func($signal);
                }
            } catch (SignalException) {
                // Waiting failed: end this listener quietly.
            }
        });
        $this->_signalListener[$signal] = $coroutine;
        return true;
    }

    /**
     * Remove the handler installed by onSignal().
     *
     * @param int $signal Signal number.
     * @return bool False when no listener was registered, true otherwise.
     */
    public function offSignal($signal)
    {
        if (!isset($this->_signalListener[$signal])) {
            return false;
        }
        $this->_signalListener[$signal]->kill();
        unset($this->_signalListener[$signal]);
        return true;
    }

    /**
     * Run the loop by blocking in waitAll().
     *
     * @return void
     */
    public function run()
    {
        waitAll();
    }

    /**
     * Stop the loop: kill the main coroutine, then send SIGINT to this process.
     *
     * @return void
     */
    public function stop()
    {
        Coroutine::getMain()->kill();
        Signal::kill(getmypid(), Signal::INT);
    }

    /**
     * Alias of stop().
     *
     * @return void
     */
    public function destroy()
    {
        $this->stop();
    }

    /**
     * Flag-based registration entry point.
     *
     * @param mixed $fd Signal number, timer interval in seconds, or stream, depending on $flag.
     * @param int $flag One of the EV_* constants.
     * @param callable $func Callback to register.
     * @param mixed $args Extra callback arguments (timers only).
     * @return mixed Result of the delegated method, or null for an unknown flag.
     */
    public function add($fd, $flag, $func, $args = [])
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return $this->onSignal($fd, $func);
            case self::EV_TIMER:
                return $this->repeat($fd, $func, $args);
            case self::EV_TIMER_ONCE:
                return $this->delay($fd, $func, $args);
            case self::EV_READ:
                return $this->onReadable($fd, $func);
            case self::EV_WRITE:
                return $this->onWritable($fd, $func);
        }
        return null;
    }

    /**
     * Flag-based removal entry point.
     *
     * @param mixed $fd Signal number, timer id, or stream, depending on $flag.
     * @param int $flag One of the EV_* constants.
     * @return mixed Result of offSignal() / deleteTimer(); null for stream watchers and unknown flags.
     */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return $this->offSignal($fd);
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                return $this->deleteTimer($fd);
            case self::EV_READ:
                $this->offReadable($fd);
                break;
            case self::EV_WRITE:
                $this->offWritable($fd);
                break;
        }
        return null;
    }

    /**
     * Alias of deleteAllTimer().
     *
     * @return void
     */
    public function clearAllTimer()
    {
        $this->deleteAllTimer();
    }

    /**
     * Run the loop by blocking in waitAll(); same as run().
     *
     * @return void
     */
    public function loop()
    {
        waitAll();
    }
}
swow版本
root@be1fb440630a:/opt/www# php --ri swow
Swow
Status => enabled
Author => Swow Team
Link => https://github.com/swow/swow
Contact => Twosee <twosee@php.net>
Version => 0.3.2-dev-390a1231 ( NTS )
Built => Dec 17 2022 07:11:23
Context => boost-context
Scheduler => libuv-event
收藏了
快速体验swow:打开https://github.com/dixyes/lwmbs/actions/workflows/linux.yml 选择最新任务,下载cli_static_8.1_musl_x86_64_xxxx,解压即可使用
没找到地方下载,笨了---
进入页面后在右边选一个linux test,点进去,往下翻,就有下载链接
好像在协程里面,我使用的redis和mysql很快就断开连接了,不知道是不是扩展的问题
$timer_id = Timer::add($time_interval, function () {
Coroutine::run(static function () {
Db::table()->find();
});
没复现,我这测试没问题
请问这个webman业务中能用吗(非新开的workerman进程中)
不能,别试
好的谢谢
用了这个扩展之后,是不是workerman的count(也就是分身数量)可以设置为1了。那么也意味着windows下面也可以得到linux差不多的性能了?