当前位置:首页 > PHP

php 实现kafka

2026-01-29 12:11:38PHP

PHP 实现 Kafka 生产者与消费者

安装依赖

确保系统已安装 Kafka 和 Zookeeper。通过 Composer 安装 PHP 的 Kafka 客户端库 php-rdkafka

php 实现kafka

composer require arnaud-lb/php-rdkafka

生产者代码示例

以下代码演示如何发送消息到 Kafka 主题:

php 实现kafka

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test_topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
?>

消费者代码示例

以下代码展示如何从 Kafka 主题消费消息:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('bootstrap.servers', 'localhost:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test_topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

配置参数说明

  • bootstrap.servers: Kafka 服务器地址
  • group.id: 消费者组标识
  • RD_KAFKA_PARTITION_UA: 自动分配分区
  • poll(0): 非阻塞式发送
  • flush(): 确保所有消息发送完成

错误处理建议

  • 生产者需处理 flush() 失败情况
  • 消费者需处理超时和分区结束情况
  • 捕获异常避免进程意外终止

性能优化方向

  • 调整 queue.buffering.max.messages 控制生产者缓冲区大小
  • 设置 enable.auto.commit 为 false 可手动提交偏移量
  • 使用 auto.offset.reset 配置初始偏移策略

标签: phpkafka
分享给朋友:

相关文章

php实现打印功能

php实现打印功能

PHP实现打印功能的方法 在PHP中实现打印功能可以通过多种方式完成,包括直接输出到浏览器、生成PDF文件、调用打印机接口等。以下是几种常见的方法: 直接输出HTML内容 PHP可以通过echo或p…

php长连接实现

php长连接实现

PHP 长连接实现方法 长连接(Keep-Alive)允许客户端与服务器在一次TCP连接中发送和接收多个HTTP请求/响应,减少重复建立连接的开销。以下是PHP中实现长连接的几种方式: 配置HTTP…

php实现文件的下载

php实现文件的下载

PHP 实现文件下载的方法 在 PHP 中实现文件下载功能通常需要设置正确的 HTTP 头信息,并输出文件内容。以下是几种常见的实现方式: 使用 header() 函数强制下载 通过设置 Conte…

php搜索功能实现的

php搜索功能实现的

PHP 搜索功能实现方法 数据库搜索实现 使用 SQL 的 LIKE 语句进行模糊匹配,适用于小型网站和简单搜索需求。 $searchTerm = $_GET['search']; $query =…

php https实现

php https实现

PHP 中实现 HTTPS 的方法 在 PHP 中实现 HTTPS 主要通过配置服务器和编写代码来确保通信的安全性。以下是几种常见的方法: 配置服务器支持 HTTPS 确保服务器已安装 SSL/T…

php实现评论功能

php实现评论功能

数据库设计 创建评论表(示例结构): CREATE TABLE `comments` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id`…