「分享实验」LaravelORM+协程在Webman中的应用

cclilshy

这几天我在想如何在Webman框架中使用LaravelORM并支持协程。将两者结合起来,理论上可以兼顾高并发与开发效率。

实验目标

在Webman中集成LaravelORM协程版,并验证其性能和兼容性。

实验准备

环境配置

  • 操作系统:Ubuntu 20.04
  • PHP版本:PHP 8.1
  • Webman版本:最新版
  • PRipple引擎: 开发版

审计LaravelORM

Illuminate\Database\Connection

public function select($query, $bindings = [], $useReadPdo = true)
{
    return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
        if ($this->pretending()) {
            return [];
        }

        // For select statements, we'll simply execute the query and return an array
        // of the database result set. Each element in the array will be a single
        // row from the database table, and will either be an array or objects.
        $statement = $this->prepared(
            $this->getPdoForSelect($useReadPdo)->prepare($query)
        );

        $this->bindValues($statement, $this->prepareBindings($bindings));
        $statement->execute();
        return $statement->fetchAll();
    });
}

以select方法为例可以看到上述类中,Laravel将所有对\PDO的操作都封装在了Connection中
并提供了ConnectionInterface的抽象接口,这意味着如果实现了这个接口,就可以无缝的替换掉PDO逻辑

施工过程

我选用了AMPHP的MySQL客户端库amphp/mysql来实现这个接口

<?php declare(strict_types=1);

use Amp\Mysql\MysqlConfig;
use Amp\Mysql\MysqlConnectionPool;
use Amp\Mysql\MysqlTransaction;
use Closure;
use Exception;
use Fiber;
use Generator;
use Illuminate\Database\MySqlConnection;
use Throwable;

use function boolval;
use function in_array;
use function spl_object_hash;
use function trim;

class PConnection extends MySqlConnection
{
    private const ALLOW_OPTIONS = [
        'host',
        'port',
        'user',
        'password',
        'db',
        'charset',
        'collate',
        'compression',
        'local-infile',

        'username',
        'database'
    ];

    /*** @var MysqlConnectionPool */
    private MysqlConnectionPool $pool;

    /**
     * @param        $pdo
     * @param string $database
     * @param string $tablePrefix
     * @param array  $config
     */
    public function __construct($pdo, string $database = '', string $tablePrefix = '', array $config = [])
    {
        parent::__construct($pdo, $database, $tablePrefix, $config);
        $dsn = '';
        foreach ($config as $key => $value) {
            if (in_array($key, static::ALLOW_OPTIONS, true)) {
                if (!$value) {
                    continue;
                }

                $key = match ($key) {
                    'username' => 'user',
                    'database' => 'db',
                    default => $key
                };
                $dsn .= "{$key}={$value} ";
            }
        }
        $config     = MysqlConfig::fromString(trim($dsn));
        $this->pool = new MysqlConnectionPool($config);
        //                        if (isset($this->pdo)) {
        //                            unset($this->pdo);
        //                        }
    }

    /**
     * @return void
     */
    public function beginTransaction(): void
    {
        $transaction = $this->pool->beginTransaction();
        ;
        if ($fiber = Fiber::getCurrent()) {
            $this->fiber2transaction[spl_object_hash($fiber)] = $transaction;
        } else {
            $this->fiber2transaction['main'] = $transaction;
        }
    }

    /**
     * @return void
     * @throws Exception
     */
    public function commit(): void
    {
        if ($fiber = Fiber::getCurrent()) {
            $key = spl_object_hash($fiber);
        } else {
            $key = 'main';
        }

        if (!$transaction = $this->fiber2transaction[$key] ?? null) {
            throw new Exception('Transaction not found');
        }

        $transaction->commit();
        unset($this->fiber2transaction[$key]);
    }

    /**
     * @param $toLevel
     * @return void
     * @throws Exception
     */
    public function rollBack($toLevel = null): void
    {
        if ($fiber = Fiber::getCurrent()) {
            $key = spl_object_hash($fiber);
        } else {
            $key = 'main';
        }

        if (!$transaction = $this->fiber2transaction[$key] ?? null) {
            throw new Exception('Transaction not found');
        }

        $transaction->rollback();
        unset($this->fiber2transaction[$key]);
    }

