js 实现一个并发队列
并发队列的实现方法
在JavaScript中实现并发队列可以通过多种方式完成,通常需要结合Promise、async/await以及队列管理机制。以下是几种常见的实现方法。
使用Promise和async/await
通过Promise和async/await可以方便地控制并发任务的执行顺序和并发数量。
class ConcurrentQueue {
constructor(concurrency) {
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
}
add(task) {
this.queue.push(task);
this.next();
}
next() {
while (this.running < this.concurrency && this.queue.length) {
const task = this.queue.shift();
this.running++;
task().then(() => {
this.running--;
this.next();
});
}
}
}
使用async库
async库提供了多种并发控制方法,如parallelLimit和queue,可以简化并发队列的实现。
const async = require('async');
const queue = async.queue((task, callback) => {
task().then(() => callback()).catch(callback);
}, concurrencyLimit);
queue.drain = () => {
console.log('All tasks completed');
};
tasks.forEach(task => queue.push(task));
使用Worker Threads
对于CPU密集型任务,可以使用Node.js的Worker Threads实现真正的并行处理。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.postMessage('task data');
} else {
parentPort.on('message', (msg) => {
// Process task
parentPort.postMessage('result');
});
}
使用事件驱动机制
通过事件驱动机制可以实现任务的动态调度和并发控制。
const EventEmitter = require('events');
class TaskQueue extends EventEmitter {
constructor(concurrency) {
super();
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
}
push(task) {
this.queue.push(task);
process.nextTick(() => this.next());
return this;
}
next() {
if (this.running >= this.concurrency || !this.queue.length) return;
const task = this.queue.shift();
task(() => {
this.running--;
this.next();
});
this.running++;
this.next();
}
}
使用生成器函数
生成器函数可以用于控制任务的执行顺序和并发数量。
function* taskGenerator(tasks) {
for (const task of tasks) {
yield task();
}
}
const tasks = [/* array of async functions */];
const gen = taskGenerator(tasks);
function run(concurrency) {
for (let i = 0; i < concurrency; i++) {
const { value, done } = gen.next();
if (done) break;
value.then(() => run(1));
}
}
run(concurrencyLimit);
注意事项
- 确保任务错误被正确处理,避免未捕获的Promise rejection。
- 根据任务类型选择合适的并发控制方法,IO密集型任务和CPU密集型任务可能需要不同的策略。
- 监控队列状态,避免内存泄漏或任务堆积。







