全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

Firehose 消息追踪

Firehose 是 RabbitMQ 内置的全局消息追踪机制,可将所有 Publish/Deliver 事件复制到指定队列。

工作原理

Bash
生产者 → Exchange → Queue → 消费者
            |                  |
            ↓                  ↓
        amq.rabbitmq.trace  ← 所有消息副本
        (topic exchange)
            ↓
        publish.# / deliver.# 路由键

Firehose 开启后,Broker 自动将所有消息事件发布到 amq.rabbitmq.trace 交换器。

启用方式

命令行启用

Bash
# 开启 Publish 追踪
rabbitmqctl trace_on

# 开启所有事件(Publish + Deliver)
rabbitmqctl trace_on -p /

# 关闭追踪
rabbitmqctl trace_off

HTTP API 启用

Java
curl -X PUT -u guest:guest \
  http://localhost:15672/api/traces/%2f/amq.rabbitmq.trace \
  -H "Content-Type: application/json" \
  -d '{"format":"json","ack":true}'

Java 客户端消费追踪队列

XML
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class FirehoseConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {

            // 声明追踪队列
            String queue = "firehose.trace.queue";
            ch.queueDeclare(queue, true, false, false, null);

            // 绑定到 trace exchange
            // publish.# 捕获所有发布事件
            // deliver.# 捕获所有投递事件
            ch.queueBind(queue, "amq.rabbitmq.trace", "publish.#");
            ch.queueBind(queue, "amq.rabbitmq.trace", "deliver.#");

            // 消费追踪消息
            ch.basicConsume(queue, true, (consumerTag, delivery) -> {
                String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Trace Event: " + body);
            }, consumerTag -> {});

            System.out.println("Firehose consumer started, waiting for events...");
            Thread.currentThread().join();
        }
    }
}

Maven 依赖:

JSON
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

追踪事件格式

Firehose 输出的消息体为 JSON 格式,包含以下字段:

Publish 事件

JSON
{
  "type": "publish",
  "timestamp": 1716364800000,
  "connection": "127.0.0.1:5672 -> 127.0.0.1:49152",
  "virtual_host": "/",
  "node": "rabbit@node1",
  "channel": 1,
  "exchange": "order.exchange",
  "routing_key": "order.created",
  "payload": "{\"orderId\": 1001}",
  "payload_encoding": "string",
  "properties": {
    "delivery_mode": 2,
    "message_id": "msg-001",
    "headers": {}
  }
}

Deliver 事件

Java
{
  "type": "deliver",
  "timestamp": 1716364800100,
  "connection": "127.0.0.1:5672 -> 127.0.0.1:49153",
  "virtual_host": "/",
  "node": "rabbit@node1",
  "channel": 1,
  "queue": "order.queue",
  "consumer_tag": "amq.ctag-xxx",
  "redelivered": false,
  "properties": {
    "delivery_mode": 2,
    "message_id": "msg-001"
  }
}

路由键规则

路由键模式匹配事件说明
publish.#所有发布事件生产者发送消息到 Exchange
deliver.#所有投递事件Broker 投递消息到消费者
publish.exchange_name.#指定 Exchange 发布按 Exchange 过滤
deliver.queue_name.#指定 Queue 投递按 Queue 过滤

按需过滤

仅追踪特定 Exchange 的消息:

Java
// 只绑定特定 Exchange 的追踪
ch.queueBind(queue, "amq.rabbitmq.trace", "publish.order.exchange.#");

仅追踪特定 Queue 的投递:

Java
// 只绑定特定 Queue 的投递事件
ch.queueBind(queue, "amq.rabbitmq.trace", "deliver.order.queue.#");

追踪数据解析

text
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FirehoseParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void parseTraceEvent(String json) throws Exception {
        JsonNode event = MAPPER.readTree(json);
        String type = event.get("type").asText();

        switch (type) {
            case "publish" -> {
                String exchange = event.get("exchange").asText();
                String routingKey = event.get("routing_key").asText();
                String payload = event.get("payload").asText();
                System.out.printf("PUBLISH: exchange=%s, key=%s, payload=%s%n",
                    exchange, routingKey, payload);
            }
            case "deliver" -> {
                String queue = event.get("queue").asText();
                boolean redelivered = event.get("redelivered").asBoolean();
                System.out.printf("DELIVER: queue=%s, redelivered=%b%n",
                    queue, redelivered);
            }
        }
    }
}

注意事项

  1. Firehose 会将所有消息副本复制到 trace exchange,高吞吐场景会产生大量数据,磁盘和 IO 压力显著
  2. 生产环境不建议长期开启,仅用于问题排查时段开启,排查完立即关闭
  3. 追踪队列必须设置合理的 TTL 或最大长度策略,避免无限积压
  4. Firehose 不保证事件顺序,publish 和 deliver 事件可能乱序到达
  5. 消息体较大时建议关闭 payload 记录,仅保留 headers 和 routing 信息

要点总结

  • Firehose 是 RabbitMQ 内置的全局消息追踪机制,复制所有 Publish/Deliver 事件
  • 通过 rabbitmqctl trace_on 开启,事件发布到 amq.rabbitmq.trace 交换器
  • 路由键 publish.#deliver.# 分别捕获发布和投递事件
  • 高吞吐场景不建议长期开启,仅用于临时排查
  • 追踪队列需设置 TTL 或长度限制,避免磁盘耗尽

📝 发现内容有误?点击此处直接编辑

← 上一篇 认证机制
下一篇 → Prometheus 集成
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库