webman使用Amphp并行数据库查询,然后获取合并结果
安装
composer require amphp/amp
composer require amphp/mysql
结果
代码
<?php
namespace app\test\controller;
use app\common\controller\ApiBase;
use support\facade\Logger;
use support\Response;
use Throwable;
use Amp\Future;
use Amp\Mysql\MysqlConfig;
use Amp\Mysql\MysqlConnectionPool;
use function Amp\async;
use function Amp\Future\awaitAll;
class Test4Controller extends ApiBase
{
private function createPool(): MysqlConnectionPool
{
$config = MysqlConfig::fromString(
"host=localhost user=root password=123456 db=test"
);
return new MysqlConnectionPool($config);
}
public function function1(MysqlConnectionPool $pool,$i): Future
{
return async(function () use ($pool,$i) {
// 模拟异步任务1
$result = time() . " 任务 1-$i 开始" . PHP_EOL;
$statement = $pool->prepare('SELECT SLEEP(20)');
$statement->execute();
$result .= time() . " 任务 1-$i 完成" . PHP_EOL;
return $result;
});
}
public function function2(MysqlConnectionPool $pool,$i): Future
{
return async(function () use ($pool,$i) {
// 模拟异步任务2
$result = time() . " 任务 2-$i 开始" . PHP_EOL;
$statement = $pool->prepare('SELECT SLEEP(10)');
$statement->execute();
$result .= time() . " 任务 2-$i 完成" . PHP_EOL;
return $result;
});
}
public function function3(MysqlConnectionPool $pool,$i): Future
{
return async(function () use ($pool,$i) {
// 模拟异步任务3
$result = time() . " 任务 3-$i 开始" . PHP_EOL;
$statement = $pool->prepare('SELECT SLEEP(5)');
$statement->execute();
$result .= time() . " 任务 3-$i 完成" . PHP_EOL;
return $result;
});
}
public function runTasks(): Response
{
echo time() . " 开始任务...".PHP_EOL.PHP_EOL;
$pool = $this->createPool();
try {
$tasks=[];
for ($i = 0; $i < 10; $i++) {
$tasks[] = $this->function1($pool,$i);
$tasks[] = $this->function2($pool,$i);
$tasks[] = $this->function3($pool,$i);
}
$results = awaitAll($tasks);
echo "失败:".PHP_EOL;
foreach ($results[0] as $result) {
echo $result . PHP_EOL;
}
echo "成功:".PHP_EOL;
foreach ($results[1] as $result) {
echo $result . PHP_EOL;
}
echo time() . " 完成任务...".PHP_EOL;
return res_success(111);
} catch (Throwable $e) {
Logger::admin(get_exc_str($e), 'error');
return res_error(msg: '处理异常:' . $e->getMessage());
} finally {
$pool->close();
}
}
}
个评论
好文章