消息处理链路
一条消息从生产者发送到消费者确认,经历网络接收、协议解析、路由分发、持久化存储、消费者投递五个核心阶段。
完整调用链
erlang
生产者发送
│
▼
[rabbit_reader] TCP 接收帧
│
▼
[rabbit_frame] 解析 AMQP 帧
│
▼
[rabbit_channel] 处理 basic.publish
│
▼
[rabbit_exchange] 查找 Exchange + 匹配 Binding
│
▼
[rabbit_amqqueue] 消息入队
│
▼
[rabbit_backing_queue] 选择存储策略(RAM / Disk)
│
▼
[rabbit_msg_store] 持久化到磁盘(如标记为 persistent)
│
▼
[rabbit_amqqueue] 通知消费者进程
│
▼
[rabbit_channel] 执行 basic.deliver
│
▼
[rabbit_writer] 发送帧到消费者
│
▼
消费者处理 + ACK
│
▼
[rabbit_channel] 处理 basic.ack
│
▼
[rabbit_amqqueue] 从队列移除消息
阶段一:网络接收
rabbit_reader.erl 负责 TCP 连接管理,接收原始字节流。
erlang
% rabbit_reader.erl - 简化示例
main_loop(Socket, State) ->
% 接收数据
case gen_tcp:recv(Socket, 0) of
{ok, Data} ->
% 解析帧
{Frame, Rest} = parse_frame(Data),
% 分发到对应 Channel
dispatch_frame(Frame, State),
% 继续循环
main_loop(Socket, State#state{buffer = Rest});
{error, closed} ->
% 连接关闭
handle_close(State)
end.
阶段二:协议解析
rabbit_framing.erl 将字节流解析为 AMQP 帧结构。
erlang
% rabbit_framing.erl - 帧解析
parse_frame(Data) ->
% 帧格式: type(1) + channel(2) + size(4) + payload(N) + end(1)
<<Type:8, Channel:16, Size:32, Payload:Size/binary, 0:8, Rest/binary>> = Data,
% 解码方法帧
Method = decode_method(Type, Payload),
{#frame{type = Type, channel = Channel, method = Method}, Rest}.
阶段三:Channel 处理
rabbit_channel.erl 处理 AMQP 方法命令。
erlang
% rabbit_channel.erl - basic.publish 处理
handle_method(#'basic.publish'{exchange = Ex, routing_key = RK},
Content,
#ch{vhost = VHost} = State) ->
% 1. 查找 Exchange
case rabbit_exchange:lookup(VHost, Ex) of
{ok, Exchange} ->
% 2. 路由消息
RouteResult = rabbit_exchange:route(Exchange, RK),
% 3. 投递到队列
deliver_to_queuesues(RouteResult, Content, State),
% 4. 发送 Confirm(如开启)
maybe_send_confirm(State),
{noreply, State};
{error, not_found} ->
% Exchange 不存在,消息丢弃
{noreply, State}
end.
阶段四:路由分发
rabbit_exchange.erl 根据 Exchange 类型和 Binding 规则匹配目标队列。
erlang
% rabbit_exchange.erl - topic exchange 路由
route(#exchange{type = <<"topic">>, bindings = Bindings}, RoutingKey) ->
% 拆分路由键
Tokens = binary:split(RoutingKey, <<".">>, [global]),
% 匹配每个 binding
[Queue || {Pattern, Queue} <- Bindings,
match_pattern(Tokens, Pattern)].
% topic 模式匹配
match_pattern(Tokens, Pattern) ->
PatternTokens = binary:split(Pattern, <<".">>, [global]),
match_tokens(Tokens, PatternTokens).
阶段五:消息入队与存储
rabbit_amqqueue.erl 将消息写入队列。
erlang
% rabbit_amqqueue.erl - 消息入队
deliver(Queue, Message) ->
% 判断是否持久化
case Message#message.delivery_mode of
2 -> % persistent
% 写入持久化存储
rabbit_msg_store:write(Message),
% 更新队列索引
rabbit_backing_queue:persist(Queue, Message);
1 -> % non-persistent
% 仅内存存储
rabbit_backing_queue:store(Queue, Message)
end,
% 通知消费者进程
notify_consumers(Queue).
阶段六:消费者投递
rabbit_channel.erl 执行 basic.deliver 将消息发送给消费者。
erlang
% rabbit_channel.erl - 投递到消费者
deliver_to_consumer(ConsumerPid, Message, State) ->
% 构建 deliver 帧
Deliver = #'basic.deliver'{
consumer_tag = State#ch.consumer_tag,
delivery_tag = State#ch.delivery_tag
},
% 构建 header 帧
Header = build_header_frame(Message),
% 构建 body 帧
Body = build_body_frame(Message),
% 发送
rabbit_writer:send(ConsumerPid, [Deliver, Header, Body]),
% 更新未确认计数
State#ch{unacked = State#ch.unacked + 1}.
阶段七:ACK 处理
rabbit_channel.erl 处理消费者确认。
Java
% rabbit_channel.erl - basic.ack 处理
handle_method(#'basic.ack'{delivery_tag = Tag, multiple = Multiple}, _, State) ->
% 查找未确认消息
Unacked = State#ch.unacked_messages,
case Multiple of
true ->
% 确认所有 <= Tag 的消息
Confirmed = [M || M <- Unacked, M#message.tag =< Tag];
false ->
% 仅确认当前 Tag
Confirmed = [M || M <- Unacked, M#message.tag =:= Tag]
end,
% 从队列移除已确认消息
rabbit_amqqueue:ack(Confirmed),
% 更新状态
{noreply, State#ch{unacked = State#ch.unacked - length(Confirmed)}}.
Java 客户端验证
XML
import com.rabbitmq.client.*;
public class MessagePathVerifier {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 开启 Confirm 追踪发布阶段
ch.confirmSelect();
// 开启 Return 追踪路由阶段
ch.addReturnListener((code, text, ex, rk, props, body) -> {
System.err.println("[路由失败] 消息未匹配到队列: " + rk);
});
// 发布消息
String message = "trace-path-message";
ch.basicPublish("test.exchange", "test.key",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 验证发布成功
boolean confirmed = ch.waitForConfirms(5000);
System.out.println("[发布阶段] Confirm: " + confirmed);
// 消费验证投递阶段
ch.basicConsume("test.queue", false, (tag, delivery) -> {
String body = new String(delivery.getBody());
System.out.println("[投递阶段] Received: " + body);
// 验证确认阶段
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("[确认阶段] ACK sent");
}, tag -> {});
Thread.sleep(2000);
}
}
}
Maven 依赖:
text
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
关键路径总结
| 阶段 | 源码文件 | 核心函数 |
|---|---|---|
| 网络接收 | rabbit_reader.erl | main_loop/2 |
| 协议解析 | rabbit_framing.erl | parse_frame/1 |
| Channel 处理 | rabbit_channel.erl | handle_method/3 |
| 路由分发 | rabbit_exchange.erl | route/2 |
| 消息入队 | rabbit_amqqueue.erl | deliver/2 |
| 持久化 | rabbit_msg_store.erl | write/1 |
| 消费者投递 | rabbit_channel.erl | deliver_to_consumer/3 |
| ACK 处理 | rabbit_channel.erl | handle_method(basic.ack) |
注意事项
- Exchange 不存在时消息直接丢弃,不会报错,这是消息"消失"的常见原因
- 持久化消息需经历内存 + 磁盘两次写入,性能开销约为非持久化的 2~3 倍
basic.ack的multiple=true时批量确认,可减少网络往返但增加丢失风险- 消费者 prefetch 值影响消息拉取批量,过大导致单消费者积压
- 镜像队列场景下消息需同步到镜像节点,延迟取决于网络与消息大小
要点总结
- 消息处理链路分为七阶段:网络接收 → 协议解析 → Channel 处理 → 路由分发 → 消息入队 → 消费者投递 → ACK 确认
- 核心源码文件:
rabbit_reader、rabbit_channel、rabbit_exchange、rabbit_amqqueue、rabbit_msg_store - Exchange 路由匹配决定消息是否入队,匹配失败则消息丢失
- 持久化消息需写入磁盘,非持久化消息仅存储于内存
- 消费者 ACK 确认后消息才从队列中真正移除
📝 发现内容有误?点击此处直接编辑