当前位置:首页 > PHP

kafka php实现

2026-02-14 18:32:41PHP

Kafka PHP 实现指南

安装依赖

使用 composer 安装 php-rdkafka 扩展和依赖库:

pecl install rdkafka
composer require arnm/php-rdkafka

生产者示例

创建生产者发送消息到指定主题:

<?php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka-server:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test-topic");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

消费者示例

创建消费者订阅主题并处理消息:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-consumer-group');
$conf->set('bootstrap.servers', 'kafka-server:9092');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test-topic']);

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload . "\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}

配置参数

常用生产者配置:

  • queue.buffering.max.messages: 队列缓冲消息数
  • message.send.max.retries: 发送重试次数
  • retry.backoff.ms: 重试间隔时间

常用消费者配置:

  • auto.offset.reset: earliestlatest
  • enable.auto.commit: 是否自动提交偏移量
  • session.timeout.ms: 会话超时时间

错误处理

实现自定义错误回调:

$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

性能优化

  • 批量发送消息使用 poll()flush()
  • 调整 batch.num.messages 控制批量大小
  • 启用压缩设置 compression.codec

高级用法

手动提交偏移量:

$consumer->commit($message);

创建管理员客户端管理主题:

kafka php实现

$admin = new RdKafka\Admin($conf);
$topicConfig = new RdKafka\TopicConf();
$admin->createTopics(new RdKafka\NewTopic("new-topic", 3, 1));

标签: kafkaphp
分享给朋友:

相关文章

php实现文件的下载

php实现文件的下载

PHP 实现文件下载的方法 在 PHP 中实现文件下载功能通常需要设置正确的 HTTP 头信息,并输出文件内容。以下是几种常见的实现方式: 使用 header() 函数强制下载 通过设置 Conte…

php 实现mvc

php 实现mvc

PHP 实现 MVC 架构的方法 MVC(Model-View-Controller)是一种常见的软件设计模式,用于分离业务逻辑、用户界面和用户输入。以下是 PHP 实现 MVC 架构的关键步骤:…

php实现短信验证

php实现短信验证

PHP实现短信验证的步骤 选择合适的短信服务提供商 国内常见的短信服务提供商包括阿里云短信、腾讯云短信、云片等。注册账号并获取API密钥和签名模板是第一步。这些平台通常提供详细的开发文档和SDK。…

php 路由实现的

php 路由实现的

PHP 路由实现方法 基于原生 PHP 实现路由 创建一个简单的路由解析器,通过解析 URL 路径来调用对应的控制器或函数。 // 定义路由表 $routes = [ '/' => '…

php 购物车实现session

php 购物车实现session

PHP 购物车实现(基于 Session) 在 PHP 中,使用 Session 实现购物车功能是一种常见方法。Session 可以跨页面存储用户数据,适合临时保存购物车信息。 初始化 Sessio…

php实现重定向

php实现重定向

使用header函数进行重定向 在PHP中,header()函数是实现重定向的常用方法。通过发送HTTP头信息Location,浏览器会自动跳转到指定URL。需确保在调用header()前没有输出任何…