webman/workerman的协程基建套件及分享

chaz6chez

前言

这段时间本身比较忙,也很少在关注技术相关的点,上个月空了刚好有时间看看群里,结果发现大家在讨论协程以及webman/workerman的劣势-阻塞退化问题,本来说是稍稍提两下实现方向,结果一来二去直接弄了一个插件出来,经过反反复复修改,最后发布了webman-coroutine插件

现状

workerman/webman的阻塞退化问题

workerman是标准的master/worker多进程模型,master只负责管理worker,而每个worker会启动event-loop进行事件监听,这里面包含了stream、timer等事件,所有事件公用一个event-loop,公用一套调度体系;每一个事件回调会触发注册的回调函数,整体是单线程的执行调度,也就是说如果回调函数里面有阻塞,那么会阻塞event-loop的循环,直到回调函数执行完毕才会执行下一个事件回调。

也就是说你把event-loop看作是一个队列,那么回调函数就是消费者,这个队列是一个单消费者的队列,当回调函数阻塞的时候,队列是没有其他消费者来消费回调的,这也就造成了队头阻塞问题,当队列buffer被占满时,生产者将无法投送事件到event-loop中,这会造成什么问题呢?假设我们有N个worker监听8080端口,当有消息的时候会触发一次start()方法,而start()方法是一个while(1){}的死循环,那么每请求一次将占用一个worker,导致worker一直在等待start()执行完毕才能释放控制权给event-loop,当N个任务后,所有worker将被占满,至此,workerman将无法接收8080端口的任何信息。

当然,现实环境下没有这么夸张,但是遇到一些长阻塞的方法时还是会存在并发量上不去的问题,那么在传统workerman的开发环境下怎么处理呢?开多一点worker;其实你把它看成一个消息队列就好理解,当消费能力上不去的时候,要么减少消费阻塞时长,要么就是增加消费者。webman也同理,因为webman是在事件回调函数内进行框架的加载和控制器方法的执行的。

workerman swoole驱动未使用协程

有朋友会说,webman/workerman可以使用swoole作为底层驱动,只要安装swoole并将workerman的驱动设置为Swoole即可使用协程了;这种说法并不完全正确。

以下是workerman 4.x的swoole驱动实现:


<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    Ares<aresrr#qq.com>
 * @link      http://www.workerman.net/
 * @link      https://github.com/ares333/Workerman
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Events;

use Workerman\Worker;
use Swoole\Event;
use Swoole\Timer;

class Swoole implements EventInterface
{

    protected $_timer = array();

    protected $_timerOnceMap = array();

    protected $mapId = 0;

    protected $_fd = array();

    // milisecond
    public static $signalDispatchInterval = 500;

