walkor大神,请教一个socket相关的问题
现在情况是这样的,我要写一个将MySQL数据同步到ES的服务,方案是将MySQL binlog日志解析成结构化的数据,然后写入ES,解析binlog的是一个php cli 单进程,死循环获取binlog数据,因为怕数据太多,消费能力跟不上,想着用workerman多个worker进程处理,但是多个worker进程存在的问题是有序性问题,同一张表的事件只能同时由一个worker进程处理,我目前的想法是缓存了几个缓存了几个socket链接,然后根据表名去走对应的socket链接发送消息,不确定这样子稳不稳定。代码如下面,写了一点点,大概意思能表现出来。
方案和代码参考了这个问题 https://wenda.workerman.net/question/508
有没有什么更好的方案呢?
消费解析后的binlog worker进程
use app\dbBase;
use Workerman\Worker;
require_once __DIR__ . '/../vendor/autoload.php';
// 创建一个Worker监听2347端口,不使用任何应用层协议
$worker = new Worker("text://0.0.0.0:2347");
// 启动4个进程对外提供服务
$worker->count = 6;
$worker->name = 'write_es';
Worker::$logFile = __DIR__ . '/' . $worker->name . '.log';
$worker->onWorkerStart = function($worker)
{
// 将db实例存储在全局变量中(也可以存储在某类的静态成员中)
dbBase::getInstance()->init();
};
// 当客户端发来数据时
$worker->onMessage = function($connection, $data)
{
echo $data.PHP_EOL;
//将mysql数据写入ES
// 向客户端发送hello $data
$connection->send('hello ' . $data."\n");
};
$worker->onConnect = function ($connection) {
$connection->send('hello\n');
};
// 运行worker
Worker::runAll();
解析binlog后推送到worker进程
class mysqlEventSubscribers extends EventSubscribers
{
const client_count = 4;
private static $clients;
/**
* mysql增删改查事件
* @param EventDTO $event
*/
public function allEvents(EventDTO $event): void {
// all events got __toString() implementation
echo $event;
// all events got JsonSerializable implementation
//echo json_encode($event, JSON_PRETTY_PRINT);
//将事件推送到worker进程中进行处理
$this->send($event);
echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
}
function send($event) {
if (!isset($this->clients)) {
// 建立socket连接到内部推送端口
for ($i = 0; $i<self::client_count;$i++) {
static::$clients[$i] = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);
}
}
//根据event事件中的数据库表名,找到对应的
$client = $this->getClientByEvent($event);
// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($event) . "\n");
// 读取推送结果
echo fread($client, 8192);
}
function getTableFromEvent($event) {
//逻辑没实现,大概就是不同的增删改查返回不同的表名
return 'table';
}
function getClientByEvent($event) {
$table = $this->getTableFromEvent($event);
$clientIndex = syncTable::$tables[$table] % self::client_count;
return static::$clients[$clientIndex];
}
}
我怎么感觉你这有点像要写定时任务的样子 就是当有消息过多的时候我没办法同步到es的时候就用缓存来解压一下 直至消息同步为此是不是?
主要是为了多进程消费,让同一个表到达同一个worker进程
我测试了下,上面的方案是行不通的,因为同一个client发送的多个消息,并不是同一个worker进程处理的,所以上面我的方案不行,应该还是要在发送之前加个数组缓存event消息,同一个表的多条数据,等一条处理完了再发送下一条
同一个连接的数据肯定是同一个worker处理的,所以感觉代码没啥问题。
我测试的结果发现同一个连接的数据是分发给不同的worker处理的,这个代码还是不行
不可能,worker进程间是隔离的,连接分配到某个进程后就不会再次分配了,这个连接的所有数据都会给这个进程处理。
@1393:你说的是对的,我之前测试的代码有问题,多谢了