php 实现kafka
安装依赖
在PHP中使用Kafka需要安装php-rdkafka扩展。可以通过PECL安装:
pecl install rdkafka
安装完成后,在php.ini中添加:
extension=rdkafka.so
生产者示例
以下是一个简单的Kafka生产者代码示例,用于发送消息到Kafka主题:
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka-server:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test-topic");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
?>
消费者示例
以下是一个简单的Kafka消费者代码示例,用于从Kafka主题接收消息:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'test-group');
$conf->set('bootstrap.servers', 'kafka-server:9092');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . "\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
echo "Error: " . $message->errstr . "\n";
break;
}
}
?>
配置参数
Kafka客户端支持多种配置参数,以下是一些常用参数:
bootstrap.servers: Kafka服务器地址,格式为host:portgroup.id: 消费者组IDauto.offset.reset: 偏移量重置策略,可选earliest或latestenable.auto.commit: 是否自动提交偏移量
错误处理
在生产环境中,需要妥善处理Kafka可能产生的错误。可以通过设置错误回调来捕获错误:
$conf->setErrorCb(function ($kafka, $err, $reason) {
echo "Kafka error: " . rd_kafka_err2str($err) . " (reason: $reason)\n";
});
性能优化
对于高性能场景,可以调整以下参数:
queue.buffering.max.messages: 生产者缓冲区大小batch.num.messages: 每批消息数量linger.ms: 生产者等待时间
注意事项
- 确保Kafka服务器地址正确且可访问
- 消费者需要正确处理偏移量,避免消息重复或丢失
- 在生产环境中建议使用SSL/SASL等安全机制







