当前位置:首页 > PHP

kafka php实现

2026-02-14 18:32:41PHP

Kafka PHP 实现指南

安装依赖

使用 composer 安装 php-rdkafka 扩展和依赖库:

pecl install rdkafka
composer require arnm/php-rdkafka

生产者示例

创建生产者发送消息到指定主题:

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka-server:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test-topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

消费者示例

创建消费者订阅主题并处理消息:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-consumer-group');
$conf->set('bootstrap.servers', 'kafka-server:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}

配置参数

常用生产者配置:

  • queue.buffering.max.messages: 队列缓冲消息数
  • message.send.max.retries: 发送重试次数
  • retry.backoff.ms: 重试间隔时间

常用消费者配置:

  • auto.offset.reset: earliestlatest
  • enable.auto.commit: 是否自动提交偏移量
  • session.timeout.ms: 会话超时时间

错误处理

实现自定义错误回调:

$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

性能优化

  • 批量发送消息使用 poll()flush()
  • 调整 batch.num.messages 控制批量大小
  • 启用压缩设置 compression.codec

高级用法

手动提交偏移量:

$consumer->commit($message);

创建管理员客户端管理主题:

$admin = new RdKafka\Admin($conf);
$topicConfig = new RdKafka\TopicConf();
$admin->createTopics(new RdKafka\NewTopic("new-topic", 3, 1));

kafka php实现

标签: kafkaphp
分享给朋友:

相关文章

git php实现

git php实现

Git 与 PHP 的集成实现 使用 PHP 执行 Git 命令 通过 PHP 的 exec()、shell_exec() 或 passthru() 函数可以直接调用 Git 命令,适用于服务器端操作…

php实现增删改查

php实现增删改查

以下是在PHP中实现增删改查(CRUD)操作的详细方法,以MySQL数据库为例: 数据库连接配置 创建config.php文件存储数据库连接信息: <?php $host = 'localh…

php实现栈

php实现栈

栈的基本概念 栈(Stack)是一种遵循后进先出(LIFO)原则的数据结构。核心操作包括: push:元素入栈。 pop:元素出栈(返回并移除栈顶元素)。 peek:查看栈顶元素…

php实现表单

php实现表单

PHP 表单基础实现 创建一个简单的 PHP 表单需要 HTML 表单结构和 PHP 处理脚本。以下是一个完整的示例: HTML 表单部分 (form.html) <form actio…

php 实现购物车

php 实现购物车

PHP 实现购物车功能 购物车功能是电子商务网站的核心部分,可以通过 PHP 和 Session 或数据库来实现。以下是两种常见的实现方式: 使用 Session 存储购物车数据 Sessi…

php实现接口

php实现接口

PHP 实现接口的方法 在 PHP 中,接口(Interface)用于定义一组方法的规范,而不实现具体功能。类可以实现一个或多个接口,并必须实现接口中定义的所有方法。 定义接口 接口使用 inte…