当前位置:首页 > PHP

php 实现kafka

2026-01-29 12:11:38PHP

PHP 实现 Kafka 生产者与消费者

安装依赖

确保系统已安装 Kafka 和 Zookeeper。通过 Composer 安装 PHP 的 Kafka 客户端库 php-rdkafka

php 实现kafka

composer require arnaud-lb/php-rdkafka

生产者代码示例

以下代码演示如何发送消息到 Kafka 主题:

php 实现kafka

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test_topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
?>

消费者代码示例

以下代码展示如何从 Kafka 主题消费消息:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('bootstrap.servers', 'localhost:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test_topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

配置参数说明

  • bootstrap.servers: Kafka 服务器地址
  • group.id: 消费者组标识
  • RD_KAFKA_PARTITION_UA: 自动分配分区
  • poll(0): 非阻塞式发送
  • flush(): 确保所有消息发送完成

错误处理建议

  • 生产者需处理 flush() 失败情况
  • 消费者需处理超时和分区结束情况
  • 捕获异常避免进程意外终止

性能优化方向

  • 调整 queue.buffering.max.messages 控制生产者缓冲区大小
  • 设置 enable.auto.commit 为 false 可手动提交偏移量
  • 使用 auto.offset.reset 配置初始偏移策略

标签: phpkafka
分享给朋友:

相关文章

php实现静态

php实现静态

静态化实现方法 PHP实现静态化主要通过生成HTML文件或缓存机制来减少服务器动态解析的开销。以下是几种常见方法: 生成静态HTML文件 通过file_put_contents()将动态内容保存为H…

php无限分类的实现

php无限分类的实现

PHP无限分类的实现 无限分类是一种常见的树形结构数据组织方式,适用于商品分类、菜单管理、地区选择等场景。以下是几种实现方法: 递归实现法 递归是最直观的实现方式,通过函数自身调用来遍历子分类。数据…

php 实现秒杀

php 实现秒杀

PHP 实现秒杀的关键技术 秒杀系统需要解决高并发、超卖、性能瓶颈等问题。以下是实现秒杀的核心方案: 使用 Redis 预减库存 将商品库存提前加载到 Redis 中,通过原子操作保证库存准确性:…

php 函数实现

php 函数实现

PHP 函数实现基础 PHP 函数的定义使用 function 关键字,后跟函数名和参数列表。函数名不区分大小写,但建议保持一致。 function functionName($param1, $p…

php 实现队列

php 实现队列

PHP 实现队列的方法 PHP 可以通过多种方式实现队列功能,以下是几种常见的实现方法: 使用数组实现队列 PHP 数组可以通过 array_push 和 array_shift 函数模拟队列的先进…

php 实现单点登录

php 实现单点登录

PHP 实现单点登录的方法 单点登录(SSO)允许用户通过一次登录访问多个相互信任的系统。以下是基于 PHP 的实现方案: 基于共享 Session 的方案 在同一个主域名下的子域名间可以通过共享…