写了一段程序。在循环在redis读取数据,并根据数据发送主题。mqtt服务器 用的是 Mosquitto
遇到了两个问题。
1 当qos为2时候发不出去,0或1的时候就可以。
2 程序运行一段时间 就会提示 Mqtt client: No connection to broker
代码在下面,请看看是哪里用的不对。
<?php
class LabelServer
{
function __construct(&$task)
{
$this->worker = $task;
$redis = new Redis();
$this->redis = $redis;
$redis->connect('127.0.0.1', 6379);
$mqtt = new \Workerman\Mqtt\Client('mqtt://127.0.0.1:1883', );
$mqtt->connect();
$db = \Tools::openDatabase(\Config::$CONFIG, \Config::$CONFIG, \Config::$CONFIG, \Config::$CONFIG);
$this->db = $db;
$mqtt->onConnect = function ($mqtt) use ($redis, $task, $db) {
while (true) {
while ($redis->LLEN("mqttqueue") > 0) {
$value = $redis->RPOP("mqttqueue");
if ($value) {
Tools::Console("process $value workerid {$task->id}");
$avalue = json_decode($value, true);
$method = $avalue;
switch ($method) {
case "device.register":
$this->doDeviceRegister($mqtt, $avalue, $avalue);
break;
default:
Tools::Console("doOther $method");
}
}
}
usleep(100000);
}
};
}
private function doDeviceRegister($mqtt, $topic, $payload)
{
Tools::Console("doRegister workerid {$this->worker->id}");
$insertData = [];
$insertData = $topic;
$insertData = json_encode($payload);;
$this->db->insert("u")->cols($insertData)->query();
$deviceid = $payload;
$mqtt->publish("/mtt/shoptalk/$deviceid/device/register_reply", "{ \"id\": \"\", \"version\": \"1.0\", \"params\": { \"code\":200, \"message\":\"注册成功\" } , \"time\":1524448721000 , \"method\": \"device.register_reply\" }", );
}
}
workerman里好像不能用无限死循环吧,while(true)好像会有问题的
$redis->LLEN("mqttqueue") > 0这个为false的时候,mqtt的连接进入死循环。qos=2时,是需要“消息接收方”进行反馈的。但mqtt的连接在不断的做死循环,是无法得到反馈的。
知道了,我用了一个timer来读取redis 就可以发送了