由于 ikilobyte/pulsar-client-php 不支持 pulsar+ssl 协议，所以这里改用 WebSocket 来实现。
<?php
namespace process;
use Workerman\Connection\AsyncTcpConnection;
/**
 * Worker process that consumes messages from the Pulsar WebSocket consumer
 * endpoint at mqe.tuyacn.com, decrypts each payload and prints it.
 *
 * The connection is made with Workerman's AsyncTcpConnection using the
 * "ws://" scheme combined with the "ssl" transport.
 */
class PulsarTuya
{
    /**
     * Seconds to wait before re-establishing a closed connection.
     */
    private const RECONNECT_DELAY = 5;

    /**
     * Builds the consumer connection, registers its callbacks and connects.
     *
     * The connection object is stored in the global $consumer variable.
     */
    public function onWorkerStart()
    {
        // Kept as a global for backward compatibility: code outside this class
        // may read it — NOTE(review): confirm and replace with a property if not.
        global $consumer;

        $access_id = '4qvtkvtudjnng******';
        $access_key = '3ae1c617a23b**************';
        $params = '?ackTimeoutMillis=3000&subscriptionType=Failover';
        $env = 'event-test';
        $domain = 'ws://mqe.tuyacn.com:8285/' . 'ws/v2/consumer/persistent/' . $access_id . '/out/' . $env . '/' . $access_id . '-sub' . $params;

        $option = [
            'ssl' => [
                // Path to the local certificate. Must be in PEM format and contain
                // both the local certificate and the private key.
                // NOTE(review): placeholder value — replace it with a real file or
                // drop the entry if no client certificate is needed.
                'local_cert' => '/your/path/to/pemfile',
                // Passphrase of the local_cert file.
                'passphrase' => 'your_pem_passphrase',
                // Whether self-signed certificates are allowed.
                'allow_self_signed' => true,
                // Whether the peer's SSL certificate must be verified.
                // NOTE(review): false disables verification of the server certificate.
                'verify_peer' => false,
            ],
        ];

        $consumer = new AsyncTcpConnection($domain, $option);
        // Use SSL-encrypted transport for the connection.
        $consumer->transport = 'ssl';
        $consumer->headers = [
            'username' => $access_id,
            'password' => self::genPwd($access_id, $access_key),
            "Connection" => "Upgrade",
        ];

        $consumer->onMessage = static function (AsyncTcpConnection $con, $data) use ($access_key) {
            echo self::decrypt($data, $access_key) . PHP_EOL;

            // Acknowledge the message. The subscription is opened with
            // ackTimeoutMillis=3000, so a message that is never acknowledged
            // would be delivered again. The ack is sent even when decryption
            // failed so that a malformed message is not redelivered forever.
            $frame = is_string($data) ? json_decode($data, true) : null;
            if (is_array($frame) && isset($frame['messageId'])) {
                $con->send(json_encode(['messageId' => $frame['messageId']]));
            }
        };

        // Without this the consumer would stay offline after any disconnect.
        $consumer->onClose = static function (AsyncTcpConnection $con) {
            $con->reconnect(self::RECONNECT_DELAY);
        };

        // The connection object is intentionally not dumped here: its headers
        // contain the generated password.
        $consumer->connect();
    }

    /**
     * Message hook of the worker process itself; intentionally does nothing.
     *
     * @param mixed $connection
     * @param mixed $data
     */
    public function onMessage($connection, $data)
    {
    }

    /**
     * Close hook of the worker process itself; intentionally does nothing.
     *
     * @param mixed $connection
     */
    public function onClose($connection)
    {
    }

    /**
     * Derives the connection password from the access id and access key.
     *
     * @param string $id  Access id.
     * @param string $key Access key.
     *
     * @return string 16 characters taken from offset 8 of md5($id . md5($key)).
     */
    public static function genPwd($id, $key)
    {
        return substr(md5($id . md5($key)), 8, 16);
    }

    /**
     * Extracts and decrypts the "data" field of a received message frame.
     *
     * The frame is JSON whose "payload" entry is base64-encoded JSON; that inner
     * document's "data" entry is decrypted with AES-128-ECB using the 16
     * characters of $key starting at offset 8.
     *
     * @param string $data Raw frame as received from the connection.
     * @param string $key  Access key.
     *
     * @return string|false Decrypted text, or false when the frame is malformed
     *                      or decryption fails.
     */
    public static function decrypt($data, $key)
    {
        if (!is_string($data)) {
            return false;
        }

        $frame = json_decode($data, true);
        if (!is_array($frame) || !isset($frame['payload']) || !is_string($frame['payload'])) {
            return false;
        }

        $payload = json_decode(base64_decode($frame['payload']), true);
        if (!is_array($payload) || !isset($payload['data']) || !is_string($payload['data'])) {
            return false;
        }

        return openssl_decrypt($payload['data'], 'AES-128-ECB', substr($key, 8, 16));
    }
}