Topic 交换机
Topic交换机通过通配符模式匹配路由键,实现基于主题结构的灵活消息路由。
定义
Topic交换机将消息路由到绑定队列中路由键模式匹配的队列。支持通配符*(匹配一个单词)和#(匹配零个或多个单词),路由键以点号.分隔形成层级结构。
核心机制
通配符规则
*:匹配一个单词段,如stock.*匹配stock.usd但不匹配stock.usd.nyse#:匹配零个或多个单词段,如stock.#匹配stock.usd、stock.usd.nyse- 路由键格式:以点号分隔的单词序列,如
order.created.success
声明与绑定示例
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicExchangeExample {
private static final String EXCHANGE_NAME = "topic_orders";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明Topic交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
// 声明队列
channel.queueDeclare("queue_all_orders", true, false, false, null);
channel.queueDeclare("queue_us_orders", true, false, false, null);
channel.queueDeclare("queue_critical", true, false, false, null);
// 绑定队列与通配符路由键
// 接收所有订单消息
channel.queueBind("queue_all_orders", EXCHANGE_NAME, "order.*");
// 仅接收美国订单
channel.queueBind("queue_us_orders", EXCHANGE_NAME, "order.us.*");
// 接收所有紧急订单
channel.queueBind("queue_critical", EXCHANGE_NAME, "#.critical");
// 发布消息
String[] routingKeys = {
"order.us.created",
"order.eu.created",
"order.us.critical",
"payment.critical.failed"
};
for (String routingKey : routingKeys) {
String message = "Message with routing key: " + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null,
message.getBytes("UTF-8"));
System.out.println("Sent: " + routingKey);
}
}
}
}
消费者示例
Java
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumer {
private static final String EXCHANGE_NAME = "topic_orders";
private static final String QUEUE_NAME = "queue_us_orders";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定通配符路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.us.*");
System.out.println("Waiting for US order messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("Received [" + routingKey + "]: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
注意事项
- 路由键单词段数建议不超过10个,避免模式匹配性能下降
#通配符匹配零个或多个单词,#.critical可匹配critical本身- Topic交换机匹配性能略低于Direct交换机,需权衡灵活性与性能
- 路由键设计应具有明确层级结构,如
区域.业务.操作.状态- 不支持正则表达式,仅支持
*和#两种通配符
要点总结
- Topic交换机通过
*和#通配符实现灵活的模式匹配路由 *匹配一个单词段,#匹配零个或多个单词段- 适用于基于主题分类、多条件过滤的消息路由场景
- 路由键设计应遵循层级结构规范,便于通配符匹配
- 性能略低于Direct交换机,但提供更高的路由灵活性
文章存放路径:D:\git2\jwdev\articles\RABBITMQ\进阶\交换机类型详解\Topic 交换机.md
📝 发现内容有误?点击此处直接编辑