全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-15 10 分钟 ✍️ juanwangdev

网络编程底层实现

Node.js 网络模块通过 libuv 封装 socket 操作,实现跨平台非阻塞网络 IO。

Socket 基础

Socket 类型

类型说明Node.js 模块
TCP流式、可靠连接net
UDP报式、无连接dgram
Unix Socket本机进程通信net
PipeWindows named pipenet

Socket 生命周期

C
TCP Socket:
  socket() → bind() → listen() → accept() → recv/send → close()

UDP Socket:
  socket() → bind() → recvfrom/sendto → close()

libuv 网络封装

uv_tcp 结构

C
struct uv_tcp_s {
  uv_stream_t stream;    // 继承 stream
  UV_TCP_PRIVATE_FIELDS
};

// 初始化
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle);

// 绑定地址
int uv_tcp_bind(uv_tcp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags);

// 监听
int uv_listen(uv_stream_t* stream,
              int backlog,
              uv_connection_cb cb);

// 接受连接
int uv_accept(uv_stream_t* server,
              uv_stream_t* client);

uv_udp 结构

C
struct uv_udp_s {
  uv_handle_t handle;
  UV_UDP_PRIVATE_FIELDS
};

// 初始化
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle);

// 绑定
int uv_udp_bind(uv_udp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags);

// 接收
int uv_udp_recv_start(uv_udp_t* handle,
                      uv_alloc_cb alloc_cb,
                      uv_udp_recv_cb recv_cb);

// 发送
int uv_udp_send(uv_udp_send_t* req,
                uv_udp_t* handle,
                const uv_buf_t bufs[],
                unsigned int nbufs,
                const struct sockaddr* addr,
                uv_udp_send_cb send_cb);

TCP 实现

服务端实现

C
// 创建服务器
uv_tcp_t server;
uv_tcp_init(loop, &server);

// 绑定地址
struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 8080, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);

// 监听
uv_listen((uv_stream_t*)&server, 128, on_new_connection);

// 新连接回调
void on_new_connection(uv_stream_t* server, int status) {
  if (status < 0) return;

  // 创建客户端 handle
  uv_tcp_t* client = malloc(sizeof(uv_tcp_t));
  uv_tcp_init(loop, client);

  // 接受连接
  if (uv_accept(server, (uv_stream_t*)client) == 0) {
    // 开始读取
    uv_read_start((uv_stream_t*)client, alloc_buffer, on_read);
  } else {
    uv_close((uv_handle_t*)client, NULL);
  }
}

数据读写

C
// 分配缓冲区
void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->base = malloc(suggested_size);
  buf->len = suggested_size;
}

// 读取回调
void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0) {
    uv_close((uv_handle_t*)stream, NULL);
    free(buf->base);
    return;
  }

  // 处理数据
  printf("Received %zd bytes\n", nread);

  // 发送响应
  uv_write_t* req = malloc(sizeof(uv_write_t));
  uv_buf_t wrbuf = uv_buf_init("response", 8);
  uv_write(req, stream, &wrbuf, 1, on_write);
}

// 写入回调
void on_write(uv_write_t* req, int status) {
  if (status < 0) {
    fprintf(stderr, "Write error\n");
  }
  free(req);
}

客户端连接

C
// 连接服务器
uv_tcp_t client;
uv_connect_t req;
uv_tcp_init(loop, &client);

struct sockaddr_in addr;
uv_ip4_addr("127.0.0.1", 8080, &addr);

uv_tcp_connect(&req, &client, (const struct sockaddr*)&addr, on_connect);

// 连接回调
void on_connect(uv_connect_t* req, int status) {
  if (status < 0) {
    fprintf(stderr, "Connect error\n");
    return;
  }

  // 开始读写
  uv_read_start((uv_stream_t*)req->handle, alloc_buffer, on_read);
}

UDP 实现

UDP 服务端

C
uv_udp_t server;
uv_udp_init(loop, &server);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 8080, &addr);
uv_udp_bind(&server, (const struct sockaddr*)&addr, UV_UDP_REUSEADDR);

