kafka php实现
Kafka PHP 实现指南
安装依赖
使用 composer 安装 php-rdkafka 扩展和依赖库:
pecl install rdkafka
composer require arnm/php-rdkafka
生产者示例
创建生产者发送消息到指定主题:
<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka-server:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test-topic");
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
消费者示例
创建消费者订阅主题并处理消息:
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-consumer-group');
$conf->set('bootstrap.servers', 'kafka-server:9092');
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . "\n";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
配置参数
常用生产者配置:
queue.buffering.max.messages: 队列缓冲消息数message.send.max.retries: 发送重试次数retry.backoff.ms: 重试间隔时间
常用消费者配置:
auto.offset.reset:earliest或latestenable.auto.commit: 是否自动提交偏移量session.timeout.ms: 会话超时时间
错误处理
实现自定义错误回调:
$conf->setErrorCb(function ($kafka, $err, $reason) {
printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
性能优化
- 批量发送消息使用
poll()和flush() - 调整
batch.num.messages控制批量大小 - 启用压缩设置
compression.codec
高级用法
手动提交偏移量:
$consumer->commit($message);
创建管理员客户端管理主题:
$admin = new RdKafka\Admin($conf);
$topicConfig = new RdKafka\TopicConf();
$admin->createTopics(new RdKafka\NewTopic("new-topic", 3, 1));






