我想把rabbitmq 数据 转发到worker上面 一直不行 一直在 consume() 方法执行不往下执行
require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function () {
dg_init('system', 'rabbitmq_data_sysnc', array());
global $location_con;
global $sting;
$string = '';
//建立连接
$location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
$location_con->onConnect = function ($connection) {
var_dump('onConnect ok');
//heartbeat($connection, 'rabbitmq_data', 0);
};
$location_con->connect();
$location_con->send('123123123123');
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/'
);
$e_name = 'S2C'; //交换机名
$q_name = 'S2C'; //队列名
$k_route = 'S2C'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键
********下面这段代码不行
$q->consume(function($envelope, $queue) use($location_con){
echo "send data to 9037";
$location_con->send($envelope->getBody());
echo $string = $envelope->getBody();
});//自动ACK应答
*************上面代码不知道怎么写
$conn->disconnect();
//连接断开
$location_con->onClose = function ($connection) {
connect_close($connection, 'start_data_sync-walkthink');
};
};
Worker::runAll();
2个回答
年代过于久远,无法发表回答
consume里是一个死循环,一直循环消费队列的数据。因为代码一直运行在这个循环里,workerman永远无法得到控制权,就无法把数据发送出去。
你可以用stream_socket_client 替代 AsyncTcpConnection 。看到对端使用的websocket协议,stream_socket_client 不好直接连websocket端口,对端最好再开一个text端口,stream_socket_client以text协议发送数据。
在worker 里面 stream_socket_client 怎么使用呢 有实例 吗?或者 代码?多谢
接收方开text协议,类似
// 建立socket连接到内部推送端口
$client = stream_socket_client('tcp://127.0.0.1:9395', $errno, $errmsg, 1);
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/',
'debug' =>true
);
$e_name = 'S2C'; //交换机名
$q_name = 'S2C'; //队列名
$k_route = 'S2C'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
//die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键
while(True){
$q->consume(function($envelope, $queue) use($client){
echo "send data to 9037";
fwrite($client, $envelope->getBody());
},AMQP_AUTOACK);//自动ACK应答
}
//$conn->disconnect();