php实现mq
PHP 实现消息队列(MQ)的方法
使用 Redis 实现简单消息队列
Redis 的列表结构可以用于实现简单的消息队列。通过 LPUSH 和 BRPOP 命令实现生产者和消费者模式。
生产者示例代码:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->lPush('message_queue', 'Message content');
消费者示例代码:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while (true) {
$message = $redis->brPop('message_queue', 0);
processMessage($message[1]);
}
使用 RabbitMQ 实现专业消息队列
RabbitMQ 是一个功能完善的消息队列系统,需要安装 PHP 的 AMQP 扩展。
生产者示例代码:
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest'
]);
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消费者示例代码:
$connection = new AMQPConnection([...]);
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$callback = function($msg) {
echo "Received: ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
使用 Beanstalkd 实现轻量级队列
Beanstalkd 是一个轻量级的消息队列服务,适合简单应用场景。
生产者示例代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
$pheanstalk->useTube('testtube')->put('message data');
消费者示例代码:
$pheanstalk = new Pheanstalk\Pheanstalk('127.0.0.1');
$job = $pheanstalk->watch('testtube')->ignore('default')->reserve();
echo $job->getData();
$pheanstalk->delete($job);
使用数据库实现消息队列
在没有专门消息队列服务的情况下,可以使用数据库表模拟队列功能。
创建消息表:
CREATE TABLE message_queue (
id INT AUTO_INCREMENT PRIMARY KEY,
message TEXT NOT NULL,
status TINYINT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
生产者代码:
$pdo = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$stmt = $pdo->prepare("INSERT INTO message_queue (message) VALUES (?)");
$stmt->execute(['Message content']);
消费者代码:
$pdo = new PDO(...);
$pdo->beginTransaction();
$stmt = $pdo->prepare("SELECT * FROM message_queue WHERE status = 0 ORDER BY id ASC LIMIT 1 FOR UPDATE");
$stmt->execute();
$message = $stmt->fetch();
if ($message) {
processMessage($message['message']);
$pdo->prepare("UPDATE message_queue SET status = 1 WHERE id = ?")->execute([$message['id']]);
$pdo->commit();
}
使用 PHP 扩展 Swoole 实现高性能队列
Swoole 提供了高性能的进程间通信和队列功能。
示例代码:
$queue = new Swoole\MsgQueue(0x7000001);
$queue->push("Hello World");
$message = $queue->pop();
echo "Received: $message\n";