// 开始接收
uv_udp_recv_start(&server, alloc_buffer, on_udp_recv);

// 接收回调
void on_udp_recv(uv_udp_t* handle,
                 ssize_t nread,
                 const uv_buf_t* buf,
                 const struct sockaddr* addr,
                 unsigned flags) {
  if (nread < 0) {
    uv_close((uv_handle_t*)handle, NULL);
    free(buf->base);
    return;
  }

  printf("Received %zd bytes from %s\n", nread,
         inet_ntoa(((struct sockaddr_in*)addr)->sin_addr));

  // 发送响应
  uv_udp_send_t* req = malloc(sizeof(uv_udp_send_t));
  uv_buf_t wrbuf = uv_buf_init("response", 8);
  uv_udp_send(req, handle, &wrbuf, 1, addr, on_udp_send);

  free(buf->base);
}

UDP 客户端

C
uv_udp_t client;
uv_udp_init(loop, &client);

struct sockaddr_in addr;
uv_ip4_addr("127.0.0.1", 8080, &addr);

// 发送数据
uv_udp_send_t req;
uv_buf_t buf = uv_buf_init("hello", 5);
uv_udp_send(&req, &client, &buf, 1, (const struct sockaddr*)&addr, on_send);

IO 观察器机制

uv__io_t 结构

C
// IO 观察器:监听 fd 事件
struct uv__io_s {
  uv_loop_t* loop;
  int fd;
  unsigned int events;    // POLLIN/POLLOUT
  uv__io_cb cb;           // 事件回调
  QUEUE pending_queue;
  QUEUE watcher_queue;
};

// 注册观察器
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  if (w->events == 0) {
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
  }
  w->events |= events;

  // 更新 epoll/kqueue
  if (loop->backend_fd >= 0) {
    uv__platform_io_start(loop, w, events);
  }
}

事件触发流程

C++
1. uv__io_start() 注册 fd 和事件
2. uv__io_poll() 调用 epoll_wait/kqueue
3. 系统返回活跃 fd 列表
4. 遍历调用各 fd 的 uv__io_cb
5. 回调处理数据(read/write)

非阻塞 IO 实现

设置非阻塞

C++
// 设置 socket 非阻塞
int uv__nonblock(int fd, int set) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags == -1) return -errno;

  if (set)
    flags |= O_NONBLOCK;
  else
    flags &= ~O_NONBLOCK;

  if (fcntl(fd, F_SETFL, flags) == -1)
    return -errno;

  return 0;
}

// 创建时设置
int uv__socket(int domain, int type, int protocol) {
  int fd = socket(domain, type | SOCK_NONBLOCK, protocol);
  return fd;
}

IO 多路复用对比

平台系统调用最大连接数性能
Linuxepoll无限制O(1)
macOSkqueue无限制O(1)
WindowsIOCP无限制O(1)
旧版select1024O(n)

Node.js net 模块绑定

TCP 类绑定

C
// node_net.cc
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);

  // 创建 uv_tcp_t
  uv_tcp_t* handle = new uv_tcp_t();
  uv_tcp_init(env->event_loop(), handle);

  // 关联 JS 对象
  args.This()->SetAlignedPointerInInternalField(0, handle);
}

void TCPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
  uv_tcp_t* handle = Unwrap(args.This());

  sockaddr_in addr;
  // 解析地址...

  int err = uv_tcp_bind(handle, (sockaddr*)&addr, 0);
  args.GetReturnValue().Set(err);
}

void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
  uv_tcp_t* handle = Unwrap(args.This());
  int backlog = args[0]->Int32Value();

  int err = uv_listen((uv_stream_t*)handle, backlog, OnConnection);
  args.GetReturnValue().Set(err);
}

连接事件传递

C
// OnConnection 回调触发 JS onconnection
void OnConnection(uv_stream_t* handle, int status) {
  TCPWrap* wrap = (TCPWrap*)handle->data;

  // 触发 JS 'connection' 事件
  Local<Value> argv[] = { Integer::New(status) };
  wrap->MakeCallback(env->onconnection_string(), 1, argv);
}