    /**
     * @var MysqlTransaction[]
     */
    private array $fiber2transaction = [];

    /**
     * @param Closure $callback
     * @param int     $attempts
     * @return mixed
     * @throws Throwable
     */
    public function transaction(Closure $callback, $attempts = 1): mixed
    {
        $this->beginTransaction();
        try {
            $result = $callback($this->getTransaction());
            $this->commit();
            return $result;
        } catch (Throwable $e) {
            $this->rollBack();
            throw $e;
        }
    }

    /**
     * @return MysqlTransaction|null
     */
    private function getTransaction(): MysqlTransaction|null
    {
        if ($fiber = Fiber::getCurrent()) {
            $key = spl_object_hash($fiber);
        } else {
            $key = 'main';
        }

        if (!$transaction = $this->fiber2transaction[$key] ?? null) {
            return null;
        }

        return $transaction;
    }

    /**
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo
     * @return array
     */
    public function select($query, $bindings = [], $useReadPdo = true): mixed
    {
        return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
            if ($this->pretending()) {
                return [];
            }

            $statement = $this->pool->prepare($query);
            return $statement->execute($this->prepareBindings($bindings));
        });
    }

    /**
     * @param string $query
     * @param array  $bindings
     * @return bool
     */
    public function statement($query, $bindings = []): bool
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return [];
            }

            $statement = $this->getTransaction()?->prepare($query) ?? $this->pool->prepare($query);
            return boolval($statement->execute($this->prepareBindings($bindings)));
        });
    }

    /**
     * 针对数据库运行 select 语句并返回所有结果集。
     *
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo
     * @return array
     */
    public function selectResultSets($query, $bindings = [], $useReadPdo = true): array
    {
        return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
            if ($this->pretending()) {
                return [];
            }

            $statement = $this->pool->prepare($query);
            $result    = $statement->execute($this->prepareBindings($bindings));
            $sets      = [];

            while ($result = $result->getNextResult()) {
                $sets[] = $result;
            }

            return $sets;
        });
    }

    /**
     * 针对数据库运行 select 语句并返回一个生成器。
     *
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo
     * @return Generator
     */
    public function cursor($query, $bindings = [], $useReadPdo = true): Generator
    {
        while ($record = $this->select($query, $bindings, $useReadPdo)) {
            yield $record;
        }
    }

    /**
     * 运行 SQL 语句并获取受影响的行数。
     *
     * @param string $query
     * @param array  $bindings
     * @return int
     */
    public function affectingStatement($query, $bindings = []): int
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return 0;
            }
            // 对于更新或删除语句,我们想要获取受影响的行数
            // 通过该语句并将其返回给开发人员。我们首先需要
            // 执行该语句,然后我们将使用 PDO 来获取受影响的内容。
            $statement = $this->pool->prepare($query);
            $result    = $statement->execute($this->prepareBindings($bindings));
            $this->recordsHaveBeenModified(
                ($count = $result->getRowCount()) > 0
            );
            return $count;
        });
    }

    /**
     * @return void
     */
    public function reconnect()
    {
        //TODO: 无事可做
    }

    /**
     * @return void
     */
    public function reconnectIfMissingConnection()
    {
        //TODO: 无事可做
    }
}

取代工厂

