<?php
/**
* Here is your custom functions.
*/
use InfluxDB2\Client;
use InfluxDB2\Model\WritePrecision;
use InfluxDB2\Point;
use InfluxDB2\WriteType as WriteType;
function influxdb() {
$token = 'xxxxxxxxx';
$org = 'test';
$bucket = 'dbname';
static $influxClient;
if (!$influxClient) {
$influxClient = new Client([
"url" => "http://127.0.0.1:8086",
"token" => $token,
"bucket" => $bucket,
"org" => $org,
"precision" => WritePrecision::S
]);
}
return $influxClient;
}
queue/redis/Dynamic.php
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
use InfluxDB2\Client;
use InfluxDB2\Model\WritePrecision;
use InfluxDB2\Point;
use InfluxDB2\WriteType as WriteType;
class Dynamic implements Consumer
{
// 要消费的队列名
public $queue = 'send-dynamic';
// 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
public $connection = 'default';
public $token = 'iv16aGoyk6NsVPRpZT4vdtMUTjNoax-ijJ6kazqXa9bwBbShpZEFx2T7mqU-Ep1IDiYmB-k0LcDDTFaqRrye2A==';
public $org = 'jtkj';
public $bucket = 'bridgedb';
// 消费
public function consume($dataJson)
{
$deviceId = $dataJson['deviceId'];
$dataCount = $dataJson['count'];
$dataTime = $dataJson['time'];
$dataData = $dataJson['data'];
try {
//echo $dataJson['deviceId'] . "\r\n";
$writeApi = influxdb()->createWriteApi(["writeType" => WriteType::BATCHING, 'batchSize' => $dataCount, 'maxRetries' => 1, 'maxRetryDelay' => 10000, 'maxRetryTime' => 10000]);
for ($i = 0; $i < $dataCount; $i++) {
$point = Point::measurement(strval($deviceId))
->addTag('channel', strval($dataData[$i]['channel']))
->addField('originalValue1', (double)$dataData[$i]['originalValue1'][0])
->addField('calculatedValue1', (double)$dataData[$i]['originalValue1'][0])
->time(strtotime($dataTime));
$writeApi->write($point);
}
$writeApi->close();
} catch (\Error $e) {
print "\n\n $e \n\n";
influxdb()->close();
}
influxdb()->close();
}
}
这样写还是不行,还是一样报这个警告
[WARNING] - The retryable error occurred during writing of data. Reason: '[0] Failed to connect to 192.168.100.60 port 8086: Address already in use'. Retry in: 9.685s.
app/functions.php 里加一个函数
调用的时候就 influxdb()->xxx()
app/functions.php
queue/redis/Dynamic.php
这样写还是不行,还是一样报这个警告
再用Timer加个心跳