libuv工作原理
libuv 是 Node.js 异步 IO 的底层实现,封装各平台异步机制,提供统一的事件驱动接口。
libuv 架构
核心组件
C
+--------------------------------------------------+
| Node.js |
+--------------------------------------------------+
| libuv |
| +-----------+ +-----------+ +-----------+ |
| |Event Loop | |Thread Pool| |Async IO | |
| +-----------+ +-----------+ +-----------+ |
+--------------------------------------------------+
| Platform Abstraction |
| +-----+ +------+ +------+ +------+ |
| |epoll| |kqueue| |IOCP | |ports | |
| |Linux| |macOS | |Win | |Solaris| |
| +-----+ +------+ +------+ +------+ |
+--------------------------------------------------+
主要功能模块
| 模块 | 功能 | 实现方式 |
|---|---|---|
| Event Loop | 事件调度 | 单线程循环 |
| Thread Pool | 文件 IO/DNS | 4个工作线程 |
| Network IO | TCP/UDP | 各平台异步接口 |
| Timer | 定时器 | 红黑树 |
| Signal | 信号处理 | 信号处理器 |
| Async | 异步通知 | async_handle |
事件循环实现
uv_loop 结构
C
// libuv 核心结构
struct uv_loop_s {
void* data; // 用户数据
unsigned int active_handles; // 活跃 handle 数
unsigned int active_reqs; // 活跃 request 数
unsigned int stop_flag; // 停止标志
// 队列
QUEUE handle_queue; // handle 队列
QUEUE req_queue; // request 队列
// 定时器
uv_timer_t* timer_heap; // 定时器堆
// IO 观察
uv__io_t** watchers; // IO 观察器
unsigned int nwatchers; // 观察器数量
unsigned int nfds; // 文件描述符数量
// 后端实现
uv__io_t backend_fd; // epoll/kqueue/IOCP
};
事件循环流程
C
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
while (loop->stop_flag == 0) {
// 1. 更新时间
uv__update_time(loop);
// 2. 运行定时器
uv__run_timers(loop);
// 3. 运行 pending callbacks
uv__run_pending(loop);
// 4. 运行 idle handlers(可选)
uv__run_idle(loop);
// 5. 运行 prepare handlers(可选)
uv__run_prepare(loop);
// 6. IO poll(核心)
uv__io_poll(loop, timeout);
// 7. 运行 check handlers
uv__run_check(loop);
// 8. 运行 close callbacks
uv__run_closing_handles(loop);
}
}
六阶段对应关系
C
Node.js阶段 libuv函数
─────────────────────────────────
timers uv__run_timers()
pending callbacks uv__run_pending()
idle, prepare uv__run_idle/prepare()
poll uv__io_poll()
check uv__run_check()
close callbacks uv__run_closing_handles()
Handle 与 Request
Handle(长期存在)
C
// Handle 类型:长期存在的资源
typedef struct {
uv_handle_t handle; // 基类
// 特定数据
} uv_tcp_t, uv_timer_t, uv_idle_t;
// Handle 操作
uv_handle_init(loop, handle);
uv_handle_start(handle); // 开始监听
uv_handle_stop(handle); // 停止监听
uv_close(handle, callback); // 关闭并回收
Request(一次性)
C
// Request 类型:一次性操作
typedef struct {
uv_req_t req; // 基类
// 特定数据
} uv_write_t, uv_connect_t;
// Request 操作流程
uv_write_t* req = malloc(...);
uv_write(req, stream, bufs, nbufs, callback);
// callback 触发后 request 完成
类型对比
| 类型 | 生命周期 | 示例 |
|---|---|---|
| Handle | 长期存在 | TCP连接、定时器、信号 |
| Request | 一次性 | write、connect、DNS查询 |
线程池机制
线程池架构
C
Main Thread
│
┌──────────┼──────────┐
│ │ │
Worker1 Worker2 Worker3 Worker4
│ │ │ │
└──────────┴──────────┴────┘
│
Task Queue
线程池配置
C
// 默认 4 个线程
// 通过环境变量调整
UV_THREADPOOL_SIZE=128
// 最大限制
#define UV_THREADPOOL_MAX 128
线程池处理类型
C
// 使用线程池的操作
uv_fs_read() // 文件读取
uv_fs_write() // 文件写入
uv_fs_stat() // 文件状态
uv_getaddrinfo() // DNS 解析
uv_getnameinfo() // DNS 反查
uv_random() // 随机数
任务队列实现
C
// 任务队列结构
static QUEUE work_queue;
static uv_async_t async_handle;
// 提交任务
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
uv__work_cb work_cb,
uv__done_cb done_cb) {
w->loop = loop;
w->work_cb = work_cb;
w->done_cb = done_cb;
QUEUE_INSERT_TAIL(&work_queue, &w->wq);
uv_async_send(&async_handle); // 通知主线程
}
// Worker 执行
static void worker(void* arg) {
for (;;) {
uv_mutex_lock(&mutex);
while (QUEUE_EMPTY(&work_queue))
uv_cond_wait(&cond, &mutex);
QUEUE* q = QUEUE_HEAD(&work_queue);
QUEUE_REMOVE(q);
uv_mutex_unlock(&mutex);
uv__work* w = QUEUE_DATA(q, uv__work, wq);
w->work_cb(w); // 执行任务
post(loop, w); // 通知完成
}
}
IO Poll 实现
epoll(Linux)
C
// epoll 实现
static void uv__io_poll(uv_loop_t* loop, int timeout) {
// 构建 epoll_event 数组
struct epoll_event events[1024];
int nevents;
// 调用 epoll_wait
nevents = epoll_wait(loop->backend_fd, events, 1024, timeout);
// 处理事件
for (i = 0; i < nevents; i++) {
uv__io_t* w = events[i].data.ptr;
w->cb(loop, w, events[i].events);
}
}
kqueue(macOS)
C
// kqueue 实现
static void uv__io_poll(uv_loop_t* loop, int timeout) {
struct kevent events[1024];
int nevents;
// 调用 kevent
nevents = kevent(loop->backend_fd, NULL, 0, events, 1024, &spec);
// 处理事件
for (i = 0; i < nevents; i++) {
uv__io_t* w = events[i].udata;
w->cb(loop, w, events[i].filter);
}
}
IOCP(Windows)
C
// IOCP 实现
static void uv__io_poll(uv_loop_t* loop, int timeout) {
ULONG bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
// 调用 GetQueuedCompletionStatus
GetQueuedCompletionStatus(loop->iocp, &bytes, &key, &overlapped, timeout);
// 处理完成
uv_req_t* req = (uv_req_t*)key;
req->cb(req, bytes);
}
平台差异对比
| 平台 | 系统调用 | 特点 |
|---|---|---|
| Linux | epoll | 边缘触发/水平触发 |
| macOS | kqueue | 事件过滤器 |
| Windows | IOCP | 完成端口,真正的异步 |
| Solaris | port_create | 事件端口 |
定时器实现
红黑树结构
C
// 定时器使用红黑树存储(按时间排序)
struct uv_timer_s {
uv_handle_t handle;
uint64_t timeout; // 触发时间
uint64_t repeat; // 重复间隔
uv_timer_cb cb; // 回调函数
};
// 插入定时器
void uv_timer_start(uv_timer_t* handle,
uv_timer_cb cb,
uint64_t timeout,
uint64_t repeat) {
handle->timeout = loop->time + timeout;
handle->repeat = repeat;
handle->cb = cb;
// 插入红黑树
heap_insert(&loop->timer_heap, handle);
}
定时器触发
C
static void uv__run_timers(uv_loop_t* loop) {
while (loop->timer_heap != NULL) {
uv_timer_t* handle = loop->timer_heap;
// 时间未到,退出
if (handle->timeout > loop->time)
break;
// 移除并触发回调
heap_remove(&loop->timer_heap, handle);
handle->cb(handle);
// 重复定时器重新插入
if (handle->repeat > 0) {
handle->timeout += handle->repeat;
heap_insert(&loop->timer_heap, handle);
}
}
}
异步通知机制
uv_async
C
// async 用于跨线程通知
struct uv_async_s {
uv_handle_t handle;
uv_async_cb cb;
};
// 发送通知(可在任意线程调用)
int uv_async_send(uv_async_t* handle) {
// 设置事件标志
uv__async_send(handle->loop);
// 事件循环会在 poll 阶段检测到并触发回调
}
// 实现原理(Linux)
static void uv__async_send(uv_loop_t* loop) {
// 写入通知管道
write(loop->async_pipe[1], "x", 1);
}
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// 读取并清空管道
char buf[1024];
read(loop->async_pipe[0], buf, sizeof(buf));
// 触发 async 回调
QUEUE_FOREACH(q, &loop->async_handles) {
uv_async_t* handle = QUEUE_DATA(q, uv_async_t, queue);
handle->cb(handle);
}
}
Handle 类型详解
TCP Handle
JavaScript
// TCP 连接管理
struct uv_tcp_s {
uv_stream_t stream; // 继承 stream
int fd; // socket fd
};
// 创建 TCP
uv_tcp_init(loop, &tcp);
// 绑定地址
uv_tcp_bind(&tcp, &addr, 0);
// 监听
uv_listen((uv_stream_t*)&tcp, backlog, on_connection);
// 接受连接
uv_accept((uv_stream_t*)&server, (uv_stream_t*)&client);
Pipe Handle
JavaScript
// Unix pipe 或 Windows named pipe
struct uv_pipe_s {
uv_stream_t stream;
int fd;
};
// 创建 pipe
uv_pipe_init(loop, &pipe, 0);
// 打开已有 fd
uv_pipe_open(&pipe, fd);
// 绑定地址
uv_pipe_bind(&pipe, "/tmp/my.pipe");
// 连接
uv_pipe_connect(&req, &pipe, "/tmp/my.pipe", on_connect);
libuv 与 Node.js 绑定
Node.js 绑定层
Bash
// node.cc 中的绑定
void LibuvWrap(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
// 创建 JS 对象
Local<Object> obj = args[0]->ToObject(isolate);
// 关联 libuv handle
uv_handle_t* handle = new uv_handle_t();
handle->data = obj; // 保存 JS 对象引用
args.GetReturnValue().Set(obj);
}
JS 层调用
text
// net.js
const { TCP, Pipe } = process.binding('libuv');
// 创建 TCP handle
const handle = new TCP();
handle.bind(address);
handle.listen(backlog);
调试与分析
查看 libuv 内部状态
text
const uv = process.binding('uv');
// 获取 loop 信息
console.log(uv.loop_alive());
// 获取 handle 数量
console.log(uv.handle_count());
libuv 日志
text
# 编译时启用日志
./configure --enable-debug
make
# 运行时打印
UV_DEBUG=1 node app.js
注意:libuv 线程池默认 4 线程,大文件 IO 场景建议增大到 4×CPU核心数。
要点总结
- libuv 封装 epoll/kqueue/IOCP 提供跨平台异步 IO
- Handle 长期存在(TCP连接),Request 一次性(write操作)
- 文件 IO 和 DNS 使用线程池,网络 IO 使用系统异步接口
- 定时器用红黑树存储,按时间排序高效触发
- uv_async 实现跨线程安全通知
📝 发现内容有误?点击此处直接编辑