全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

消息路由流程

RabbitMQ 消息路由是消息从生产到消费的全链路流转过程,核心组件为交换器与绑定关系。

路由架构概览

Java
生产者 → Channel → 交换器 → 绑定匹配 → 队列 → 消费者

路由流程关键步骤:

  1. 生产者发布消息到交换器
  2. 交换器根据路由键匹配绑定的队列
  3. 消息复制到匹配的队列
  4. 队列将消息推送给订阅的消费者

交换器类型与匹配规则

交换器类型匹配规则适用场景
Direct精确匹配路由键点对点、单播
Fanout忽略路由键,广播所有绑定队列广播通知
Topic通配符匹配 *#多主题订阅
Headers匹配消息头属性复杂条件路由

Direct 交换器

Java
import com.rabbitmq.client.*;

public class DirectRoutingExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            // 声明 direct 交换器
            ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
            
            // 声明队列
            ch.queueDeclare("order.queue", true, false, false, null);
            ch.queueDeclare("refund.queue", true, false, false, null);
            
            // 绑定:路由键精确匹配
            ch.queueBind("order.queue", "direct.exchange", "order.created");
            ch.queueBind("refund.queue", "direct.exchange", "order.refund");
            
            // 发布消息,路由键为 "order.created"
            ch.basicPublish("direct.exchange", "order.created",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "New order created".getBytes());
            
            // 消息只路由到 order.queue
            System.out.println("消息已路由到 order.queue");
        }
    }
}

Topic 交换器

Java
import com.rabbitmq.client.*;

public class TopicRoutingExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            ch.exchangeDeclare("topic.exchange", BuiltinExchangeType.TOPIC, true);
            
            ch.queueDeclare("us.orders", true, false, false, null);
            ch.queueDeclare("all.orders", true, false, false, null);
            
            // 通配符绑定
            ch.queueBind("us.orders", "topic.exchange", "us.order.*");
            ch.queueBind("all.orders", "topic.exchange", "#.order.*");
            
            // 路由键 "us.order.created" 匹配两个队列
            ch.basicPublish("topic.exchange", "us.order.created",
                null, "Order from US".getBytes());
            
            // 路由键 "eu.order.created" 只匹配 all.orders
            ch.basicPublish("topic.exchange", "eu.order.created",
                null, "Order from EU".getBytes());
        }
    }
}

路由匹配算法

匹配流程

Java
消息到达交换器 → 获取路由键 → 遍历绑定关系 → 执行匹配规则 → 匹配队列列表 → 消息复制
  1. 交换器收到消息,提取路由键
  2. 遍历该交换器的所有绑定关系
  3. 根据交换器类型执行匹配:
    • Direct:routing_key == binding_key
    • Topic:通配符模式匹配
    • Fanout:直接匹配所有队列
    • Headers:比较消息头与绑定头
  4. 收集所有匹配队列
  5. 消息副本写入每个匹配队列

Topic 交换器使用通配符 * 匹配一个单词,# 匹配零个或多个单词,匹配从左到右贪心进行。

队列分发机制

推送模式

队列将消息投递给消费者有两种方式:

模式触发方式适用场景
PushBroker 主动推送消费者处理能力稳定
Pull消费者主动拉取(basic.get按需获取消息

Push 模式示例

Java
import com.rabbitmq.client.*;

public class PushConsumeExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
            ch.queueDeclare("push.queue", true, false, false, null);
            ch.queueBind("push.queue", "direct.exchange", "push.key");
            
            // Broker 自动推送消息到消费者
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String msg = new String(delivery.getBody(), "UTF-8");
                System.out.println("收到推送消息: " + msg);
                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            
            // basicQos 控制预取数量
            ch.basicQos(10);
            ch.basicConsume("push.queue", false, deliverCallback, consumerTag -> {});
            
            System.out.println("等待推送...");
            Thread.sleep(30000);
        }
    }
}

Pull 模式示例

text
import com.rabbitmq.client.*;
import java.io.IOException;

public class PullConsumeExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            ch.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, true);
            ch.queueDeclare("pull.queue", true, false, false, null);
            ch.queueBind("pull.queue", "direct.exchange", "pull.key");
            
            // 主动拉取消息
            GetResponse response = ch.basicGet("pull.queue", false);
            if (response != null) {
                String msg = new String(response.getBody(), "UTF-8");
                System.out.println("拉取消息: " + msg);
                ch.basicAck(response.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("队列为空");
            }
        }
    }
}

路由失败处理

不可路由消息

当消息无法匹配到任何队列时:

  1. 消息被丢弃(默认行为)
  2. 如果设置了 mandatory=true,返回生产者
  3. 如果配置了备用交换器(Alternate Exchange),转发到备用交换器
text
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;

public class UnroutableMessageExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            // 声明备用交换器
            ch.exchangeDeclare("ae.exchange", BuiltinExchangeType.FANOUT, true);
            ch.queueDeclare("unrouted.queue", true, false, false, null);
            ch.queueBind("unrouted.queue", "ae.exchange", "");
            
            // 主交换器配置备用交换器
            Map<String, Object> args_map = new HashMap<>();
            args_map.put("alternate-exchange", "ae.exchange");
            ch.exchangeDeclare("main.exchange", BuiltinExchangeType.DIRECT, true, false, args_map);
            
            // 发布不可路由的消息,将转发到 ae.exchange
            ch.basicPublish("main.exchange", "nonexistent.key",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "Unroutable message".getBytes());
            
            System.out.println("不可路由消息已转发到备用交换器");
        }
    }
}

如果 mandatory=true 且消息不可路由,Broker 会通过 ReturnListener 返回给生产者。

注意事项

消息路由匹配是同步操作,大量绑定关系会增加匹配延迟,建议单个交换器绑定队列数 ≤ 1000。

Topic 交换器通配符匹配有性能开销,# 通配符比 * 开销更大,路由键应尽量简洁。

消息路由失败时默认丢弃,生产环境应配置备用交换器或启用 mandatory 模式防止消息丢失。

要点总结

  • 消息路由流程:生产者 → 交换器 → 绑定匹配 → 队列 → 消费者
  • Direct 交换器精确匹配,Fanout 广播所有,Topic 通配符匹配
  • 路由匹配算法遍历绑定关系,根据交换器类型执行匹配规则
  • 队列分发支持 Push(Broker 推送)和 Pull(消费者拉取)两种模式
  • 不可路由消息默认丢弃,可配置备用交换器或 mandatory 模式
  • 单个交换器绑定队列数建议 ≤ 1000,避免匹配延迟

文章存放路径:D:\git2\jwdev\articles\RABBITMQ\专家\底层原理与架构\消息路由流程.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息存储机制
下一篇 → 调度器与进程模型
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库