Node.js worker_threads模块
worker_threads 提供真正的多线程支持,适合 CPU 密集型任务。
基本概念
与 child_process 对比
| 特性 | worker_threads | child_process |
|---|---|---|
| 内存 | 共享内存 | 独立内存 |
| 启动开销 | 低 | 高 |
| 通信 | SharedArrayBuffer | IPC 序列化 |
| 适用 | CPU密集型 | 任务隔离 |
创建 Worker
基本用法
JavaScript
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js');
worker.on('message', (result) => {
console.log('结果:', result);
});
worker.on('error', (err) => {
console.error('错误:', err);
});
worker.on('exit', (code) => {
if (code !== 0) {
console.log('Worker 异常退出:', code);
}
});
Worker 文件
JavaScript
// worker.js
const { parentPort } = require('worker_threads');
// 接收消息
parentPort.on('message', (data) => {
// 处理任务
const result = heavyCompute(data);
parentPort.postMessage(result);
});
function heavyCompute(n) {
let sum = 0;
for (let i = 0; i < n; i++) {
sum += i;
}
return sum;
}
数据传递
postMessage
JavaScript
// 主线程
const worker = new Worker('./worker.js');
worker.postMessage({ data: 'hello' });
worker.on('message', (msg) => {
console.log('收到:', msg);
});
// Worker
parentPort.postMessage({ result: 'done' });
传递 Buffer
JavaScript
// 主线程
const buffer = Buffer.from('hello');
worker.postMessage(buffer);
// Worker 收到副本(非共享)
parentPort.on('message', (buffer) => {
console.log(buffer.toString());
});
共享内存
SharedArrayBuffer
JavaScript
const { Worker } = require('worker_threads');
// 创建共享内存
const sharedBuffer = new SharedArrayBuffer(4 * 1024);
const sharedArray = new Int32Array(sharedBuffer);
// 传递共享内存
const worker = new Worker('./worker.js', {
workerData: { shared: sharedArray }
});
// 主线程读写
sharedArray[0] = 100;
Worker 使用共享内存
JavaScript
// worker.js
const { workerData, parentPort } = require('worker_threads');
// 获取共享数据
const sharedArray = workerData.shared;
// 直接读写共享内存
sharedArray[0] = sharedArray[0] + 1;
// 不需要 postMessage 传递
parentPort.postMessage('done');
workerData
初始数据传递
JavaScript
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js', {
workerData: {
config: { threshold: 100 },
input: [1, 2, 3, 4, 5]
}
});
// worker.js
const { workerData } = require('worker_threads');
console.log('初始数据:', workerData);
const config = workerData.config;
const input = workerData.input;
MessageChannel
双向通信通道
JavaScript
const { Worker, MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
const worker = new Worker('./worker.js');
// 发送端口给 Worker
worker.postMessage({ port: port2 }, [port2]);
// 主线程使用 port1
port1.on('message', (msg) => {
console.log('port1 收到:', msg);
});
port1.postMessage('来自主线程');
Worker 接收端口
JavaScript
// worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (msg) => {
if (msg.port) {
const port = msg.port;
port.on('message', (data) => {
console.log('port 收到:', data);
port.postMessage('来自 Worker');
});
}
});
线程池实现
JavaScript
const { Worker } = require('worker_threads');
const path = require('path');
class ThreadPool {
constructor(size, workerFile) {
this.workers = [];
this.queue = [];
for (let i = 0; i < size; i++) {
const worker = new Worker(workerFile);
worker.on('message', (result) => {
this.resolve(result);
});
this.workers.push({ worker, busy: false });
}
}
run(data) {
const available = this.workers.find(w => !w.busy);
if (available) {
available.busy = true;
available.worker.postMessage(data);
} else {
this.queue.push(data);
}
}
resolve(result) {
console.log('结果:', result);
const worker = this.workers.find(w => w.busy);
if (worker) {
worker.busy = false;
if (this.queue.length) {
worker.busy = true;
worker.worker.postMessage(this.queue.shift());
}
}
}
}
const pool = new ThreadPool(4, './worker.js');
其他 API
isMainThread
JavaScript
const { isMainThread } = require('worker_threads');
if (isMainThread) {
console.log('主线程');
} else {
console.log('Worker 线程');
}
threadId
JavaScript
const { threadId } = require('worker_threads');
console.log('线程ID:', threadId);
getEnvironmentData
JavaScript
const { setEnvironmentData, getEnvironmentData } = require('worker_threads');
// 主线程设置
setEnvironmentData('key', 'value');
// Worker 获取
const value = getEnvironmentData('key');
注意事项
- Worker 不能访问主线程变量
- 共享内存需注意并发同步问题
- Worker 内可创建嵌套 Worker
- 使用 Atomics 实现原子操作避免竞争
要点总结
new Worker(file)创建线程parentPort.postMessage()发送消息SharedArrayBuffer实现共享内存workerData传递初始数据- 线程池管理多个 Worker
📝 发现内容有误?点击此处直接编辑