php 实现kafka
PHP 实现 Kafka 生产者与消费者
安装依赖
确保系统已安装 Kafka 和 Zookeeper。通过 Composer 安装 PHP 的 Kafka 客户端库 php-rdkafka:

composer require arnaud-lb/php-rdkafka
生产者代码示例
以下代码演示如何发送消息到 Kafka 主题:
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test_topic");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
?>
消费者代码示例
以下代码展示如何从 Kafka 主题消费消息:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test_topic']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . "\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
?>
配置参数说明
bootstrap.servers: Kafka 服务器地址group.id: 消费者组标识RD_KAFKA_PARTITION_UA: 自动分配分区poll(0): 非阻塞式发送flush(): 确保所有消息发送完成
错误处理建议
- 生产者需处理
flush()失败情况 - 消费者需处理超时和分区结束情况
- 捕获异常避免进程意外终止
性能优化方向
- 调整
queue.buffering.max.messages控制生产者缓冲区大小 - 设置
enable.auto.commit为 false 可手动提交偏移量 - 使用
auto.offset.reset配置初始偏移策略







