PHP实现请求队列
PHP 请求队列的实现方法
使用数据库作为队列存储
创建数据库表存储队列任务,包含任务ID、状态、请求数据等字段。通过定时脚本或常驻进程处理队列中的任务。
// 示例表结构
CREATE TABLE request_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
data TEXT NOT NULL,
status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL
);
使用 Redis 实现高效队列
利用 Redis 的列表数据结构实现高性能队列,适合高并发场景。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 入队
$redis->rPush('request_queue', json_encode($requestData));
// 出队
while($job = $redis->lPop('request_queue')) {
processRequest(json_decode($job, true));
}
使用消息队列系统
集成 RabbitMQ 或 Beanstalkd 等专业消息队列系统,提供更可靠的消息传递机制。
// RabbitMQ 示例
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('request_queue', false, true, false, false);
$callback = function ($msg) {
processRequest($msg->body);
$msg->ack();
};
$channel->basic_consume('request_queue', '', false, false, false, false, $callback);
实现队列处理器
编写守护进程或使用 Supervisor 管理的 PHP 脚本持续处理队列任务。
// 队列处理器示例
while (true) {
$job = getNextJobFromQueue();
if ($job) {
try {
$result = processJob($job);
markJobAsComplete($job->id);
} catch (Exception $e) {
markJobAsFailed($job->id);
}
}
sleep(1); // 避免CPU过载
}
处理失败和重试机制
为队列任务实现失败重试逻辑,设置最大重试次数和延迟重试策略。
function processWithRetry($job, $maxRetries = 3) {
$attempts = 0;
while ($attempts < $maxRetries) {
try {
return attemptProcessing($job);
} catch (Exception $e) {
$attempts++;
if ($attempts >= $maxRetries) {
throw $e;
}
sleep(pow(2, $attempts)); // 指数退避
}
}
}
监控和日志记录
为队列系统添加监控和日志功能,跟踪任务处理状态和性能指标。

function logQueueActivity($message, $context = []) {
file_put_contents(
'/var/log/queue.log',
date('Y-m-d H:i:s') . ' ' . $message . PHP_EOL,
FILE_APPEND
);
}
选择实现方法时应考虑项目规模、性能需求和运维复杂度。小型项目可从数据库方案开始,高并发系统建议使用 Redis 或专业消息队列。






