直接start运行,大约45秒(累计2W5+数据后)后抛出异常
Warning: stream_socket_client(): unable to connect to tcp://172.18.0.19:22345 (Address not available) in /var/www/html/vendor/workerman/workerman/Connection/AsyncTcpConnection.php on line 185
MQTT内数据为
{"code":"LI2701_LS_HH","type":"BOOL","value":false,"timestamp":"1617874140"}
服务端代码1
require_once __DIR__ . '/../../vendor/autoload.php';
use \Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
$worker->onWorkerStart = function(){
global $db;
$db = 0;
$mqtt = new \Workerman\Mqtt\Client('mqtt://127.0.0.1:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->subscribe('simp/nodes/#');
};
$mqtt->onMessage = function($topic, $content){
global $db;
// 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:22345');
// 任务及参数数据
// $task_data = array(
// 'function' => 'send_mail',
// 'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
// );
// 发送数据
$topic = explode("/", $topic);
$data = json_decode($content,true);
$data["count"] = $db;
$db++;
$task_connection->send(json_encode($data));
// 异步获得结果
$task_connection->onMessage = function($task_connection, $task_result)
{
// 结果
// 获得结果后记得关闭异步连接
$task_connection->close();
// 通知对应的websocket客户端任务完成
};
// 执行异步连接
$task_connection->connect();
};
$mqtt->connect();
};
服务端代码2
require_once __DIR__ . '/../../vendor/autoload.php';
use Workerman\Worker;
// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:22345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function($connection, $task_data)
{
// 假设发来的是json数据
$data = json_decode($task_data, true);
// 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
echo $data["count"]."\n";
$task_result = "ok";
// 发送结果
$connection->send($task_result);
};
异常时执行status
每次建立一个连接客户端侧都会消耗一个端口,连接断开后端口不会立刻释放,会进入time_wait等待一段事件。如果不断的快速创建连接本地端口可能因此消耗光,进而无法发起新的连接。
如果你的异步任务真的有这么大量,可以考虑复用连接,类似连接池,建立多个连接保存在全局数组里。每次选择一个连接发送数据。