全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

消息处理链路

一条消息从生产者发送到消费者确认,经历网络接收、协议解析、路由分发、持久化存储、消费者投递五个核心阶段。

完整调用链

erlang
生产者发送
    │
    ▼
[rabbit_reader] TCP 接收帧
    │
    ▼
[rabbit_frame] 解析 AMQP 帧
    │
    ▼
[rabbit_channel] 处理 basic.publish
    │
    ▼
[rabbit_exchange] 查找 Exchange + 匹配 Binding
    │
    ▼
[rabbit_amqqueue] 消息入队
    │
    ▼
[rabbit_backing_queue] 选择存储策略(RAM / Disk)
    │
    ▼
[rabbit_msg_store] 持久化到磁盘(如标记为 persistent)
    │
    ▼
[rabbit_amqqueue] 通知消费者进程
    │
    ▼
[rabbit_channel] 执行 basic.deliver
    │
    ▼
[rabbit_writer] 发送帧到消费者
    │
    ▼
消费者处理 + ACK
    │
    ▼
[rabbit_channel] 处理 basic.ack
    │
    ▼
[rabbit_amqqueue] 从队列移除消息

阶段一:网络接收

rabbit_reader.erl 负责 TCP 连接管理,接收原始字节流。

erlang
% rabbit_reader.erl - 简化示例
main_loop(Socket, State) ->
    % 接收数据
    case gen_tcp:recv(Socket, 0) of
        {ok, Data} ->
            % 解析帧
            {Frame, Rest} = parse_frame(Data),
            % 分发到对应 Channel
            dispatch_frame(Frame, State),
            % 继续循环
            main_loop(Socket, State#state{buffer = Rest});
        {error, closed} ->
            % 连接关闭
            handle_close(State)
    end.

阶段二:协议解析

rabbit_framing.erl 将字节流解析为 AMQP 帧结构。

erlang
% rabbit_framing.erl - 帧解析
parse_frame(Data) ->
    % 帧格式: type(1) + channel(2) + size(4) + payload(N) + end(1)
    <<Type:8, Channel:16, Size:32, Payload:Size/binary, 0:8, Rest/binary>> = Data,
    % 解码方法帧
    Method = decode_method(Type, Payload),
    {#frame{type = Type, channel = Channel, method = Method}, Rest}.

阶段三:Channel 处理

rabbit_channel.erl 处理 AMQP 方法命令。

erlang
% rabbit_channel.erl - basic.publish 处理
handle_method(#'basic.publish'{exchange = Ex, routing_key = RK},
              Content,
              #ch{vhost = VHost} = State) ->

    % 1. 查找 Exchange
    case rabbit_exchange:lookup(VHost, Ex) of
        {ok, Exchange} ->
            % 2. 路由消息
            RouteResult = rabbit_exchange:route(Exchange, RK),
            % 3. 投递到队列
            deliver_to_queuesues(RouteResult, Content, State),
            % 4. 发送 Confirm(如开启)
            maybe_send_confirm(State),
            {noreply, State};
        {error, not_found} ->
            % Exchange 不存在,消息丢弃
            {noreply, State}
    end.

阶段四:路由分发

rabbit_exchange.erl 根据 Exchange 类型和 Binding 规则匹配目标队列。

erlang
% rabbit_exchange.erl - topic exchange 路由
route(#exchange{type = <<"topic">>, bindings = Bindings}, RoutingKey) ->
    % 拆分路由键
    Tokens = binary:split(RoutingKey, <<".">>, [global]),
    % 匹配每个 binding
    [Queue || {Pattern, Queue} <- Bindings,
                match_pattern(Tokens, Pattern)].

% topic 模式匹配
match_pattern(Tokens, Pattern) ->
    PatternTokens = binary:split(Pattern, <<".">>, [global]),
    match_tokens(Tokens, PatternTokens).

阶段五:消息入队与存储

rabbit_amqqueue.erl 将消息写入队列。

erlang
% rabbit_amqqueue.erl - 消息入队
deliver(Queue, Message) ->
    % 判断是否持久化
    case Message#message.delivery_mode of
        2 -> % persistent
            % 写入持久化存储
            rabbit_msg_store:write(Message),
            % 更新队列索引
            rabbit_backing_queue:persist(Queue, Message);
        1 -> % non-persistent
            % 仅内存存储
            rabbit_backing_queue:store(Queue, Message)
    end,
    % 通知消费者进程
    notify_consumers(Queue).

阶段六:消费者投递

rabbit_channel.erl 执行 basic.deliver 将消息发送给消费者。

erlang
% rabbit_channel.erl - 投递到消费者
deliver_to_consumer(ConsumerPid, Message, State) ->
    % 构建 deliver 帧
    Deliver = #'basic.deliver'{
        consumer_tag = State#ch.consumer_tag,
        delivery_tag = State#ch.delivery_tag
    },
    % 构建 header 帧
    Header = build_header_frame(Message),
    % 构建 body 帧
    Body = build_body_frame(Message),
    % 发送
    rabbit_writer:send(ConsumerPid, [Deliver, Header, Body]),
    % 更新未确认计数
    State#ch{unacked = State#ch.unacked + 1}.

阶段七:ACK 处理

rabbit_channel.erl 处理消费者确认。

Java
% rabbit_channel.erl - basic.ack 处理
handle_method(#'basic.ack'{delivery_tag = Tag, multiple = Multiple}, _, State) ->
    % 查找未确认消息
    Unacked = State#ch.unacked_messages,
    case Multiple of
        true ->
            % 确认所有 <= Tag 的消息
            Confirmed = [M || M <- Unacked, M#message.tag =< Tag];
        false ->
            % 仅确认当前 Tag
            Confirmed = [M || M <- Unacked, M#message.tag =:= Tag]
    end,
    % 从队列移除已确认消息
    rabbit_amqqueue:ack(Confirmed),
    % 更新状态
    {noreply, State#ch{unacked = State#ch.unacked - length(Confirmed)}}.

Java 客户端验证

XML
import com.rabbitmq.client.*;

public class MessagePathVerifier {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {

            // 开启 Confirm 追踪发布阶段
            ch.confirmSelect();

            // 开启 Return 追踪路由阶段
            ch.addReturnListener((code, text, ex, rk, props, body) -> {
                System.err.println("[路由失败] 消息未匹配到队列: " + rk);
            });

            // 发布消息
            String message = "trace-path-message";
            ch.basicPublish("test.exchange", "test.key",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());

            // 验证发布成功
            boolean confirmed = ch.waitForConfirms(5000);
            System.out.println("[发布阶段] Confirm: " + confirmed);

            // 消费验证投递阶段
            ch.basicConsume("test.queue", false, (tag, delivery) -> {
                String body = new String(delivery.getBody());
                System.out.println("[投递阶段] Received: " + body);

                // 验证确认阶段
                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println("[确认阶段] ACK sent");
            }, tag -> {});

            Thread.sleep(2000);
        }
    }
}

Maven 依赖:

text
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

关键路径总结

阶段源码文件核心函数
网络接收rabbit_reader.erlmain_loop/2
协议解析rabbit_framing.erlparse_frame/1
Channel 处理rabbit_channel.erlhandle_method/3
路由分发rabbit_exchange.erlroute/2
消息入队rabbit_amqqueue.erldeliver/2
持久化rabbit_msg_store.erlwrite/1
消费者投递rabbit_channel.erldeliver_to_consumer/3
ACK 处理rabbit_channel.erlhandle_method(basic.ack)

注意事项

  1. Exchange 不存在时消息直接丢弃,不会报错,这是消息"消失"的常见原因
  2. 持久化消息需经历内存 + 磁盘两次写入,性能开销约为非持久化的 2~3 倍
  3. basic.ackmultiple=true 时批量确认,可减少网络往返但增加丢失风险
  4. 消费者 prefetch 值影响消息拉取批量,过大导致单消费者积压
  5. 镜像队列场景下消息需同步到镜像节点,延迟取决于网络与消息大小

要点总结

  • 消息处理链路分为七阶段:网络接收 → 协议解析 → Channel 处理 → 路由分发 → 消息入队 → 消费者投递 → ACK 确认
  • 核心源码文件:rabbit_readerrabbit_channelrabbit_exchangerabbit_amqqueuerabbit_msg_store
  • Exchange 路由匹配决定消息是否入队,匹配失败则消息丢失
  • 持久化消息需写入磁盘,非持久化消息仅存储于内存
  • 消费者 ACK 确认后消息才从队列中真正移除

📝 发现内容有误?点击此处直接编辑

← 上一篇 核心模块源码结构
下一篇 → 调试与性能分析
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库