    protected $_hasSignal = false;

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::add()
     */
    public function add($fd, $flag, $func, $args = array())
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                $res = \pcntl_signal($fd, $func, false);
                if (! $this->_hasSignal && $res) {
                    Timer::tick(static::$signalDispatchInterval,
                        function () {
                            \pcntl_signal_dispatch();
                        });
                    $this->_hasSignal = true;
                }
                return $res;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                $method = self::EV_TIMER === $flag ? 'tick' : 'after';
                if ($this->mapId > \PHP_INT_MAX) {
                    $this->mapId = 0;
                }
                $mapId = $this->mapId++;
                $t = (int)($fd * 1000);
                if ($t < 1) {
                   $t = 1;   
                }
                $timer_id = Timer::$method($t,
                    function ($timer_id = null) use ($func, $args, $mapId) {
                        try {
                            \call_user_func_array($func, (array)$args);
                        } catch (\Exception $e) {
                            Worker::stopAll(250, $e);
                        } catch (\Error $e) {
                            Worker::stopAll(250, $e);
                        }
                        // EV_TIMER_ONCE
                        if (! isset($timer_id)) {
                            // may be deleted in $func
                            if (\array_key_exists($mapId, $this->_timerOnceMap)) {
                                $timer_id = $this->_timerOnceMap[$mapId];
                                unset($this->_timer[$timer_id],
                                    $this->_timerOnceMap[$mapId]);
                            }
                        }
                    });
                if ($flag === self::EV_TIMER_ONCE) {
                    $this->_timerOnceMap[$mapId] = $timer_id;
                    $this->_timer[$timer_id] = $mapId;
                } else {
                    $this->_timer[$timer_id] = null;
                }
                return $timer_id;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (! isset($this->_fd[$fd_key])) {
                    if ($flag === self::EV_READ) {
                        $res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
                        $fd_type = SWOOLE_EVENT_READ;
                    } else {
                        $res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
                        $fd_type = SWOOLE_EVENT_WRITE;
                    }
                    if ($res) {
                        $this->_fd[$fd_key] = $fd_type;
                    }
                } else {
                    $fd_val = $this->_fd[$fd_key];
                    $res = true;
                    if ($flag === self::EV_READ) {
                        if (($fd_val & SWOOLE_EVENT_READ) !== SWOOLE_EVENT_READ) {
                            $res = Event::set($fd, $func, null,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
                        }
                    } else {
                        if (($fd_val & SWOOLE_EVENT_WRITE) !== SWOOLE_EVENT_WRITE) {
                            $res = Event::set($fd, null, $func,
                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
                            $this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
                        }
                    }
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::del()
     */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return \pcntl_signal($fd, SIG_IGN, false);
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                // already remove in EV_TIMER_ONCE callback.
                if (! \array_key_exists($fd, $this->_timer)) {
                    return true;
                }
                $res = Timer::clear($fd);
                if ($res) {
                    $mapId = $this->_timer[$fd];
                    if (isset($mapId)) {
                        unset($this->_timerOnceMap[$mapId]);
                    }
                    unset($this->_timer[$fd]);
                }
                return $res;
            case self::EV_READ:
            case self::EV_WRITE:
                $fd_key = (int) $fd;
                if (isset($this->_fd[$fd_key])) {
                    $fd_val = $this->_fd[$fd_key];
                    if ($flag === self::EV_READ) {
                        $flag_remove = ~ SWOOLE_EVENT_READ;
                    } else {
                        $flag_remove = ~ SWOOLE_EVENT_WRITE;
                    }
                    $fd_val &= $flag_remove;
                    if (0 === $fd_val) {
                        $res = Event::del($fd);
                        if ($res) {
                            unset($this->_fd[$fd_key]);
                        }
                    } else {
                        $res = Event::set($fd, null, null, $fd_val);
                        if ($res) {
                            $this->_fd[$fd_key] = $fd_val;
                        }
                    }
                } else {
                    $res = true;
                }
                return $res;
        }
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::clearAllTimer()
     */
    public function clearAllTimer()
    {
        foreach (array_keys($this->_timer) as $v) {
            Timer::clear($v);
        }
        $this->_timer = array();
        $this->_timerOnceMap = array();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::loop()
     */
    public function loop()
    {
        Event::wait();
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::destroy()
     */
    public function destroy()
    {
        Event::exit();
        posix_kill(posix_getpid(), SIGINT);
    }

    /**
     *
     * {@inheritdoc}
     *
     * @see \Workerman\Events\EventInterface::getTimerCount()
     */
    public function getTimerCount()
    {
        return \count($this->_timer);
    }
}

我们可以看到确实正确加载了Swoole的event-loop驱动,但仅仅也只是加载了event-loop,并没有在回调的注册部分加入协程,那么就相当于仅仅只是写了一个\Co\run(),但是没有在\Co\run()中创建协程进行运行,那么意味着当事件的回调函数中当监听8080端口进行处理,遇到了阻塞的时候还是无法出让当前控制权给event-loop,event-loop就没办法执行下一个8080端口的事件,为什么会这样呢?因为workerman使用stream_socket_server()对外部网络进行监听,而如下代码又会等待回调:

    // Workerman\Worker 2465-2476行
    public function resumeAccept()
    {
        // Register a listener to be notified when server socket is ready to read.
        if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
            if ($this->transport !== 'udp') {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
            } else {
                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
            }
            $this->_pauseAccept = false;
        }
    }

那么即便swoole底层hook了系统函数,也只是将mainSocket的回调出让,但来自相同mainSocket的下一次事件是需要上一次事件完结恢复才可以继续接收的。

导致

以上的问题会导致什么样的问题呢?

  • 非刻意的阻塞将worker占满,极端情况降低吞吐承载力
    • PDO
    • curl
    • 文件读写
    • 等等 blocking-I/O相关

传统解决方案:多开worker

  • 因为内外共用event-loop,刻意的阻塞实现会将worker占满,导致无法接收处理外部网络请求
    • 长轮询接口
    • http-sse
    • 一些长连接场景
    • 带有阻塞业务的timer
    • 队列 生产/消费
    • 等等

传统解决方案:自定义进程实现 或 使用外部服务

解决

基于上述情况,我开发了webman/workerman可用的协程基建插件,webman-coroutine

插件通过适配器模式和工厂模式的方法去兼容现目前市面上比较常见的几种协程驱动swowswoole、php-fiber(ripple实现),将不同的底层驱动抽象适配为统一的调用方法,并且兼容非协程环境,也就意味着你用同一套代码写出来的业务可以较为平滑的切换在这些环境及非协程环境之间,且保证逻辑是正常运行。

插件为webman的开发框架重新实现了webserver,让原本不完备支持协程的框架可以完备的支持协程:

<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */
declare(strict_types=1);

namespace Workbunny\WebmanCoroutine;

use Webman\App;
use Webman\Http\Request;
use Workbunny\WebmanCoroutine\Handlers\HandlerInterface;
use Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine;
use Workbunny\WebmanCoroutine\Utils\WaitGroup\WaitGroup;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;

/**
 *  协程化web服务进程
 */
class CoroutineWebServer extends App
{

    /**
     * 每个连接的协程计数
     *
     * @var int[]
     */
    protected static array $_connectionCoroutineCount = [];

    /**
     * 获取连接的协程计数
     *
     * @return int[]|int
     */
    public static function getConnectionCoroutineCount(?string $connectionId = null): array|int
    {
        return $connectionId === null
            ? static::$_connectionCoroutineCount
            : (static::$_connectionCoroutineCount[$connectionId] ?? 0);
    }

    /**
     * 回收连接的协程计数
     *
     * @param string $connectionId
     * @param bool $force
     * @return void
     */
    public static function unsetConnectionCoroutineCount(string $connectionId, bool $force = false): void
    {
        if (!$force and self::getConnectionCoroutineCount($connectionId) > 0) {
            return;
        }
        unset(static::$_connectionCoroutineCount[$connectionId]);
    }

    /** @inheritdoc  */
    public function onWorkerStart($worker)
    {
        if (!\config('plugin.workbunny.webman-coroutine.app.enable', false)) {
            return;
        }
        parent::onWorkerStart($worker);
        /** @var HandlerInterface $handler */
        $handler = Factory::getCurrentHandler();
        $handler::initEnv();
    }

    /**
     * 停止服务
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param Worker|mixed $worker
     * @return void
     */
    public function onWorkerStop($worker, ...$params)
    {
        if (is_callable($call = [parent::class, 'onWorkerStop'])) {
            call_user_func($call, $worker, ...$params);
        }
    }

    /**
     * 连接建立
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param ConnectionInterface $connection
     * @param mixed ...$params
     * @return void
     */
    public function onConnect($connection, ...$params): void
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onConnect'])) {
            // 协程化创建连接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
    }

    /**
     * 连接关闭
     *
     *  - 不用返回值和参数标定是为了兼容
     *
     * @param ConnectionInterface|mixed $connection
     * @param ...$params
     * @return void
     */
    public function onClose($connection, ...$params)
    {
        if (!is_object($connection)) {
            return;
        }
        if (is_callable($call = [parent::class, 'onClose'])) {
            // 协程化关闭连接
            new Coroutine(function () use ($call, $connection, $params) {
                call_user_func($call, $connection, ...$params);
            });
        }
        self::unsetConnectionCoroutineCount(spl_object_hash($connection), true);
    }

    /**
     * @link parent::onMessage()
     * @param ConnectionInterface|mixed $connection
     * @param Request|mixed $request
     * @param ...$params
     * @return null
     * @link parent::onMessage()
     */
    public function onMessage($connection, $request, ...$params)
    {
        if (!is_object($connection)) {
            return null;
        }
        $connectionId = spl_object_hash($connection);
        $params = func_get_args();
        $res = null;
        // 检测协程数
        if (($consumerCount = \config('plugin.workbunny.webman-coroutine.app.consumer_count', 0)) > 0) {
            // 等待协程回收
            wait_for(function () use ($connectionId, $consumerCount) {
                return self::getConnectionCoroutineCount($connectionId) <= $consumerCount;
            });
        }

        $waitGroup = new WaitGroup();
        $waitGroup->add();
        // 请求消费协程
        new Coroutine(function () use (&$res, $waitGroup, $params, $connectionId) {
            $res = parent::onMessage(...$params);
            // 计数 --
            self::$_connectionCoroutineCount[$connectionId] --;
            // 尝试回收
            self::unsetConnectionCoroutineCount($connectionId);
            // wg完成
            $waitGroup->done();
        });
        // 计数 ++
        self::$_connectionCoroutineCount[$connectionId] =
            (isset(self::$_connectionCoroutineCount[$connectionId])
                ? self::$_connectionCoroutineCount[$connectionId] + 1
                : 1);
        // 等待
        $waitGroup->wait();
        return $res;
    }
}

CoroutineWebServer是继承并代理了App的onMessage方法,将原本的方法执行回调化,并且做到了非侵入onMessage的执行逻辑,较为安全的支持了未来webman可能的升级改动。

另外对于workerman 4.x下的event驱动也做了兼容,除了增加了swow的事件驱动外,还重新实现了swoole的事件驱动:

<?php
/**
 * @author workbunny/Chaz6chez
 * @email chaz6chez1993@outlook.com
 */

declare(strict_types=1);

namespace Workbunny\WebmanCoroutine\Events;

use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Workbunny\WebmanCoroutine\Exceptions\EventLoopException;
use Workerman\Events\EventInterface;

class SwooleEvent implements EventInterface
{
    /** @var int[] All listeners for read event. */
    protected array $_reads = [];

    /** @var int[] All listeners for write event. */
    protected array $_writes = [];

    /** @var callable[] Event listeners of signal. */
    protected array $_signals = [];

    /** @var int[] Timer id to timer info. */
    protected array $_timer = [];

    /** @var int 定时器id */
    protected int $_timerId = 0;

    /**
     * @param bool $debug 测试用
     * @throws EventLoopException 如果没有启用拓展
     */
    public function __construct(bool $debug = false)
    {
        if (!$debug and !extension_loaded('swoole')) {
            throw new EventLoopException('Not support ext-swoole. ');
        }
    }

    /** @inheritdoc  */
    public function add($fd, $flag, $func, $args = [])
    {
        switch ($flag) {
            case EventInterface::EV_SIGNAL:
                if (!isset($this->_signals[$fd])) {
                    if ($res = Process::signal($fd, $func)) {
                        $this->_signals[$fd] = $func;
                    }

                    return $res;
                }

                return false;
            case EventInterface::EV_TIMER:
            case EventInterface::EV_TIMER_ONCE:
                $timerId = $this->_timerId++;
                $this->_timer[$timerId] = Timer::after((int) ($fd * 1000), function () use ($timerId, $flag, $func) {
                    call_user_func($func);
                    if ($flag === EventInterface::EV_TIMER_ONCE) {
                        $this->del($timerId, $flag);
                    }
                });

                return $timerId;
            case EventInterface::EV_READ:
                if (\is_resource($fd)) {
                    if ($this->_reads[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_READ);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_READ)) {
                        $this->_reads[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            case self::EV_WRITE:
                if (\is_resource($fd)) {
                    if ($this->_writes[$key = (int) $fd] ?? null) {
                        $this->del($fd, EventInterface::EV_WRITE);
                    }
                    if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_WRITE)) {
                        $this->_writes[$key] = 1;
                    }

                    return (bool) $res;
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                if ($this->_signals[$fd] ?? null) {
                    if (Process::signal($fd, null)) {
                        unset($this->_signals[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                if ($id = $this->_timer[$fd] ?? null) {
                    if (Timer::clear($id)) {
                        unset($this->_timer[$fd]);

                        return true;
                    }
                }

                return false;
            case self::EV_READ:
                if (
                    \is_resource($fd) and
                    isset($this->_reads[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_READ)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_reads[$key]);

                        return true;
                    }
                }

                return false;
            case self::EV_WRITE:
                if (
                    \is_resource($fd) and
                    isset($this->_writes[$key = (int) $fd]) and
                    Event::isset($fd, SWOOLE_EVENT_WRITE)
                ) {
                    if (Event::del($fd)) {
                        unset($this->_writes[$key]);

                        return true;
                    }
                }

                return false;
            default:
                return null;
        }
    }

    /** @inheritdoc  */
    public function loop()
    {
        // 阻塞等待
        Event::wait();
        // 确定loop为退出状态
        exit(0);
    }

    /** @inheritdoc  */
    public function destroy()
    {
        // 移除所有定时器
        $this->clearAllTimer();
        // 退出所有协程
        foreach (Coroutine::listCoroutines() as $coroutine) {
            Coroutine::cancel($coroutine);
        }
        // 退出event loop
        Event::exit();
        $this->_reads = $this->_writes = [];
    }

    /** @inheritdoc  */
    public function clearAllTimer()
    {
        foreach ($this->_timer as $id) {
            Timer::clear($id);
        }
        $this->_timer = [];
    }

    /** @inheritdoc  */
    public function getTimerCount()
    {
        return count($this->_timer);
    }
}

在测试workerman 5.x的过程中还找到了一些workerman的swoole驱动的bug,我进行了PR,积极参与维护,fix: all coroutines must be canceled before Event::exit #1059

其他更多特性及功能请参考插件文档,插件也支持纯workerman开发环境,webman-coroutine文档

一些经验

1. 协程并不是银弹,并不会让一些原本耗费时间的逻辑变短,它只是能合理的利用阻塞的间歇去处理其他的业务,本质上是用空间换时间

2. PHP的数组和对象是存放在堆中的数据,其他如字符串、整数等是在栈上

  • 协程的切换中会自动保存寄存器和栈信息,但不会保存堆数据,这也就意味着堆数据会被多个协程操作,导致竞争状态
$a = new \stdClass();
$a->id = 1;
new Coroutine(function () use ($a) {
    // 一些业务逻辑
    $a->id = 2;
})
new Coroutine(function () use ($a) {
    // 一些业务逻辑
    $a->id = 3;
})
// 等待所有协程结束

// 由于每个协程的逻辑中可能存在协程切换出让,结合对象是堆数据且引用,最后的结果不能保证是1或者2或者3
// 数组同理
echo $a->id;
  • 对于保存在栈上的数据如果进行引用操作,也会存在竞争状态
$a = 1;
new Coroutine(function () use (&$a) {
    // 一些业务逻辑
    $a = 2;
})
new Coroutine(function () use (&$a) {
    // 一些业务逻辑
    $a = 3;
})
// 等待所有协程结束

// 由于每个协程的逻辑中可能存在协程切换出让,变量是引用,最后的结果不能保证是1或者2或者3
echo $a;
  • 堆数据可以利用clone进行拷贝操作,但资源类型不可以clone
  • 可以通过协程id + 静态数组结合来保存和销毁需要处理的竞态数据,从而实现协程上下文
static array $context = [];

$a = 1;

$id1 = new Coroutine(function () use (&$id1) {
    $contextA = self::$context[$id]
    // 一些业务逻辑
    self::$context[$id1] = 2;
})
self::$context[$id1] = $a;

$id2 = new Coroutine(function () use (&$id2) {
    $contextB = self::$context[$id]
    // 一些业务逻辑
    self::$context[$id2] = 3;
})
self::$context[$id1] = $a;

// 等待所有协程结束

// 这里会输出1
echo $a;

// 读取上下文内容,获取协程结果, 一般这里不推荐直接上下文读取,而是通过CSP模型的channel进行传递
// 还要注意上下文的回收,避免静态数组膨胀
echo self::$context[$id1];
echo self::$context[$id2];

// 以上并不是完整的上下文实现方案,只是一个伪代码!!

3. 关于数据库连接池

  • 数据库协议一般是支持双工的,但PDO是标准的blocking-I/O实现
  • PDO在发送SQL后会阻塞等待SQL的执行结果,swow和swoole在底层hook了阻塞等待的过程,进行了协程切换

    以pdo的mysql举例:

    // https://github.com/php/php-src/blob/master/ext/pdo_mysql/mysql_driver.c
    static zend_long mysql_handle_doer(pdo_dbh_t *dbh, const zend_string *sql)
    {
        pdo_mysql_db_handle *H = (pdo_mysql_db_handle *)dbh->driver_data;
        PDO_DBG_ENTER("mysql_handle_doer");
        PDO_DBG_INF_FMT("dbh=%p", dbh);
        PDO_DBG_INF_FMT("sql=%.*s", (int)ZSTR_LEN(sql), ZSTR_VAL(sql));
        if (mysql_real_query(H->server, ZSTR_VAL(sql), ZSTR_LEN(sql))) {
            pdo_mysql_error(dbh);
            PDO_DBG_RETURN(-1);
        } else {
            my_ulonglong c = mysql_affected_rows(H->server);
            if (c == (my_ulonglong) -1) {
                pdo_mysql_error(dbh);
                PDO_DBG_RETURN(H->einfo.errcode ? -1 : 0);
            } else {
                /* MULTI_QUERY support - eat up all unfetched result sets */
                MYSQL_RES* result;
                while (mysql_more_results(H->server)) {
                    if (mysql_next_result(H->server)) {
                        pdo_mysql_error(dbh);
                        PDO_DBG_RETURN(-1);
                    }
                    result = mysql_store_result(H->server);
                    if (result) {
                        mysql_free_result(result);
                    }
                }
                PDO_DBG_RETURN((int)c);
            }
        }
    }

    以上代码可以简单理解为以下伪代码

    $requestId = $mysqlClient->send('SQL');
    while (1) {
        $res = $mysqlClient->get($requestId);
        if ($res) {
            return $res;
        }
        // 超时等其他机制
        // 协程sleep出让
    }
  • 如果协程共用同一个连接,由于PDO的BIO实现方式,所以可能导致N次协程的请求都被$mysqlClient->send('SQL');由DB服务器接收并依次执行(来源于同一个连接的多次SQL是顺序执行),但可能存在后者的协程结果唤起了前者协程的$res = $mysqlClient->get($requestId);,从而导致数据错乱;这里本质上是因为PDO对象是堆数据,在多个协程中是竞态的,为了避免这样的情况,有以下方案解决:
    • 为每个协程创建连接 【不推荐】
    • 实现连接对象池(连接池本质上可以简单理解和实现为一个可以合理管理数据库连接对象上下文的静态数组)【需要一定开发能力】
    • 使用协程版的数据库,如 hyperf/databasehyperf/db

4. 其他需要池化的组件

  • 本质上和数据库存在的问题一样,是对象/数组这种堆数据的竞态问题
  • 如果不在意返回结果,其实就不用在意上下文问题

5. 更多经验,持续更新

愿景

目前webman/workerman的协程实现仅仅只是入了个门,主要解决了阻塞退化问题,能够简单的实现以下场景:

  • 长轮询接口
  • 非阻塞timer调度
  • 队列生产消费
  • worker/server协程化

但还有很多基建需要社区出谋出力添砖加瓦,比如:

  • 少侵入/非侵入的改造,让webman数据库连接池化
  • 少侵入/非侵入的改造,让workerman的组件协程化
  • 少侵入/非侵入的改造,让composer组件协程化

当然,在此之前,你可以使用所有基于swow\swoole\ripple\revolt协程驱动开发的协程版组件,但我希望未来可以整合这些协程实现的组件,能够有一个统一的使用方式(虽然难度相当大,但也想试试);

欢迎大佬们共建,issuePR
文章如有错误,敬请指正。谢谢!!


2024-10-22 更新

目前已经实现了较为基础的Utils\Pool工具,可用于对象池化的实现

  • Utils\Poo\Debugger可用于检测待池化的对象是否存在非法和风险,抛出的异常可以自行捕获进行日志监控或者是调试,详细参考测试用例
    • Debugger使用了WeakMap来对生命周期内的检测对象进行缓存,避免重复检查
    • Debugger使用了生成器进行递归检测,更少的内存占用,避免深递归内存溢出
    • 风险/非法信息
      • 静态数组属于风险项
      • 静态对象属于风险项
      • 资源类型属于非法项
  • Utils\Poo\Pool可用于实现连接池、资源锁等,具体可参考文档

建议和意见都可以提交issue

欢迎各位大佬的PR

1928 28 24
28个评论

shanjian

大佬厉害

  • 暂无评论
guanhui07

good job

  • 暂无评论
t2

学习了

  • 暂无评论
smile1

前排

  • 暂无评论
walkor

  • 暂无评论
yin5th

感谢大佬分享!!!

  • 暂无评论
初心by

兔神牛皮

  • 暂无评论
Tinywan

大佬厉害,666

  • 暂无评论
xiaoming

协程版的数据库 本质实现是 连接对象池吗

  • chaz6chez 2024-10-08

    PDO的对象与数据库客户端连接一一对应,连接对象池打破了单例连接的这种做法,所以相同的数据库会存在多个客户端连接,这个对象池主要是为了合理的去管理上下文问题

深路潇湘

看不懂,但依然给你点赞

  • chaz6chez 2024-10-08

    有不懂的地方可以在这里提问,我尽我所能解答疑问

10bang

  • 暂无评论
深蓝

大佬玩底层,我们只能摸摸大佬的风,也想PR出点力,奈何能力不够。

  • chaz6chez 2024-10-08

    使用过程中有任何需要实现的特性或者找到的bug🐞也可以积极提issue呀,代码的注释很齐全,也可以看看源码,总有机会提pr的,加油!

wocall

大师👍

  • 暂无评论
Jinson

666

  • 暂无评论
Gin

赞赞赞

  • 暂无评论
qq7467466

兔子大佬太强了

  • 暂无评论
tanhongbin

大佬就是大佬,底层都能改,我看底层代码都费劲,看5分钟就能睡着

  • 暂无评论
xiaopi

老哥依旧稳定输出

  • 暂无评论
tang23

先赞后看

  • 暂无评论
晚安。

赞👍

  • 暂无评论
liudada1204

先赞后看👍

wolfcode

好文~
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍

  • 暂无评论
ikun

如此好的文章 应该置顶推荐

  • 暂无评论
皮皮侠

牛皮

  • 暂无评论
皮皮侠

看起来有点迷糊,那么现在的情况是由于workerman的数据库不支持连接池,所以使用这个协程组件的情况下不能进行数据库操作吗

  • Tinywan 2024-10-19

    数据库连接池暂不支持

  • chaz6chez 2024-10-19

    插件内有Pool工具,通过Pool自行封装数据库或者连接相关的工具就可以了,暂时还不能直接使用webman/workerman的数据库插件

pengzhen

大佬,请问这个插件有生产环境使用吗?我目前正在为公司项目寻找框架,选定webman+gatewaywork,但是苦于没有协程,看到你这个插件,想用,又担心有坑

  • chaz6chez 26天前

    群里有大佬已经准备上生产了,我自己的项目目前还在测试环境没有上生产,这个项目目前是重心开发的,遇到任何问题都可以咨询

  • pengzhen 26天前

    有群吗?

  • chaz6chez 26天前

    webman微信群

  • pengzhen 26天前

    几群?

  • chaz6chez 26天前

    我都在

pengzhen

大佬,读了下文档,有以下几个问题不懂

1、webman 的webserver 要同时开启 CoroutineWebServer和webman自带的server吗?那样岂不是对外暴露了2个端口,webman自带的可以去掉吗?
2、操作数据库可以用hypderf/database 解决连接池问题吗?
3、上下文,用webman自带的context类保存包括对象,数组这些数据类型可以吗?还是必须要用协程id作为key区别开来

  • chaz6chez 26天前

    1.可以关闭webman自带的server
    2.使用swoole驱动可以使用hyperf的db,但需要自己引入框架
    3.一般使用无需关注上下文,跨协程处理数据建议使用channel

  • pengzhen 26天前

    我想用到的上下文其实就是想保存当前这个请求的全局变量,类似fpm下的静态属性,处理完这个请求就销毁的

  • chaz6chez 26天前

    你思考的稍微简单了一些

  • pengzhen 26天前

    怎么说?目前项目中需要的就是请求级别的变量,暂时没有跨协程的

  • chaz6chez 26天前

    请求+容器就跨协程啊,我建议如果想要上生产,自身对协程没有那么熟悉的话,直接用hyperf全家套

  • xiaopi 25天前

    3.我理解的实际上swoole的协程都在同一个进程空间,可以共同使用进程内存资源,所以多个协程利用同一个数组保存数据是ok的,而webman自带的Context类就是根据协程id作为数组key,区分不同的协程资源的,适用的场景就是不需要协程间协作的场景;
    而使用swoole的channel适用于需要协程间协作的场景,比如,一次请求下要记录到数据库、记录日志,并将记录的结果响应给调用方,这种情况下解决方案就是:
    1.父协程创建协程A和协程B分别处理这两件事
    2.父协程阻塞当前协程,等待协程A和协程B的执行结果
    3.父协程得到了结果,并响应给调用放。

    上述的情况使用共享变量的情况下就很难处理,想要阻塞父协程,不使用内置的waitGroup的情况下,就必须在父协程写个循环体,并且为了不阻塞进程,还需要写个IO,即sleep ,while(true){ sleep(1)}
    而使用channel就很方便,直接在父协程pop两次,协程A完成后push(),协程B完成后push(),伪代码:

    $chan = new Channel();
    go(function(){
    //
    $chan->push()
    })
    go(function(){
    //
    $chan->push()
    })

    $chan->pop()
    $chan->pop()

    return response("ok");

  • chaz6chez 22天前

    workerman如果使用协程,协程也在一个进程空间,数据引用在协程之间是不隔离的,比如&的数据,比如对象,比如资源类型,比如数组,只要是引用类型的数据都存在协程间的竞争状态,这样的竞争状态会导致数据可能被污染,为了达到数据不被污染的效果,除了对数据加锁外,还可以通过channel进行有序传递,也就是当一个数据正在被一个协程消费的时候,其他的协程是没有从通道内获取到数据的,直到获取到数据的协程消费完毕,变相的其实也是一种锁的机制,在pop不到数据的时候协程会自动出让控制权;不论是上下文还是通道还是sync锁,都是一种竞争数据的并发安全操作。

德玛西亚

先赞后看,养成习惯。

  • 暂无评论

chaz6chez

4594
积分
0
获赞数
0
粉丝数
2018-11-16 加入
×
🔝