php队列实现
PHP 队列实现方法
PHP 队列可以通过多种方式实现,包括数据库、文件系统、Redis 和专业的队列系统如 RabbitMQ。以下是几种常见的实现方法:
使用数据库实现队列
数据库是最简单的队列实现方式之一,可以通过创建任务表来管理队列任务。
// 创建任务表
CREATE TABLE queue_tasks (
id INT AUTO_INCREMENT PRIMARY KEY,
task_data TEXT NOT NULL,
status ENUM('pending', 'processing', 'completed') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
插入任务到队列:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'username', 'password');
$stmt = $pdo->prepare("INSERT INTO queue_tasks (task_data) VALUES (?)");
$stmt->execute([json_encode(['action' => 'send_email', 'data' => $emailData])]);
处理队列任务:
$stmt = $pdo->prepare("SELECT * FROM queue_tasks WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
$pdo->prepare("UPDATE queue_tasks SET status = 'processing' WHERE id = ?")->execute([$task['id']]);
// 处理任务逻辑
$taskData = json_decode($task['task_data'], true);
// 任务完成后更新状态
$pdo->prepare("UPDATE queue_tasks SET status = 'completed' WHERE id = ?")->execute([$task['id']]);
}
使用 Redis 实现队列
Redis 的列表数据结构非常适合实现队列,具有高性能和原子性操作的优势。
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 添加任务到队列
$redis->lPush('task_queue', json_encode(['action' => 'process_image', 'data' => $imageData]));
// 处理队列任务
while ($taskJson = $redis->rPop('task_queue')) {
$task = json_decode($taskJson, true);
// 执行任务逻辑
}
使用专业队列系统(如 RabbitMQ)
RabbitMQ 是一个功能强大的消息队列系统,适合高并发和分布式场景。
安装 RabbitMQ PHP 客户端:
composer require php-amqplib/php-amqplib
生产者代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$msg = new AMQPMessage(json_encode(['action' => 'generate_report', 'data' => $reportData]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();
$connection->close();
消费者代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$callback = function ($msg) {
$task = json_decode($msg->body, true);
// 处理任务逻辑
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
使用 Laravel 队列系统
Laravel 提供了内置的队列系统,支持数据库、Redis、Amazon SQS 等多种驱动。
创建任务:
php artisan make:job ProcessPodcast
定义任务逻辑:
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function handle()
{
// 任务处理逻辑
}
}
分发任务:
ProcessPodcast::dispatch($podcast);
启动队列处理器:
php artisan queue:work
使用 Symfony Messenger 组件
Symfony 的 Messenger 组件提供了灵活的队列消息处理系统。
安装组件:
composer require symfony/messenger
配置消息和处理器:
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
'App\Message\MyMessage': async
创建消息和处理器:
namespace App\Message;
class MyMessage
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
分发消息:
$bus->dispatch(new MyMessage('Hello!'));
启动消息处理器:
php bin/console messenger:consume async -vv
队列实现的选择建议
- 对于小型应用或开发环境,数据库队列简单易用
- 需要高性能时,Redis 是理想选择
- 大型分布式系统建议使用 RabbitMQ 或 Amazon SQS
- Laravel 和 Symfony 项目可以使用框架内置队列系统
- 关键任务需要考虑队列持久化和失败重试机制
每种实现方式都有其适用场景,应根据项目规模、性能需求和团队熟悉程度选择合适的方案。







