当前位置:首页 > PHP

kafka php实现

2026-02-14 18:32:41PHP

Kafka PHP 实现指南

安装依赖

使用 composer 安装 php-rdkafka 扩展和依赖库:

pecl install rdkafka
composer require arnm/php-rdkafka

生产者示例

创建生产者发送消息到指定主题:

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka-server:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test-topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

消费者示例

创建消费者订阅主题并处理消息:

kafka php实现

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-consumer-group');
$conf->set('bootstrap.servers', 'kafka-server:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}

配置参数

常用生产者配置:

  • queue.buffering.max.messages: 队列缓冲消息数
  • message.send.max.retries: 发送重试次数
  • retry.backoff.ms: 重试间隔时间

常用消费者配置:

kafka php实现

  • auto.offset.reset: earliestlatest
  • enable.auto.commit: 是否自动提交偏移量
  • session.timeout.ms: 会话超时时间

错误处理

实现自定义错误回调:

$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

性能优化

  • 批量发送消息使用 poll()flush()
  • 调整 batch.num.messages 控制批量大小
  • 启用压缩设置 compression.codec

高级用法

手动提交偏移量:

$consumer->commit($message);

创建管理员客户端管理主题:

$admin = new RdKafka\Admin($conf);
$topicConfig = new RdKafka\TopicConf();
$admin->createTopics(new RdKafka\NewTopic("new-topic", 3, 1));

标签: kafkaphp
分享给朋友:

相关文章

php实现递归

php实现递归

递归的基本概念 递归是一种函数调用自身的技术,适用于解决可以分解为相同子问题的问题。在PHP中,递归需要明确的终止条件(基线条件),否则会导致无限循环。 递归实现阶乘 阶乘是经典的递归案例,n! =…

php验证码的实现

php验证码的实现

PHP 验证码实现方法 使用 GD 库生成基础验证码 创建画布并生成随机字符串作为验证码内容: session_start(); $width = 120; $height = 40; $image…

php搜索功能实现

php搜索功能实现

实现基本的PHP搜索功能 使用MySQL数据库和PHP实现简单的关键词搜索功能。需要确保数据库中有可供搜索的表和数据。 // 连接数据库 $conn = new mysqli('localhost'…

php 伪静态实现

php 伪静态实现

PHP 伪静态实现方法 伪静态(URL Rewrite)是一种通过服务器配置将动态URL转换为静态URL的技术,提升URL的可读性和SEO效果。以下是几种常见的实现方式: 使用Apache的mod_…

php 实现排名

php 实现排名

PHP 实现排名的方法 在 PHP 中实现排名功能通常涉及对数据进行排序、计算排名以及输出结果。以下是几种常见的实现方式: 基本数组排序排名 $scores = [85, 92, 78, 95, 8…

php无法实现的

php无法实现的

PHP 的局限性 PHP 是一种广泛使用的服务器端脚本语言,主要用于 Web 开发。尽管功能强大,但在某些场景下存在局限性。 操作系统级开发 PHP 不适合编写操作系统内核或底层驱动程序。这类任务通…