php 实现kafka
安装 Kafka 扩展
在 PHP 中使用 Kafka 需要安装 rdkafka 扩展。可以通过 PECL 安装:
pecl install rdkafka
安装完成后,在 php.ini 中添加 extension=rdkafka.so 并重启 PHP 服务。
生产者示例代码
以下是一个 Kafka 生产者的示例代码,用于发送消息到 Kafka 主题:

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test_topic");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
?>
消费者示例代码
以下是一个 Kafka 消费者的示例代码,用于从 Kafka 主题接收消息:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test_topic']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . "\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
echo "Error: " . $message->errstr . "\n";
break;
}
}
?>
配置参数说明
Kafka 客户端支持多种配置参数,以下是一些常用参数:

bootstrap.servers: Kafka 服务器地址,格式为host:port。group.id: 消费者组 ID,用于标识消费者组。auto.offset.reset: 偏移量重置策略,可选earliest或latest。
错误处理
在生产环境中,需要处理 Kafka 可能出现的错误。例如,生产者发送失败或消费者无法连接时,可以通过日志记录或重试机制处理异常。
性能优化
对于高吞吐量场景,可以调整以下参数以提高性能:
queue.buffering.max.messages: 生产者缓冲区大小。batch.num.messages: 生产者批量发送的消息数量。fetch.message.max.bytes: 消费者单次拉取的最大字节数。
以上代码和配置可以帮助在 PHP 中实现 Kafka 的基本功能。根据实际需求调整参数和逻辑即可。






