网络编程底层实现
Node.js 网络模块通过 libuv 封装 socket 操作,实现跨平台非阻塞网络 IO。
Socket 基础
Socket 类型
| 类型 | 说明 | Node.js 模块 |
|---|---|---|
| TCP | 流式、可靠连接 | net |
| UDP | 报式、无连接 | dgram |
| Unix Socket | 本机进程通信 | net |
| Pipe | Windows named pipe | net |
Socket 生命周期
C
TCP Socket:
socket() → bind() → listen() → accept() → recv/send → close()
UDP Socket:
socket() → bind() → recvfrom/sendto → close()
libuv 网络封装
uv_tcp 结构
C
struct uv_tcp_s {
uv_stream_t stream; // 继承 stream
UV_TCP_PRIVATE_FIELDS
};
// 初始化
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle);
// 绑定地址
int uv_tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int flags);
// 监听
int uv_listen(uv_stream_t* stream,
int backlog,
uv_connection_cb cb);
// 接受连接
int uv_accept(uv_stream_t* server,
uv_stream_t* client);
uv_udp 结构
C
struct uv_udp_s {
uv_handle_t handle;
UV_UDP_PRIVATE_FIELDS
};
// 初始化
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle);
// 绑定
int uv_udp_bind(uv_udp_t* handle,
const struct sockaddr* addr,
unsigned int flags);
// 接收
int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb);
// 发送
int uv_udp_send(uv_udp_send_t* req,
uv_udp_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
const struct sockaddr* addr,
uv_udp_send_cb send_cb);
TCP 实现
服务端实现
C
// 创建服务器
uv_tcp_t server;
uv_tcp_init(loop, &server);
// 绑定地址
struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 8080, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
// 监听
uv_listen((uv_stream_t*)&server, 128, on_new_connection);
// 新连接回调
void on_new_connection(uv_stream_t* server, int status) {
if (status < 0) return;
// 创建客户端 handle
uv_tcp_t* client = malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
// 接受连接
if (uv_accept(server, (uv_stream_t*)client) == 0) {
// 开始读取
uv_read_start((uv_stream_t*)client, alloc_buffer, on_read);
} else {
uv_close((uv_handle_t*)client, NULL);
}
}
数据读写
C
// 分配缓冲区
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
// 读取回调
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
if (nread < 0) {
uv_close((uv_handle_t*)stream, NULL);
free(buf->base);
return;
}
// 处理数据
printf("Received %zd bytes\n", nread);
// 发送响应
uv_write_t* req = malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init("response", 8);
uv_write(req, stream, &wrbuf, 1, on_write);
}
// 写入回调
void on_write(uv_write_t* req, int status) {
if (status < 0) {
fprintf(stderr, "Write error\n");
}
free(req);
}
客户端连接
C
// 连接服务器
uv_tcp_t client;
uv_connect_t req;
uv_tcp_init(loop, &client);
struct sockaddr_in addr;
uv_ip4_addr("127.0.0.1", 8080, &addr);
uv_tcp_connect(&req, &client, (const struct sockaddr*)&addr, on_connect);
// 连接回调
void on_connect(uv_connect_t* req, int status) {
if (status < 0) {
fprintf(stderr, "Connect error\n");
return;
}
// 开始读写
uv_read_start((uv_stream_t*)req->handle, alloc_buffer, on_read);
}
UDP 实现
UDP 服务端
C
uv_udp_t server;
uv_udp_init(loop, &server);
struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 8080, &addr);
uv_udp_bind(&server, (const struct sockaddr*)&addr, UV_UDP_REUSEADDR);
// 开始接收
uv_udp_recv_start(&server, alloc_buffer, on_udp_recv);
// 接收回调
void on_udp_recv(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf,
const struct sockaddr* addr,
unsigned flags) {
if (nread < 0) {
uv_close((uv_handle_t*)handle, NULL);
free(buf->base);
return;
}
printf("Received %zd bytes from %s\n", nread,
inet_ntoa(((struct sockaddr_in*)addr)->sin_addr));
// 发送响应
uv_udp_send_t* req = malloc(sizeof(uv_udp_send_t));
uv_buf_t wrbuf = uv_buf_init("response", 8);
uv_udp_send(req, handle, &wrbuf, 1, addr, on_udp_send);
free(buf->base);
}
UDP 客户端
C
uv_udp_t client;
uv_udp_init(loop, &client);
struct sockaddr_in addr;
uv_ip4_addr("127.0.0.1", 8080, &addr);
// 发送数据
uv_udp_send_t req;
uv_buf_t buf = uv_buf_init("hello", 5);
uv_udp_send(&req, &client, &buf, 1, (const struct sockaddr*)&addr, on_send);
IO 观察器机制
uv__io_t 结构
C
// IO 观察器:监听 fd 事件
struct uv__io_s {
uv_loop_t* loop;
int fd;
unsigned int events; // POLLIN/POLLOUT
uv__io_cb cb; // 事件回调
QUEUE pending_queue;
QUEUE watcher_queue;
};
// 注册观察器
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
if (w->events == 0) {
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
}
w->events |= events;
// 更新 epoll/kqueue
if (loop->backend_fd >= 0) {
uv__platform_io_start(loop, w, events);
}
}
事件触发流程
C++
1. uv__io_start() 注册 fd 和事件
2. uv__io_poll() 调用 epoll_wait/kqueue
3. 系统返回活跃 fd 列表
4. 遍历调用各 fd 的 uv__io_cb
5. 回调处理数据(read/write)
非阻塞 IO 实现
设置非阻塞
C++
// 设置 socket 非阻塞
int uv__nonblock(int fd, int set) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -errno;
if (set)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1)
return -errno;
return 0;
}
// 创建时设置
int uv__socket(int domain, int type, int protocol) {
int fd = socket(domain, type | SOCK_NONBLOCK, protocol);
return fd;
}
IO 多路复用对比
| 平台 | 系统调用 | 最大连接数 | 性能 |
|---|---|---|---|
| Linux | epoll | 无限制 | O(1) |
| macOS | kqueue | 无限制 | O(1) |
| Windows | IOCP | 无限制 | O(1) |
| 旧版 | select | 1024 | O(n) |
Node.js net 模块绑定
TCP 类绑定
C
// node_net.cc
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
// 创建 uv_tcp_t
uv_tcp_t* handle = new uv_tcp_t();
uv_tcp_init(env->event_loop(), handle);
// 关联 JS 对象
args.This()->SetAlignedPointerInInternalField(0, handle);
}
void TCPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
uv_tcp_t* handle = Unwrap(args.This());
sockaddr_in addr;
// 解析地址...
int err = uv_tcp_bind(handle, (sockaddr*)&addr, 0);
args.GetReturnValue().Set(err);
}
void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
uv_tcp_t* handle = Unwrap(args.This());
int backlog = args[0]->Int32Value();
int err = uv_listen((uv_stream_t*)handle, backlog, OnConnection);
args.GetReturnValue().Set(err);
}
连接事件传递
C
// OnConnection 回调触发 JS onconnection
void OnConnection(uv_stream_t* handle, int status) {
TCPWrap* wrap = (TCPWrap*)handle->data;
// 触发 JS 'connection' 事件
Local<Value> argv[] = { Integer::New(status) };
wrap->MakeCallback(env->onconnection_string(), 1, argv);
}
网络参数调优
连接队列
C
// listen backlog: 等待 accept 的连接数
uv_listen(stream, 128, cb); // 建议 128-512
// 系统限制
/proc/sys/net/core/somaxconn // Linux 默认 128
缓冲区大小
C
// Socket 缓冲区
int buf_size = 256 * 1024; // 256KB
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size));
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));
// Node.js 设置
socket.setRecvBufferSize(256 * 1024);
socket.setSendBufferSize(256 * 1024);
TCP 参数
JavaScript
// TCP_NODELAY: 禁用 Nagle 算法
int flag = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
// Node.js
socket.setNoDelay(true);
// TCP_KEEPALIVE: 保活
int keepalive = 1;
int idle = 60; // 60秒无数据发送探测
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));
// Node.js
socket.setKeepAlive(true, 60000);
性能关键点
连接管理
C
// 高并发服务器优化
// 1. 预创建客户端 handle池
uv_tcp_t* client_pool[1024];
for (int i = 0; i < 1024; i++) {
uv_tcp_init(loop, &client_pool[i]);
}
// 2. 快速 accept
void on_connection(uv_stream_t* server, int status) {
uv_tcp_t* client = &client_pool[next++];
uv_accept(server, (uv_stream_t*)client);
}
// 3. 及时释放关闭连接
uv_close((uv_handle_t*)client, on_close);
数据流处理
JavaScript
// Node.js 高效流处理
const net = require('net');
const server = net.createServer((socket) => {
// 管道传输,避免内存拷贝
socket.pipe(dest);
// 或使用 Transform 流
const transform = new Transform({
transform(chunk, encoding, callback) {
this.push(processChunk(chunk));
callback();
}
});
socket.pipe(transform).pipe(socket);
});
背压控制
C
// 写入队列满时的背压
int uv_write(uv_write_t* req, uv_stream_t* handle,
uv_buf_t* bufs, unsigned int nbufs, uv_write_cb cb) {
// 检查是否可写
if (handle->write_queue_size > MAX_QUEUE_SIZE) {
return UV_EAGAIN; // 阻塞,等待可写事件
}
// 正常写入
}
C
// Node.js 背压处理
socket.write(data, (err) => {
if (err) {
// 写入失败,等待 drain
socket.once('drain', () => {
// 继续写入
});
}
});
// 或检查可写状态
if (!socket.write(data)) {
socket.once('drain', continueWrite);
}
Socket 状态监控
状态查询
text
// libuv 状态
int uv_is_active(uv_handle_t* handle);
int uv_is_closing(uv_handle_t* handle);
size_t uv_stream_get_write_queue_size(uv_stream_t* stream);
// Node.js
socket.bytesRead;
socket.bytesWritten;
socket.readyState; // 'connecting'/'open'/'closing'/'closed'
错误处理
text
// 错误码映射
UV_ECONNREFUSED // 连接被拒绝
UV_ECONNRESET // 连接重置
UV_ETIMEDOUT // 连接超时
UV_EPIPE // 管道断开
// Node.js 错误
socket.on('error', (err) => {
if (err.code === 'ECONNRESET') {
// 连接被客户端关闭
}
});
注意:高并发场景下 listen backlog 需调高,避免连接队列溢出导致连接被拒绝。
要点总结
- libuv 封装 epoll/kqueue/IOCP 实现跨平台非阻塞网络 IO
- uv_tcp 处理 TCP 连接,uv_udp 处理 UDP 报文
- uv__io_t 是 IO 观察器,注册 fd 事件并触发回调
- backlog、缓冲区大小、TCP_NODELAY 是关键调优参数
- write 返回 false 时等待 drain 事件处理背压
📝 发现内容有误?点击此处直接编辑