当前位置:首页 > PHP

php 实现kafka

2026-01-29 12:11:38PHP

PHP 实现 Kafka 生产者与消费者

安装依赖

确保系统已安装 Kafka 和 Zookeeper。通过 Composer 安装 PHP 的 Kafka 客户端库 php-rdkafka

php 实现kafka

composer require arnaud-lb/php-rdkafka

生产者代码示例

以下代码演示如何发送消息到 Kafka 主题:

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test_topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
?>

消费者代码示例

以下代码展示如何从 Kafka 主题消费消息:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('bootstrap.servers', 'localhost:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test_topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

配置参数说明

  • bootstrap.servers: Kafka 服务器地址
  • group.id: 消费者组标识
  • RD_KAFKA_PARTITION_UA: 自动分配分区
  • poll(0): 非阻塞式发送
  • flush(): 确保所有消息发送完成

错误处理建议

  • 生产者需处理 flush() 失败情况
  • 消费者需处理超时和分区结束情况
  • 捕获异常避免进程意外终止

性能优化方向

  • 调整 queue.buffering.max.messages 控制生产者缓冲区大小
  • 设置 enable.auto.commit 为 false 可手动提交偏移量
  • 使用 auto.offset.reset 配置初始偏移策略

php 实现kafka

标签: phpkafka
分享给朋友:

相关文章

php实现购物车

php实现购物车

PHP实现购物车功能 购物车功能是电商网站的核心模块之一,通常涉及商品添加、删除、数量修改和结算等操作。以下是使用PHP实现购物车功能的几种方法。 使用Session存储购物车数据 Session是…

php实现搜索功能

php实现搜索功能

实现基本的搜索功能 在PHP中实现搜索功能通常涉及数据库查询。以下是一个简单的实现方式,假设使用MySQL数据库: <?php // 连接数据库 $conn = new mysqli('loc…

php实现递归

php实现递归

递归的基本概念 递归是一种函数调用自身的技术,适用于解决可以分解为相同子问题的问题。在PHP中,递归需要明确的终止条件(基线条件),否则会导致无限循环。 递归实现阶乘 阶乘是经典的递归案例,n!…

php如何实现直播

php如何实现直播

实现直播功能的方法 PHP可以通过结合其他技术和工具来实现直播功能。以下是几种常见的方法: 使用流媒体服务器 配置流媒体服务器如Nginx-RTMP、Red5或Wowza。这些服务器支持RTMP协…

php实现上传

php实现上传

PHP 文件上传实现 HTML 表单设置 创建一个包含文件上传字段的 HTML 表单,确保表单的 enctype 属性设置为 multipart/form-data,这是文件上传必需的设置。 <…

php实现推送

php实现推送

PHP 实现推送功能的方法 PHP 实现推送功能可以通过多种方式完成,包括 WebSocket、Server-Sent Events (SSE)、长轮询或第三方推送服务。以下是几种常见的方法: 使用…