全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

消息丢失排查

消息丢失是 RabbitMQ 最严重的故障。需按链路分段排查,逐步缩小范围。

排查链路

Java
生产者 → 网络 → Exchange → Queue → 消费者
  |         |        |        |        |
 确认     连接      路由     持久化    ACK
 机制     状态      规则     模式     机制

第一步:确认生产者端

检查发布确认

未启用 Confirm 模式时,生产者无法感知消息是否到达 Broker。

Java
import com.rabbitmq.client.*;

public class PublisherCheck {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {

            // 开启 Confirm 模式
            ch.confirmSelect();

            String exchange = "order.exchange";
            String routingKey = "order.created";
            String message = "{\"orderId\": 1001}";

            ch.basicPublish(exchange, routingKey,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());

            // 等待确认
            if (ch.waitForConfirms(5000)) {
                System.out.println("消息已到达 Broker");
            } else {
                System.out.println("消息未确认,可能丢失");
            }
        }
    }
}

未收到 basic.ack 的消息视为发布失败,可能是网络断开或 Exchange 不存在。

检查 Mandatory 标志

消息路由到 Queue 失败时,mandatory=true 会触发 basic.return 回调。

Bash
ch.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
    System.out.println("消息被退回: " + replyText);
    System.out.println("路由键: " + routingKey);
    // 记录或重试
});

ch.basicPublish("order.exchange", "invalid_key", true,
    MessageProperties.PERSISTENT_TEXT_PLAIN, "test".getBytes());

第二步:检查 Broker 路由

验证 Exchange 与 Queue 绑定

Java
# 查看 Exchange 绑定关系
rabbitmqctl list_bindings exchange_name source_name destination_name routing_key

# 查看 Exchange 类型
rabbitmqctl list_exchanges name type durable auto_delete

# 查看队列绑定
rabbitmqctl list_queues name messages consumers

常见路由错误

错误类型表现排查命令
Exchange 不存在消息直接丢弃rabbitmqctl list_exchanges
路由键不匹配消息未路由到 Queuerabbitmqctl list_bindings
Exchange 类型错误Fanout 忽略 routingKey检查 Exchange 类型
Binding 丢失非持久化绑定重启消失检查 durable 参数

第三步:检查队列存储

持久化配置

Bash
// 队列持久化
boolean durable = true;
ch.queueDeclare("order.queue", durable, false, false, null);

// 消息持久化
ch.basicPublish("", "order.queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());

仅队列持久化不够,消息也必须标记为 PERSISTENT,否则重启后消息丢失。

检查消息状态

Java
# 查看队列消息数
rabbitmqctl list_queues name messages_ready messages_unacknowledged

# 查看队列详情(含持久化状态)
rabbitmqctl eval 'rabbit_amqqueue:info(rabbit_amqqueue:find({resource, <<"/">>, queue, <<"order.queue">>}), [durable, policy, slave_pids]).'

第四步:检查消费者端

ACK 模式排查

Java
// 手动 ACK 模式
boolean autoAck = false;
ch.basicConsume("order.queue", autoAck, (consumerTag, delivery) -> {
    try {
        String msg = new String(delivery.getBody());
        processMessage(msg);
        ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // 拒绝消息,不重新入队
        ch.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        // 或重新入队
        // ch.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
}, consumerTag -> {});

basicNackrequeue=false 时消息直接丢弃,需配合死信队列使用。

常见消费者端丢失场景

场景原因解决方案
自动 ACK 处理失败消息消费失败但已 ACK改用手动 ACK
NACK requeue=false消息被拒绝且不重新入队设置 requeue=true 或 DLX
消费者崩溃连接断开,消息重新入队检查心跳与重连机制
prefetch=0一次拉取过多消息积压设置合理 prefetch 值

完整排查脚本

text
import com.rabbitmq.client.*;
import java.util.concurrent.CountDownLatch;

public class MessageLossDebugger {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {

            // 1. 开启 Confirm
            ch.confirmSelect();

            // 2. 添加 Return 监听
            ch.addReturnListener((code, text, ex, rk, props, body) -> {
                System.err.printf("Return: code=%d, text=%s, key=%s%n", code, text, rk);
            });

            // 3. 发布消息
            String message = "debug-message-" + System.currentTimeMillis();
            ch.basicPublish("test.exchange", "test.key", true,
                MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

            // 4. 等待确认
            boolean confirmed = ch.waitForConfirms(5000);
            System.out.println("Confirm: " + confirmed);

            // 5. 检查队列
            AMQP.Queue.DeclareOk ok = ch.queueDeclarePassive("test.queue");
            System.out.println("Queue messages: " + ok.getMessageCount());

            // 6. 消费验证
            CountDownLatch latch = new CountDownLatch(1);
            ch.basicConsume("test.queue", false, (tag, delivery) -> {
                System.out.println("Received: " + new String(delivery.getBody()));
                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                latch.countDown();
            }, tag -> {});

            latch.await();
        }
    }
}

注意事项

  1. 排查消息丢失必须先确认生产者是否收到 Confirm,这是链路起点
  2. Exchange 不存在或路由键错误是消息"消失"最常见原因
  3. 持久化需同时设置队列 durable=true 和消息 deliveryMode=2
  4. 消费者自动 ACK 模式下,消息在投递到消费者时即被确认,处理失败即丢失
  5. 集群场景需检查镜像队列同步状态,主节点故障可能导致未同步消息丢失

要点总结

  • 消息丢失排查遵循"生产者 → 路由 → 存储 → 消费者"四段式链路
  • 生产者端必须启用 Confirm + Return 机制感知发布状态
  • 路由阶段重点检查 Exchange 存在性、类型和 Binding 关系
  • 存储阶段确认队列和消息双重持久化配置
  • 消费者端使用手动 ACK + 死信队列避免处理失败导致丢失

📝 发现内容有误?点击此处直接编辑

← 上一篇 日志分析与解读
下一篇 → 监控指标体系
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库