js 实现消息队列
实现消息队列的基本概念
消息队列是一种异步通信机制,允许不同的应用程序或组件通过发送和接收消息来交互。在JavaScript中,可以使用数组、对象或第三方库来实现消息队列的功能。
使用数组实现简单消息队列
利用数组的push和shift方法可以模拟队列的先进先出(FIFO)特性。

class SimpleQueue {
constructor() {
this.queue = [];
}
enqueue(item) {
this.queue.push(item);
}
dequeue() {
return this.queue.shift();
}
isEmpty() {
return this.queue.length === 0;
}
size() {
return this.queue.length;
}
}
// 使用示例
const queue = new SimpleQueue();
queue.enqueue('Task 1');
queue.enqueue('Task 2');
console.log(queue.dequeue()); // 输出: Task 1
使用Promise实现异步消息队列
结合Promise可以实现异步任务队列,确保任务按顺序执行。
class AsyncQueue {
constructor() {
this.queue = [];
this.isProcessing = false;
}
enqueue(task) {
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
if (!this.isProcessing) this.process();
});
}
async process() {
this.isProcessing = true;
while (this.queue.length > 0) {
const { task, resolve, reject } = this.queue.shift();
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
}
}
this.isProcessing = false;
}
}
// 使用示例
const asyncQueue = new AsyncQueue();
asyncQueue.enqueue(() => new Promise(res => setTimeout(() => res('Task 1 done'), 1000)));
asyncQueue.enqueue(() => new Promise(res => setTimeout(() => res('Task 2 done'), 500)));
使用第三方库
对于更复杂的场景,可以使用专门的库如Bull、RabbitMQ或AWS SQS。

Bull示例(基于Redis)
const Queue = require('bull');
const taskQueue = new Queue('tasks', 'redis://127.0.0.1:6379');
taskQueue.process(async (job) => {
console.log(`Processing job ${job.id}:`, job.data);
return { result: job.data.value * 2 };
});
taskQueue.add({ value: 10 });
taskQueue.add({ value: 20 });
处理并发和优先级
可以通过设置并发限制或优先级来优化队列性能。
// Bull并发控制
taskQueue.process(5, async (job) => { /* 最多同时处理5个任务 */ });
// 优先级队列
taskQueue.add({ value: 30 }, { priority: 1 }); // 更高优先级
错误处理和重试机制
为队列添加错误处理和自动重试逻辑。
taskQueue.process(async (job) => {
try {
// 业务逻辑
} catch (error) {
console.error(`Job ${job.id} failed:`, error);
throw error; // Bull会自动重试(如果配置了重试次数)
}
});
// 配置重试
taskQueue.add({ value: 40 }, { attempts: 3, backoff: 1000 });