网络参数调优

连接队列

C
// listen backlog: 等待 accept 的连接数
uv_listen(stream, 128, cb);  // 建议 128-512

// 系统限制
/proc/sys/net/core/somaxconn  // Linux 默认 128

缓冲区大小

C
// Socket 缓冲区
int buf_size = 256 * 1024;  // 256KB
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size));
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));

// Node.js 设置
socket.setRecvBufferSize(256 * 1024);
socket.setSendBufferSize(256 * 1024);

TCP 参数

JavaScript
// TCP_NODELAY: 禁用 Nagle 算法
int flag = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

// Node.js
socket.setNoDelay(true);

// TCP_KEEPALIVE: 保活
int keepalive = 1;
int idle = 60;   // 60秒无数据发送探测
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));

// Node.js
socket.setKeepAlive(true, 60000);

性能关键点

连接管理

C
// 高并发服务器优化
// 1. 预创建客户端 handle池
uv_tcp_t* client_pool[1024];
for (int i = 0; i < 1024; i++) {
  uv_tcp_init(loop, &client_pool[i]);
}

// 2. 快速 accept
void on_connection(uv_stream_t* server, int status) {
  uv_tcp_t* client = &client_pool[next++];
  uv_accept(server, (uv_stream_t*)client);
}

// 3. 及时释放关闭连接
uv_close((uv_handle_t*)client, on_close);

数据流处理

JavaScript
// Node.js 高效流处理
const net = require('net');

const server = net.createServer((socket) => {
  // 管道传输,避免内存拷贝
  socket.pipe(dest);

  // 或使用 Transform 流
  const transform = new Transform({
    transform(chunk, encoding, callback) {
      this.push(processChunk(chunk));
      callback();
    }
  });
  socket.pipe(transform).pipe(socket);
});

背压控制

C
// 写入队列满时的背压
int uv_write(uv_write_t* req, uv_stream_t* handle,
             uv_buf_t* bufs, unsigned int nbufs, uv_write_cb cb) {
  // 检查是否可写
  if (handle->write_queue_size > MAX_QUEUE_SIZE) {
    return UV_EAGAIN;  // 阻塞,等待可写事件
  }
  // 正常写入
}
C
// Node.js 背压处理
socket.write(data, (err) => {
  if (err) {
    // 写入失败,等待 drain
    socket.once('drain', () => {
      // 继续写入
    });
  }
});

// 或检查可写状态
if (!socket.write(data)) {
  socket.once('drain', continueWrite);
}

Socket 状态监控

状态查询

text
// libuv 状态
int uv_is_active(uv_handle_t* handle);
int uv_is_closing(uv_handle_t* handle);
size_t uv_stream_get_write_queue_size(uv_stream_t* stream);

// Node.js
socket.bytesRead;
socket.bytesWritten;
socket.readyState;  // 'connecting'/'open'/'closing'/'closed'

错误处理

text
// 错误码映射
UV_ECONNREFUSED   // 连接被拒绝
UV_ECONNRESET     // 连接重置
UV_ETIMEDOUT      // 连接超时
UV_EPIPE          // 管道断开

// Node.js 错误
socket.on('error', (err) => {
  if (err.code === 'ECONNRESET') {
    // 连接被客户端关闭
  }
});

注意:高并发场景下 listen backlog 需调高,避免连接队列溢出导致连接被拒绝。

要点总结

  • libuv 封装 epoll/kqueue/IOCP 实现跨平台非阻塞网络 IO
  • uv_tcp 处理 TCP 连接,uv_udp 处理 UDP 报文
  • uv__io_t 是 IO 观察器,注册 fd 事件并触发回调
  • backlog、缓冲区大小、TCP_NODELAY 是关键调优参数
  • write 返回 false 时等待 drain 事件处理背压

📝 发现内容有误?点击此处直接编辑

← 上一篇 垃圾回收机制
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库