webman mqtt的几种用法

sorshion

做个备忘录

利用webman启动时自动运行机制

此机制会产生进程数量的链接
截图

// bootstap/Mqtt.php

<?php

namespace bootstrap;

use Webman\Bootstrap;

class Mqtt implements Bootstrap
{
    protected static $mqtt = null;

    protected static $connected = false;

    public static function start($worker)
    {
        $mqtt = new \Workerman\Mqtt\Client('mqtt://0.0.0.1:1883', [
            'username'  => 'xxx',
            'password'  =>  'xxx',
            'debug'     =>  true
        ]);
        $mqtt->connect();
        $mqtt->onConnect = function ($mqtt) {
            self::$connected = true;
            if (!empty($mqtt->waitQueue)) {
                foreach ($mqtt->waitQueue as $item) {
                    $mqtt->publish($item[0], $item[1]);
                }
                $mqtt->waitQueue = [];
            }
        };
        static::$mqtt = $mqtt;
    }

    public static function publish($t, $m)
    {
        if (static::$connected === false) {
            static::$mqtt->waitQueue[] = [$t, $m];
            return;
        }
        static::$mqtt->publish($t, $m);
    }
}

// config/bootstap.php
...
return [
    ...
    bootstrap\Mqtt::class,
];

推送使用方式

use bootstrap\Mqtt;
Mqtt::publish($topic, $content);

参考

github issue

利用webman自定义进程

// process/MqttClient.php
<?php

namespace process;

class MqttClient
{
    protected static $mqtt = null;

    protected static $connected = false;

    public function onWorkerStart()
    {
        $mqtt = new \Workerman\Mqtt\Client('mqtt://0.0.0.1:1883', [
            'username'  => 'xxx',
            'password'  =>  'xxx',
            'debug'     =>  true
        ]);
        $mqtt->connect();
        $mqtt->onConnect = function ($mqtt) {
            self::$connected = true;
        };
        // \Workerman\Timer::add(2, function () use ($mqtt) {
        //     $mqtt->publish('workerman', 'hello workerman mqtt');
        // });
        static::$mqtt = $mqtt;
    }

    public function onMessage($connection, string $data)
    {
        if (self::$connected) {
            // 通过data中的信息动态发布
            $arr = json_decode($data, true);
            self::$mqtt->publish($arr['topic'], $arr['content']);
        }

        $connection->close($data);
    }
}

// config/process.php
'mqttclient' => [
    'handler' => process\MqttClient::class,
    'listen' => 'tcp://0.0.0.0:8789',
    'count' => 1, // 进程数
],

推送使用方式

$client = stream_socket_client('tcp://0.0.0.0:8789');

$data = [
'topic' => $topic,
'content' => $content,
];
stream_socket_sendto($client, json_encode($data));

参考

1
2

1519 4 7
4个评论

Gin

good good

  • 暂无评论
晚安。

+1

  • 暂无评论
xiaobai

这个MQTT服务端要自己创建吗?

  • Gin 2024-05-28

    emqx docker运行就行

oliusha

“利用webman启动时自动运行机制”使用这个方法,执行命令行命令出错了。

/www # php webman list

Fatal error: Uncaught Error: Call to a member function add() on null in /www/vendor/workerman/workerman/Connection/AsyncTcpConnection.php:215
Stack trace:
#0 /www/vendor/workerman/mqtt/src/Client.php(252): Workerman\Connection\AsyncTcpConnection->connect()
#1 /www/app/service/Mqtt.php(19): Workerman\Mqtt\Client->connect()
#2 /www/support/bootstrap.php(93): app\service\Mqtt::start(NULL)
#3 /www/webman(12): require_once('/www/support/bo...')
#4 {main}
  thrown in /www/vendor/workerman/workerman/Connection/AsyncTcpConnection.php on line 215
/www # php -v
PHP 8.0.13 (cli) (built: Nov 30 2021 07:57:55) ( NTS )
Copyright (c) The PHP Group
Zend Engine v4.0.13, Copyright (c) Zend Technologies
/www # 

有人遇到过吗?

sorshion

180
积分
0
获赞数
0
粉丝数
2022-05-12 加入
×
🔝