期待大佬能基于rdkafka封装一个webman能用扩展?

zeus

最近要和大数据部门合作,处理下他们的kafka-topic的数据,打算用webman的自定义进程实现,找了几个包,

nmred/kafka-php // 这个包里面有amphp ,在start的时候会多启动一个进程去消费,担心在webman的自定义进程中没法很好的管理, 所以打算放弃。
longlang/phpkafka // 这个包可在fpm和swoole下使用,但是还没出正式版,放弃了。

目前是这两个包 下载量比较大了。

最后还是基于rdkafka的文档中例子编写了三个类,例子文档:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html ; 但是感觉不是很完美, 希望亮哥大大考虑下封装一个扩展,在webman中使用。万分感谢。

2440 4 0
4个回答

walkor 打赏

哪里不完美可以发出来大家讨论哈。我目前在做workerman v5 和 webman v1.2相关的开发,我一个人精力有限,对kafka并不熟悉,目前而言实在没有精力从头研究kafka。

  • zeus 2022-01-01

    嗯嗯,理解,我先用最简单的方式实现了,过几天发上来大家帮忙看下哈。感谢。

  • tanhongbin 2022-01-06

    老大,希望在文档加个模块写入每次更新内容,以及升级命令就更好了,要不然每次都得去github上去找

Tinywan

什么叫担心在webman的自定义进程中没法很好的管理, 所以打算放弃。 请问你有尝试去管理吗?真尝试了,遇到的问题又是什么?所有自己业务的扩展第一步的自己先尝试,有问题大家一起解决。别动不动就让作者你给写个扩展

橘叔

先自己尝试一下,阅读一下包里面是否有静态变量,全局变量,等操作,然后进行压测看一下是否存在内存泄露. 有修改提交pr 不就行了...

zeus
<?php

namespace App\Library\Kafka;

use RdKafka\Conf;
use RdKafka\Message;

/**
 * 高级消费者
 * Class HighLevelConsumer
 * @package App\Library\Kafka
 */
class HighLevelConsumer
{
    /**
     * @var \RdKafka\KafkaConsumer
     */
    private $consumer;

    /**
     * 消费开始的offset
     */
    const BEGIN = 'earliest';

    /**
     * 分组名
     */
    private $group_id;

    /**
     * 当前消费到的位置
     */
    private $offset;

    /**
     * 是否自动提交offset
     * @var int
     */
    private $auto_commit;

    /**
     * 自动提交offset时间毫秒
     * @var int
     */
    private $auto_commit_interval_ms;

    /**
     * Kafka服务器地址
     */
    private $metadata_broker_list;

    /**
     * 主题数组
     */
    private $topics;

    /**
     * 构造函数
     * @param string $metadata_broker_list 服务器地址
     * @param string $group_id 分组名
     * @param array $topics 主题数组
     * @param string|null $offset 消费偏移量
     * @throws \RdKafka\Exception
     */
    public function __construct(string $metadata_broker_list, string $group_id, array $topics, int $auto_commit = 0,
                                int    $auto_commit_interval_ms = 5000, string $offset = null)
    {
        $this->metadata_broker_list = $metadata_broker_list;
        $this->group_id = $group_id;
        $this->topics = $topics;
        $this->auto_commit = $auto_commit;
        $this->auto_commit_interval_ms = $auto_commit_interval_ms;
        $this->offset = $offset ?? self::BEGIN;

        //初始化高级消费者
        $this->init();
    }

    /**
     * 初始化
     * @throws \RdKafka\Exception
     */
    private function init()
    {

        $conf = new Conf();

// Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            /**
             * @var \RdKafka\TopicPartition[] $partitions
             */
            $partitionStr = json_encode(array_map(function (\RdKafka\TopicPartition $partition) {
                return [
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                    'offset' => $partition->getOffset(),
                ];
            }, $partitions));

            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    info(__METHOD__ . ' rebalance assign callback:' . $partitionStr);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    info(__METHOD__ . ' rebalance revoke callback:' . $partitionStr);
                    $kafka->assign(NULL);
                    break;

                default:
                    error(__METHOD__ . ' rebalance unknown callback:' . $err);
                    throw new \Exception($err);
            }
        });

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
        $conf->set('group.id', $this->group_id);

// Initial list of Kafka brokers
        $conf->set('metadata.broker.list', $this->metadata_broker_list);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
        $conf->set('auto.offset.reset', $this->offset);

        //自动提交offset开关 和 自动提交时间间隔
        $conf->set('enable.auto.commit', $this->auto_commit);
        $conf->set('auto.commit.interval.ms', $this->auto_commit_interval_ms);

        $this->consumer = new \RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
        $this->consumer->subscribe($this->topics);
    }

    /**
     * 真实消费
     * @return Message|null
     * @throws \RdKafka\Exception
     */
    public function consume(): ?Message
    {
        $message = $this->consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                info(__METHOD__ . ' messages : ' . json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE));
                return $message;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                info(__METHOD__ . ' No more messages : ' . $message->errstr());
                return null;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                info(__METHOD__ . ' Timeout :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                return null;
            default:
                error(__METHOD__ . ' Exception :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                throw new \Exception(__METHOD__ . ' ' . $message->errstr(), $message->err);
        }
    }

    /**
     * 同步提交offset
     * @throws \RdKafka\Exception
     */
    public function commit(): void
    {
        $this->consumer->commit();
    }

    /**
     * 异步提交offset
     * @throws \RdKafka\Exception
     */
    public function commitAsync(): void
    {
        $this->consumer->commitAsync();
    }
}

此类基于rdKafka扩展实现,大佬看下有啥问题没?

年代过于久远,无法发表回答
×
🔝