之前我写过一个基于定时器+swoole的rabbitmq生产者和消费者(https://www.workerman.net/q/8688)
在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内的基于定时任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。
经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好使用一些。
Lib_calss_rabbitmq.php
<?php
/*
* 20230316 增加 rabbitmq_publish_v3
* 20230320 使用event事件进行驱动
*
*/
//composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Swoole\Coroutine;
use Workerman\Events\EventInterface;
class rabbitmq_client {
public $connection = "";
public $channel = "";
public $is_connected = false ;
public $is_debug = false;
public $exchange_name = "" ;
public $queen_name = "";
public $comsume_callback = null;
public $socket = null;
public $config_option = array(
'host' => "127.0.0.1",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'exchange_name' => 'default_exchange_name',
'queen_name' => 'default_queen_name',
);
public function __construct($option = array()){
$this->config_option['host'] = $option['host'];
$this->config_option['port'] = $option['port'];
$this->config_option['user'] = $option['user'];
$this->config_option['password'] = $option['password'];
$this->exchange_name = $option['exchange_name'];
$this->queen_name = $option['queen_name'];
//初始化Rabbitmq连接
while( $this->is_connected == false ){
if($this->connect() == true){
break;
}
$this->app_log("rabbitmq server connect failed");
sleep(1);
}
//执行定时握手任务
Timer::add( 55 , function (){
// 发送心跳数据
$this->write_heartbeat();
});
}
function app_log($log){
//将日志信息发送给日志服务器
$ts = round(microtime(true) - time() , 6);
@list($ts1 , $ts2) = @explode("." , $ts);
$logData = "{$ts1}.{$ts2} {$log}";
//logToFile($logData);
if(function_exists("logToScreen") == true){
logToScreen($logData , true);
}else{
echo $logData."\n";
}
}
function setDebug($is_debug = true){
$this->is_debug = $is_debug;
}
public function connect(){
try{
$this->connection = new AMQPStreamConnection(
$this->config_option['host'] ,
$this->config_option['port'] ,
$this->config_option['user'] ,
$this->config_option['password'] ,
'/' ,
false ,
'AMQPLAIN' ,
null,
'en_US' ,
3.0 ,
3.0 ,
null ,
true ,
60
);
//ZGH debug
//$this->app_log(get_class($this->connection)); //PhpAmqpLib\Connection\AMQPStreamConnection
//$this->app_log(get_class($this->connection->getIo())); //PhpAmqpLib\Wire\IO\StreamIO
//$this->app_log(print_r($this->connection->getIo()->getSocket(),true));
$this->socket = $this->connection->getIo()->getSocket();
//$this->app_log(print_r(debug_backtrace(),true));
//$this->app_log(print_r(debug_print_backtrace(),true));
if( $this->connection ->isConnected() == true){
$this->channel = $this->connection->channel();
//声明交换机
$this->channel->exchange_declare( $this->exchange_name , 'topic', false, true, false);
// 声明队列
$this->channel->queue_declare( $this->queen_name , false, true, false, false);
// 绑定队列
$this->channel->queue_bind($this->queen_name , $this->exchange_name , $this->queen_name );
$this->is_connected = true;
if($this->is_debug == true){
$this->app_log("rabbitmq connected");
}
return true;
}
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
}
return false;
}
public function reconnect(){
if( $this->is_connected == false ){
if( $this->connect() == true ){
//重新连接到服务器
$this->is_connected = true;
return true;
}
}
return false;
}
/**
* @return void
*
* 向服务器发送
*/
function write_heartbeat(){
if($this->is_connected == true){
try{
//app_log("heartbeat");
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->connection->write($pkt->getvalue());
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
}else{
// false
$this->reconnect();
}
}
/**
* @param $data
* @param $queen_name
* @param $is_persistent
* @param $is_debug
* @return void
*/
function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
$delivery_mod = AMQPMessage::DELIVERY_MODE_PERSISTENT;
if($is_persistent == false){
$delivery_mod = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;
}
$exchange_name = "";
if(strlen($exchange_name_input) > 0){
$exchange_name = $exchange_name_input;
}else{
$exchange_name = $this->exchange_name;
}
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->exchange_name;
}
$rabbit_msg = new AMQPMessage($data , ['content_type'=>'text/plain','delivery_mode'=>$delivery_mod]); //定义消息
try{
// 发送消息
$this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
}catch (Exception $e) {
$this->app_log("error catched :".$e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
}
//在做消费时,对流量进行控制,防止出现丢数据
function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
$this->channel->basic_qos( $prefetch_size , $prefetch_count ,false); //当有消息在处理时不要发过来
}
/*
function comsume_callback($msg){
//收到MQ消息
$message_body = $msg->body;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//echo "[x] Received ", $message_body, "\n";
//redis_add_statistic( $redis , "rabbitmq:qos_test_consumption" , 0.1);
}
*/
/*
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
* */
function comsume_swoole_go( $queen_name_input = ""){
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->queen_name;
}
if($this->is_connected == true) {
go(function () {
// 消费者订阅队列
try {
if( !$this->comsume_callback ){
$this->app_log("function comsume_callback must be set");
return false;
}
$this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
} catch (Exception $e) {
$this->app_log("error catched :" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
});
// 添加事件驱动,收到消息时触发
Worker::$globalEvent->add($this->socket, EventInterface::EV_READ, array($this, 'channel_wait'));
//需要做一次初始化
$this->channel_wait();
}else{
// false
$this->connect();
}
}
function comsume( $queen_name_input = ""){
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $this->queen_name;
}
if($this->is_connected == true) {
// 消费者订阅队列
try {
if( !$this->comsume_callback ){
$this->app_log("function comsume_callback must be set");
return false;
}
$this->channel->basic_consume( $this->queen_name , '' , false , false , false , false , $this->comsume_callback);
} catch (Exception $e) {
$this->app_log("error catched :" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
// 添加事件驱动,收到消息时触发
Worker::$globalEvent->add($this->socket, EventInterface::EV_READ, array($this, 'channel_wait'));
//需要做一次初始化
$this->channel_wait();
}else{
// false
$this->connect();
}
}
function channel_wait(){
// 开始消费
try {
/*
while ( count($this->channel->callbacks) ) {
$this->channel->wait();
usleep(1);
}
while ( $this->channel->is_consuming() ) {
usleep(10);
$this->channel->wait();
}
//$this->app_log(get_class($this->channel)); //PhpAmqpLib\Channel\AMQPChannel
while ( $this->channel->is_consuming() ) {
$this->channel->wait(null , true , 0.001);
usleep(10);
}
*/
if( $this->channel->is_consuming() ) {
$this->channel->wait(null , true , 0.001);
}
} catch (Exception $e) {
$this->app_log("error catched when consuming:" . $e->getMessage());
$this->is_connected = false;
$this->reconnect();
}
}
}
function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
$exchange_name = "";
if(strlen($exchange_name_input) > 0){
$exchange_name = $exchange_name_input;
}else{
$exchange_name = $rabbitmq_client->exchange_name;
}
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $rabbitmq_client->queen_name;
}
$rabbitmq_client->publish($data , true , $exchange_name , $queen_name);
}
使用方法:
<?php
//初始化Rabbitmq连接
$rabbitmq_config_option = array();
$rabbitmq_config_option['host'] = RABBITMQ_SERVER_IP;
$rabbitmq_config_option['port'] = RABBITMQ_SERVER_PORT;
$rabbitmq_config_option['user'] = RABBITMQ_USERNAME;
$rabbitmq_config_option['password'] = RABBITMQ_PASSWORD;
$rabbitmq_config_option['exchange_name'] = "upstream_exchange";
$rabbitmq_config_option['queen_name'] = "upstream_queen";
$rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
//生产者,可以指定队列或者交换机:
function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
$exchange_name = "";
if(strlen($exchange_name_input) > 0){
$exchange_name = $exchange_name_input;
}else{
$exchange_name = $rabbitmq_client->exchange_name;
}
$queen_name = "";
if(strlen($queen_name_input) > 0){
$queen_name = $queen_name_input;
}else{
$queen_name = $rabbitmq_client->queen_name;
}
$rabbitmq_client->publish($data , true , $exchange_name , $queen_name);
}
//作为消费者使用
$rabbitmq_client->comsume_callback = function ($msg)use($db,$workerId){
//收到MQ消息
$message_body = $msg->body;
$data_arr = json_decode($message_body , true);
//只有格式合格才进行确认
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
echo "[{$workerId}] Received ", $message_body, "\n";
};
$rabbitmq_client->comsume_swoole_go(); //协程方式 需要安装swoole
//$rabbitmq_client->comsume(); //普通方式
特别说明,如果需要使用协程方式,需要安装swoole,并且在项目启动文件前面加上:
use Swoole\Coroutine;
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
感谢分享
感谢分享