Firehose 消息追踪
Firehose 是 RabbitMQ 内置的全局消息追踪机制,可将所有 Publish/Deliver 事件复制到指定队列。
工作原理
Bash
生产者 → Exchange → Queue → 消费者
| |
↓ ↓
amq.rabbitmq.trace ← 所有消息副本
(topic exchange)
↓
publish.# / deliver.# 路由键
Firehose 开启后,Broker 自动将所有消息事件发布到 amq.rabbitmq.trace 交换器。
启用方式
命令行启用
Bash
# 开启 Publish 追踪
rabbitmqctl trace_on
# 开启所有事件(Publish + Deliver)
rabbitmqctl trace_on -p /
# 关闭追踪
rabbitmqctl trace_off
HTTP API 启用
Java
curl -X PUT -u guest:guest \
http://localhost:15672/api/traces/%2f/amq.rabbitmq.trace \
-H "Content-Type: application/json" \
-d '{"format":"json","ack":true}'
Java 客户端消费追踪队列
XML
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class FirehoseConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 声明追踪队列
String queue = "firehose.trace.queue";
ch.queueDeclare(queue, true, false, false, null);
// 绑定到 trace exchange
// publish.# 捕获所有发布事件
// deliver.# 捕获所有投递事件
ch.queueBind(queue, "amq.rabbitmq.trace", "publish.#");
ch.queueBind(queue, "amq.rabbitmq.trace", "deliver.#");
// 消费追踪消息
ch.basicConsume(queue, true, (consumerTag, delivery) -> {
String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Trace Event: " + body);
}, consumerTag -> {});
System.out.println("Firehose consumer started, waiting for events...");
Thread.currentThread().join();
}
}
}
Maven 依赖:
JSON
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
追踪事件格式
Firehose 输出的消息体为 JSON 格式,包含以下字段:
Publish 事件
JSON
{
"type": "publish",
"timestamp": 1716364800000,
"connection": "127.0.0.1:5672 -> 127.0.0.1:49152",
"virtual_host": "/",
"node": "rabbit@node1",
"channel": 1,
"exchange": "order.exchange",
"routing_key": "order.created",
"payload": "{\"orderId\": 1001}",
"payload_encoding": "string",
"properties": {
"delivery_mode": 2,
"message_id": "msg-001",
"headers": {}
}
}
Deliver 事件
Java
{
"type": "deliver",
"timestamp": 1716364800100,
"connection": "127.0.0.1:5672 -> 127.0.0.1:49153",
"virtual_host": "/",
"node": "rabbit@node1",
"channel": 1,
"queue": "order.queue",
"consumer_tag": "amq.ctag-xxx",
"redelivered": false,
"properties": {
"delivery_mode": 2,
"message_id": "msg-001"
}
}
路由键规则
| 路由键模式 | 匹配事件 | 说明 |
|---|---|---|
publish.# | 所有发布事件 | 生产者发送消息到 Exchange |
deliver.# | 所有投递事件 | Broker 投递消息到消费者 |
publish.exchange_name.# | 指定 Exchange 发布 | 按 Exchange 过滤 |
deliver.queue_name.# | 指定 Queue 投递 | 按 Queue 过滤 |
按需过滤
仅追踪特定 Exchange 的消息:
Java
// 只绑定特定 Exchange 的追踪
ch.queueBind(queue, "amq.rabbitmq.trace", "publish.order.exchange.#");
仅追踪特定 Queue 的投递:
Java
// 只绑定特定 Queue 的投递事件
ch.queueBind(queue, "amq.rabbitmq.trace", "deliver.order.queue.#");
追踪数据解析
text
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class FirehoseParser {
private static final ObjectMapper MAPPER = new ObjectMapper();
public static void parseTraceEvent(String json) throws Exception {
JsonNode event = MAPPER.readTree(json);
String type = event.get("type").asText();
switch (type) {
case "publish" -> {
String exchange = event.get("exchange").asText();
String routingKey = event.get("routing_key").asText();
String payload = event.get("payload").asText();
System.out.printf("PUBLISH: exchange=%s, key=%s, payload=%s%n",
exchange, routingKey, payload);
}
case "deliver" -> {
String queue = event.get("queue").asText();
boolean redelivered = event.get("redelivered").asBoolean();
System.out.printf("DELIVER: queue=%s, redelivered=%b%n",
queue, redelivered);
}
}
}
}
注意事项
- Firehose 会将所有消息副本复制到 trace exchange,高吞吐场景会产生大量数据,磁盘和 IO 压力显著
- 生产环境不建议长期开启,仅用于问题排查时段开启,排查完立即关闭
- 追踪队列必须设置合理的 TTL 或最大长度策略,避免无限积压
- Firehose 不保证事件顺序,publish 和 deliver 事件可能乱序到达
- 消息体较大时建议关闭 payload 记录,仅保留 headers 和 routing 信息
要点总结
- Firehose 是 RabbitMQ 内置的全局消息追踪机制,复制所有 Publish/Deliver 事件
- 通过
rabbitmqctl trace_on开启,事件发布到amq.rabbitmq.trace交换器 - 路由键
publish.#和deliver.#分别捕获发布和投递事件 - 高吞吐场景不建议长期开启,仅用于临时排查
- 追踪队列需设置 TTL 或长度限制,避免磁盘耗尽
📝 发现内容有误?点击此处直接编辑