我想把rabbitmq 数据 转发到worker上面 一直不行 一直在 consume() 方法执行不往下执行

zhou2021
require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;

$worker = new Worker();

$worker->count = 1;

$worker->onWorkerStart = function () {

    dg_init('system', 'rabbitmq_data_sysnc', array());
    global $location_con;
    global $sting;
    $string = '';
    //建立连接
    $location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
    $location_con->onConnect = function ($connection) {
        var_dump('onConnect ok');
        //heartbeat($connection, 'rabbitmq_data', 0);
    };
    $location_con->connect();
    $location_con->send('123123123123');
    $conn_args = array(
        'host' => '192.168.5.133',
        'port' => '5672',
        'login' => 'admin',
        'password' => 'admin',
        'vhost' => '/'
    );
    $e_name = 'S2C'; //交换机名
    $q_name = 'S2C'; //队列名
    $k_route = 'S2C'; //路由key
    //创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);

    //创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
    $ex->setFlags(AMQP_PASSIVE); //持久化
    $ex->declareExchange();

    //创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //持久化
    $q->declareQueue();
    $q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键

********下面这段代码不行

    $q->consume(function($envelope, $queue) use($location_con){
        echo "send data to 9037";
        $location_con->send($envelope->getBody());
        echo $string = $envelope->getBody();
    });//自动ACK应答
*************上面代码不知道怎么写
    $conn->disconnect();
    //连接断开
    $location_con->onClose = function ($connection) {
        connect_close($connection, 'start_data_sync-walkthink');
    };
};

Worker::runAll();
2378 2 0
2个回答

walkor 打赏

consume里是一个死循环,一直循环消费队列的数据。因为代码一直运行在这个循环里,workerman永远无法得到控制权,就无法把数据发送出去。

你可以用stream_socket_client 替代 AsyncTcpConnection 。看到对端使用的websocket协议,stream_socket_client 不好直接连websocket端口,对端最好再开一个text端口,stream_socket_client以text协议发送数据。

  • zhou2021 2021-03-29

    在worker 里面 stream_socket_client 怎么使用呢 有实例 吗?或者 代码?多谢

walkor 打赏

接收方开text协议,类似

$worker = new Worker('text://127.0.0.1:9395');
$worker->onMessage = function($con, $data){
    $data = json_decode($data);
    var_dump($data);
};
$q->consume(function($envelope, $queue) use($location_con){
  $client = stream_socket_client('tcp://127.0.0.1:9395');
  fwrite($client, json_encode($envelope->getBody())."\n");
  echo $string = $envelope->getBody();
});
  • zhou2021 2021-03-29

    // 建立socket连接到内部推送端口
    $client = stream_socket_client('tcp://127.0.0.1:9395', $errno, $errmsg, 1);

    $conn_args = array(
    'host' => '192.168.5.133',
    'port' => '5672',
    'login' => 'admin',
    'password' => 'admin',
    'vhost' => '/',
    'debug' =>true
    );
    $e_name = 'S2C'; //交换机名
    $q_name = 'S2C'; //队列名
    $k_route = 'S2C'; //路由key
    //创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
    //die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);

    //创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
    $ex->setFlags(AMQP_PASSIVE); //持久化
    $ex->declareExchange();

    //创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //持久化
    $q->declareQueue();
    $q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键

    while(True){
    $q->consume(function($envelope, $queue) use($client){
    echo "send data to 9037";
    fwrite($client, $envelope->getBody());
    },AMQP_AUTOACK);//自动ACK应答
    }
    //$conn->disconnect();

年代过于久远,无法发表回答
×
🔝