消息路由流程
RabbitMQ 消息路由是消息从生产到消费的全链路流转过程,核心组件为交换器与绑定关系。
路由架构概览
Java
生产者 → Channel → 交换器 → 绑定匹配 → 队列 → 消费者
路由流程关键步骤:
- 生产者发布消息到交换器
- 交换器根据路由键匹配绑定的队列
- 消息复制到匹配的队列
- 队列将消息推送给订阅的消费者
交换器类型与匹配规则
| 交换器类型 | 匹配规则 | 适用场景 |
|---|---|---|
| Direct | 精确匹配路由键 | 点对点、单播 |
| Fanout | 忽略路由键,广播所有绑定队列 | 广播通知 |
| Topic | 通配符匹配 * 和 # | 多主题订阅 |
| Headers | 匹配消息头属性 | 复杂条件路由 |
Direct 交换器
Java
import com.rabbitmq.client.*;
public class DirectRoutingExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 声明 direct 交换器
ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
// 声明队列
ch.queueDeclare("order.queue", true, false, false, null);
ch.queueDeclare("refund.queue", true, false, false, null);
// 绑定:路由键精确匹配
ch.queueBind("order.queue", "direct.exchange", "order.created");
ch.queueBind("refund.queue", "direct.exchange", "order.refund");
// 发布消息,路由键为 "order.created"
ch.basicPublish("direct.exchange", "order.created",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"New order created".getBytes());
// 消息只路由到 order.queue
System.out.println("消息已路由到 order.queue");
}
}
}
Topic 交换器
Java
import com.rabbitmq.client.*;
public class TopicRoutingExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
ch.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC, true);
ch.queueDeclare("us.orders", true, false, false, null);
ch.queueDeclare("all.orders", true, false, false, null);
// 通配符绑定
ch.queueBind("us.orders", "topic.exchange", "us.order.*");
ch.queueBind("all.orders", "topic.exchange", "#.order.*");
// 路由键 "us.order.created" 匹配两个队列
ch.basicPublish("topic.exchange", "us.order.created",
null, "Order from US".getBytes());
// 路由键 "eu.order.created" 只匹配 all.orders
ch.basicPublish("topic.exchange", "eu.order.created",
null, "Order from EU".getBytes());
}
}
}
路由匹配算法
匹配流程
Java
消息到达交换器 → 获取路由键 → 遍历绑定关系 → 执行匹配规则 → 匹配队列列表 → 消息复制
- 交换器收到消息,提取路由键
- 遍历该交换器的所有绑定关系
- 根据交换器类型执行匹配:
- Direct:
routing_key == binding_key - Topic:通配符模式匹配
- Fanout:直接匹配所有队列
- Headers:比较消息头与绑定头
- Direct:
- 收集所有匹配队列
- 消息副本写入每个匹配队列
Topic 交换器使用通配符
*匹配一个单词,#匹配零个或多个单词,匹配从左到右贪心进行。
队列分发机制
推送模式
队列将消息投递给消费者有两种方式:
| 模式 | 触发方式 | 适用场景 |
|---|---|---|
| Push | Broker 主动推送 | 消费者处理能力稳定 |
| Pull | 消费者主动拉取(basic.get) | 按需获取消息 |
Push 模式示例
Java
import com.rabbitmq.client.*;
public class PushConsumeExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
ch.queueDeclare("push.queue", true, false, false, null);
ch.queueBind("push.queue", "direct.exchange", "push.key");
// Broker 自动推送消息到消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("收到推送消息: " + msg);
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// basicQos 控制预取数量
ch.basicQos(10);
ch.basicConsume("push.queue", false, deliverCallback, consumerTag -> {});
System.out.println("等待推送...");
Thread.sleep(30000);
}
}
}
Pull 模式示例
text
import com.rabbitmq.client.*;
import java.io.IOException;
public class PullConsumeExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
ch.queueDeclare("pull.queue", true, false, false, null);
ch.queueBind("pull.queue", "direct.exchange", "pull.key");
// 主动拉取消息
GetResponse response = ch.basicGet("pull.queue", false);
if (response != null) {
String msg = new String(response.getBody(), "UTF-8");
System.out.println("拉取消息: " + msg);
ch.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("队列为空");
}
}
}
}
路由失败处理
不可路由消息
当消息无法匹配到任何队列时:
- 消息被丢弃(默认行为)
- 如果设置了
mandatory=true,返回生产者 - 如果配置了备用交换器(Alternate Exchange),转发到备用交换器
text
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class UnroutableMessageExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 声明备用交换器
ch.exchangeDeclare("ae.exchange", BuiltinExchangeType.FANOUT, true);
ch.queueDeclare("unrouted.queue", true, false, false, null);
ch.queueBind("unrouted.queue", "ae.exchange", "");
// 主交换器配置备用交换器
Map<String, Object> args_map = new HashMap<>();
args_map.put("alternate-exchange", "ae.exchange");
ch.exchangeDeclare("main.exchange", BuiltinExchangeType.DIRECT, true, false, args_map);
// 发布不可路由的消息,将转发到 ae.exchange
ch.basicPublish("main.exchange", "nonexistent.key",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"Unroutable message".getBytes());
System.out.println("不可路由消息已转发到备用交换器");
}
}
}
如果
mandatory=true且消息不可路由,Broker 会通过ReturnListener返回给生产者。
注意事项
消息路由匹配是同步操作,大量绑定关系会增加匹配延迟,建议单个交换器绑定队列数 ≤ 1000。
Topic 交换器通配符匹配有性能开销,
#通配符比*开销更大,路由键应尽量简洁。
消息路由失败时默认丢弃,生产环境应配置备用交换器或启用
mandatory模式防止消息丢失。
要点总结
- 消息路由流程:生产者 → 交换器 → 绑定匹配 → 队列 → 消费者
- Direct 交换器精确匹配,Fanout 广播所有,Topic 通配符匹配
- 路由匹配算法遍历绑定关系,根据交换器类型执行匹配规则
- 队列分发支持 Push(Broker 推送)和 Pull(消费者拉取)两种模式
- 不可路由消息默认丢弃,可配置备用交换器或 mandatory 模式
- 单个交换器绑定队列数建议 ≤ 1000,避免匹配延迟
文章存放路径:D:\git2\jwdev\articles\RABBITMQ\专家\底层原理与架构\消息路由流程.md
📝 发现内容有误?点击此处直接编辑