实现了Connection之后我们还有Hook住DatabaseManager的工厂方法`

return new class ($app) extends ConnectionFactory {
    /**
     * Create a new connection instance.
     *
     * @param string      $driver
     * @param PDO|Closure $connection
     * @param string      $database
     * @param string      $prefix
     * @param array       $config
     * @return SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
     *
     */
    protected function createConnection($driver, $connection, $database, $prefix = '', array $config = []): SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
    {
        return match ($driver) {
            'mysql' => new PConnection($connection, $database, $prefix, $config),
            'mariadb' => new MariaDbConnection($connection, $database, $prefix, $config),
            'pgsql' => new PostgresConnection($connection, $database, $prefix, $config),
            'sqlite' => new SQLiteConnection($connection, $database, $prefix, $config),
            'sqlsrv' => new SqlServerConnection($connection, $database, $prefix, $config),
            default => throw new InvalidArgumentException("Unsupported driver [{$driver}]."),
        };
    }
}

为了验证上述无缝耦合的最终效果,我准备将它安装到Webman

接入过程

我封装了一个Database类,用于Hook住Laravel的DatabaseManager

use Illuminate\Container\Container;
use Illuminate\Database\Capsule\Manager;
use Illuminate\Database\DatabaseManager;
use Illuminate\Events\Dispatcher;
use Illuminate\Pagination\Cursor;
use Illuminate\Pagination\CursorPaginator;
use Illuminate\Pagination\Paginator;
use Psc\Drive\Laravel\Coroutine\Database\Factory;
use function class_exists;
use function config;
use function get_class;
use function method_exists;
use function request;

class Database extends Manager
{
    /**
     * @return void
     */
    public static function install(): void
    {
        /**
         * 判断是否安装Webman
         */
        if (!class_exists(\support\Container::class)) {
            return;
        }

        /**
         * 判断是否曾被Hook
         */
        if (isset(parent::$instance) && get_class(parent::$instance) === Database::class) {
            return;
        }

        /**
         * Hook webman LaravelDB
         */
        $config      = config('database', []);
        $connections = $config['connections'] ?? [];
        if (!$connections) {
            return;
        }
        $app = Container::getInstance();

        /**
         * Hook数据库连接工厂
         */
        $capsule = new Database($app);
        $default = $config['default'] ?? false;

        if ($default) {
            $defaultConfig = $connections[$config['default']] ?? false;
            if ($defaultConfig) {
                $capsule->addConnection($defaultConfig);
            }
        }

        foreach ($connections as $name => $config) {
            $capsule->addConnection($config, $name);
        }

        if (class_exists(Dispatcher::class) && !$capsule->getEventDispatcher()) {
            $capsule->setEventDispatcher(\support\Container::make(Dispatcher::class, [Container::getInstance()]));
        }

        // Set as global
        $capsule->setAsGlobal();
        $capsule->bootEloquent();

        // Paginator
        if (class_exists(Paginator::class)) {
            if (method_exists(Paginator::class, 'queryStringResolver')) {
                Paginator::queryStringResolver(function () {
                    $request = request();
                    return $request?->queryString();
                });
            }
            Paginator::currentPathResolver(function () {
                $request = request();
                return $request ? $request->path() : '/';
            });
            Paginator::currentPageResolver(function ($pageName = 'page') {
                $request = request();
                if (!$request) {
                    return 1;
                }
                $page = (int)($request->input($pageName, 1));
                return $page > 0 ? $page : 1;
            });
            if (class_exists(CursorPaginator::class)) {
                CursorPaginator::currentCursorResolver(function ($cursorName = 'cursor') {
                    return Cursor::fromEncoded(request()->input($cursorName));
                });
            }
        }

        parent::$instance = $capsule;
    }

    /**
     * Hook Factory
     * @return void
     */
    protected function setupManager(): void
    {
        $factory       = new Factory($this->container);
        $this->manager = new DatabaseManager($this->container, $factory);
    }
}

实践运行结果

为了更直观的展现协程的效果,我将webman-worker数量改为了1,并且在每次请求中都会进行数据库查询

初始化控制器

 /**
 * @param Request $request
 * @return string
 */
public function index(Request $request): string
{
    // 手动Hook调DatabaseManager
    Database::install();

    // 记录执行时间
    $startTime = microtime(true);

    // 模拟一个耗时1s的查询
    $result    = Db::statement('SELECT SLEEP(1);');

    // 记录结束时间
    $endTime   = microtime(true);

    // 输出结果
    return "{$startTime} - {$endTime}";
}

启动单元测试

<?php declare(strict_types=1);

namespace Tests;

use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Psc\Plugins\Guzzle\PHandler;
use function P\async;
use function P\tick;

class CoroutineTest extends TestCase
{
    public function test_main(): void
    {
        $client = new Client(['handler' => new PHandler(['pool'=>0])]);
        for ($i = 0; $i < 100; $i++) {
            async(function () use ($client, $i) {
                $response        = $client->get('http://127.0.0.1:8787/');
                $responseContent = $response->getBody()->getContents();
                echo "Request $i: $responseContent\n";
            });
        }
        tick();
        $this->assertEquals(1, 1);
    }
}

最终输出结果

可以看到每个请求都确实耗时一秒,但100个请求都在一秒内完成了

Request 0: 1723015194.3121 - 1723015195.4183
Request 1: 1723015194.3389 - 1723015195.4193
Request 2: 1723015194.339 - 1723015195.4196
Request 3: 1723015194.3391 - 1723015195.4187
Request 4: 1723015194.3391 - 1723015195.4198
Request 5: 1723015194.3392 - 1723015195.42
Request 6: 1723015194.3393 - 1723015195.4202
Request 7: 1723015194.3394 - 1723015195.4204
Request 8: 1723015194.3394 - 1723015195.4588
Request 9: 1723015194.3395 - 1723015195.4595
Request 10: 1723015194.3395 - 1723015195.4626
Request 11: 1723015194.3396 - 1723015195.4633
Request 12: 1723015194.3397 - 1723015195.4653
Request 13: 1723015194.3398 - 1723015195.4658
Request 14: 1723015194.3398 - 1723015195.4688
Request 15: 1723015194.3399 - 1723015195.4726
Request 16: 1723015194.34 - 1723015195.4735
Request 17: 1723015194.34 - 1723015195.4774
Request 18: 1723015194.3401 - 1723015195.48
Request 19: 1723015194.3402 - 1723015195.4805
Request 20: 1723015194.3402 - 1723015195.4816
Request 21: 1723015194.3403 - 1723015195.4818
Request 22: 1723015194.3404 - 1723015195.4862
Request 23: 1723015194.3404 - 1723015195.4911
Request 24: 1723015194.3405 - 1723015195.4915
Request 25: 1723015194.3406 - 1723015195.4917
Request 26: 1723015194.3406 - 1723015195.4919
Request 27: 1723015194.3408 - 1723015195.4921
Request 28: 1723015194.3408 - 1723015195.4923
Request 29: 1723015194.3409 - 1723015195.4925
Request 30: 1723015194.3409 - 1723015195.4933
Request 31: 1723015194.341 - 1723015195.4935
Request 32: 1723015194.341 - 1723015195.4936
Request 33: 1723015194.3411 - 1723015195.4938
Request 34: 1723015194.3412 - 1723015195.494
Request 35: 1723015194.3412 - 1723015195.4941
Request 36: 1723015194.3413 - 1723015195.4943
Request 37: 1723015194.3414 - 1723015195.4944
Request 38: 1723015194.3414 - 1723015195.4946
Request 39: 1723015194.3416 - 1723015195.4947
Request 40: 1723015194.3417 - 1723015195.4949
Request 41: 1723015194.3418 - 1723015195.495
Request 42: 1723015194.342 - 1723015195.5174
Request 43: 1723015194.3421 - 1723015195.518
Request 44: 1723015194.3423 - 1723015195.5184
Request 45: 1723015194.3425 - 1723015195.5191
Request 46: 1723015194.3426 - 1723015195.5194
Request 48: 1723015194.3429 - 1723015195.5215
Request 50: 1723015194.3433 - 1723015195.5219
Request 51: 1723015194.3435 - 1723015195.5221
Request 47: 1723015194.3428 - 1723015195.5225
Request 49: 1723015194.3431 - 1723015195.523
Request 52: 1723015194.3436 - 1723015195.5265
Request 53: 1723015194.3437 - 1723015195.5268
Request 54: 1723015194.3439 - 1723015195.527
Request 55: 1723015194.344 - 1723015195.5275
Request 56: 1723015194.3443 - 1723015195.5282
Request 57: 1723015194.3444 - 1723015195.5314
Request 58: 1723015194.3445 - 1723015195.5316
Request 59: 1723015194.3445 - 1723015195.5318
Request 60: 1723015194.3446 - 1723015195.5323
Request 61: 1723015194.3448 - 1723015195.5324
Request 62: 1723015194.3449 - 1723015195.5326
Request 63: 1723015194.345 - 1723015195.5328
Request 64: 1723015194.3451 - 1723015195.533
Request 65: 1723015194.3453 - 1723015195.5331
Request 66: 1723015194.3455 - 1723015195.5437
Request 67: 1723015194.3456 - 1723015195.5441
Request 69: 1723015194.3458 - 1723015195.5443
Request 70: 1723015194.3459 - 1723015195.5445
Request 71: 1723015194.346 - 1723015195.5448
Request 72: 1723015194.3464 - 1723015195.5451
Request 68: 1723015194.3457 - 1723015195.5456
Request 73: 1723015194.3471 - 1723015195.5508
Request 74: 1723015194.3475 - 1723015195.551
Request 75: 1723015194.3478 - 1723015195.5512
Request 76: 1723015194.3482 - 1723015195.5516
Request 77: 1723015194.3486 - 1723015195.5518
Request 78: 1723015194.3489 - 1723015195.5542
Request 79: 1723015194.3491 - 1723015195.5545
Request 80: 1723015194.3492 - 1723015195.5549
Request 81: 1723015194.3493 - 1723015195.5605
Request 82: 1723015194.3493 - 1723015195.561
Request 83: 1723015194.3494 - 1723015195.5633
Request 84: 1723015194.3494 - 1723015195.5638
Request 85: 1723015194.3495 - 1723015195.5641
Request 86: 1723015194.3496 - 1723015195.5661
Request 87: 1723015194.3496 - 1723015195.5678
Request 88: 1723015194.3497 - 1723015195.5681
Request 89: 1723015194.3499 - 1723015195.5684
Request 90: 1723015194.35 - 1723015195.5685
Request 91: 1723015194.3501 - 1723015195.5756
Request 92: 1723015194.3502 - 1723015195.5758
Request 93: 1723015194.3504 - 1723015195.576
Request 94: 1723015194.3505 - 1723015195.5768
Request 95: 1723015194.3506 - 1723015195.577
Request 96: 1723015194.3508 - 1723015195.5772
Request 97: 1723015194.3509 - 1723015195.5774
Request 98: 1723015194.3509 - 1723015195.5777
Request 99: 1723015194.351 - 1723015195.5781

结论

通过上述实践,我们可以看到LaravelORM与Webman中使用协程是非常具有可行性的,
并且在高并发&慢查询场景下,协程的优势也在此得到了充分的体现

1188 9 8
9个评论

初心by

牛皮

  • 暂无评论
释永战

有没有ThinkPHP的?

软饭工程师

你是真的牛批啊,代码是不是没贴全,有点没看懂,能否封装成composer扩展呢
在实际项目中,如何使用,比如大量数据的导出,希望在查询阶段消耗较少的时间;有个定时任务,做数据处理和同步,这个能否有实际使用价值

生椰拿铁0糖

好!期待composer发布

  • 暂无评论
mohy

后面部分,laravel 提供的这个方法就可以增加新的数据库驱动了
\Illuminate\Database\Connection::resolverFor

深林孤鹰

太牛了,必须顶一下,感觉webman+pripple已经无敌了

  • 暂无评论
tanhongbin

大佬,好好整整 这个composer如果出了 你准能上名人榜 太给力了 很牛

  • 暂无评论
ab0029

666

  • 暂无评论
Jinson

666,刚看到这篇,官网文档倒是还没有,等你这个上了。Guzzle http 本身支持异步,clichouse 用 smi2/phpclickhouse 也有 selectAsync 异步查询,再加上 Eloquent Mysql,就基本满足我需求了,期待

  • 暂无评论

cclilshy

420
积分
0
获赞数
0
粉丝数
2024-04-09 加入
×
🔝