全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

端到端可靠性

消息从生产者到消费者的完整链路中,任何环节异常都可能导致消息丢失。组合多种可靠性机制可实现消息端到端不丢失。

消息丢失环节分析

环节可能丢失原因防护机制
生产端 → 交换机网络异常、交换机不存在Publisher Confirm
交换机 → 队列路由失败、无匹配队列mandatory + Return 机制
队列存储RabbitMQ 重启、节点故障消息持久化
队列 → 消费者消费异常、未确认断开手动确认 + 重试

完整方案实现

生产端:Confirm + 持久化

Java
// Maven 依赖
// <dependency>
//     <groupId>com.rabbitmq</groupId>
//     <artifactId>amqp-client</artifactId>
//     <version>5.20.0</version>
// </dependency>

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class EndToEndReliability {
    
    private static final String EXCHANGE_NAME = "reliable_exchange";
    private static final String QUEUE_NAME = "reliable_queue";
    private static final String ROUTING_KEY = "reliable.key";
    private static final String DLX_EXCHANGE = "dlx_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // ========== 1. 声明持久化交换机和队列 ==========
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // durable=true
            channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable=true
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            
            // ========== 2. 开启 Confirm 模式 ==========
            channel.confirmSelect();
            
            // 维护未确认消息
            ConcurrentNavigableMap<Long, String> outstandingConfirms = 
                    new ConcurrentSkipListMap<>();
            
            // 注册异步 Confirm 回调
            channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    if (multiple) {
                        outstandingConfirms.headMap(deliveryTag, true).clear();
                    } else {
                        outstandingConfirms.remove(deliveryTag);
                    }
                    System.out.println("[Confirm] 消息已确认, tag: " + deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    if (multiple) {
                        var failed = outstandingConfirms.headMap(deliveryTag, true);
                        System.err.println("[Confirm] 批量消息未确认: " + failed.values());
                        failed.clear();
                    } else {
                        String msg = outstandingConfirms.get(deliveryTag);
                        System.err.println("[Confirm] 消息未确认,需重发: " + msg);
                        outstandingConfirms.remove(deliveryTag);
                    }
                }
            );
            
            // 注册 Return 回调
            channel.addReturnListener((replyCode, replyText, exchange, 
                    routingKey, properties, body) -> {
                System.err.println("[Return] 消息未路由: " + routingKey);
            });
            
            // ========== 3. 发送持久化消息 ==========
            String message = "Reliable Message";
            String messageId = UUID.randomUUID().toString();
            
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .deliveryMode(2)  // 2 = 持久化, 1 = 非持久化
                    .build();
            
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, props,
                    message.getBytes(StandardCharsets.UTF_8));
            outstandingConfirms.put(nextSeqNo, message);
            
            System.out.println("消息已发送: " + message + ", messageId: " + messageId);
            
            // 等待确认
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

消费端:手动确认 + 失败重试 + 死信

Java
public class ReliableConsumer {
    
    private static final int MAX_RETRY = 3;
    
    public static void consume() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 声明死信交换机
        channel.exchangeDeclare("dlx_exchange", "direct", true);
        channel.queueDeclare("dlx_queue", true, false, false, null);
        channel.queueBind("dlx_queue", "dlx_exchange", "dlx.route");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String messageId = delivery.getProperties().getMessageId();
            
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                
                // 检查是否重复消费
                if (isDuplicate(messageId)) {
                    System.out.println("重复消息,跳过: " + messageId);
                    channel.basicAck(deliveryTag, false);
                    return;
                }
                
                // 处理消息
                processMessage(message);
                
                // 处理成功,确认消息
                markProcessed(messageId);
                channel.basicAck(deliveryTag, false);
                System.out.println("消息已确认: " + messageId);
                
            } catch (Exception e) {
                int retryCount = getRetryCount(delivery);
                
                if (retryCount < MAX_RETRY) {
                    // 重试:更新重试次数后重新入队
                    retryCount++;
                    republishWithRetry(channel, delivery, retryCount);
                    channel.basicAck(deliveryTag, false);
                } else {
                    // 超过最大重试次数,拒绝消息进入死信
                    channel.basicNack(deliveryTag, false, false);
                    System.err.println("消息进入死信队列: " + messageId);
                }
            }
        };
        
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }
    
    private static int getRetryCount(Delivery delivery) {
        Object count = delivery.getProperties().getHeaders().get("x-retry-count");
        return count != null ? (Integer) count : 0;
    }
    
    private static void republishWithRetry(Channel channel, Delivery delivery, 
            int retryCount) throws IOException {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .messageId(delivery.getProperties().getMessageId())
                .deliveryMode(2)
                .headers(java.util.Map.of("x-retry-count", retryCount))
                .build();
        channel.basicPublish("reliable_exchange", "reliable.key", props, delivery.getBody());
    }
    
    private static boolean isDuplicate(String messageId) {
        // 查询去重存储
        return false;
    }
    
    private static void markProcessed(String messageId) {
        // 标记已处理
    }
    
    private static void processMessage(String message) {
        // 业务处理
    }
}

队列持久化配置

声明持久化队列与死信

Java
public class DurableQueueSetup {
    
    public static void setup(Channel channel) throws IOException {
        // 声明死信交换机
        channel.exchangeDeclare("dlx_exchange", "direct", true);
        channel.queueDeclare("dlx_queue", true, false, false, null);
        channel.queueBind("dlx_queue", "dlx_exchange", "dlx.route");
        
        // 声明主队列,绑定死信交换机
        java.util.Map<String, Object> args = new java.util.HashMap<>();
        args.put("x-dead-letter-exchange", "dlx_exchange");  // 死信交换机
        args.put("x-dead-letter-routing-key", "dlx.route");  // 死信路由键
        args.put("x-message-ttl", 60000);  // 消息 TTL(可选)
        
        channel.queueDeclare("main_queue", true, false, false, args);
        channel.queueBind("main_queue", "reliable_exchange", "reliable.key");
    }
}

注意事项

  1. 消息持久化(deliveryMode=2)需配合交换机和队列的 durable=true,仅消息持久化无效。
  2. Publisher Confirm 异步模式下需维护未确认消息集合,连接断开时需重发。
  3. 手动确认后若消费者未确认即断开,消息会自动重新入队。
  4. 死信队列需提前声明并绑定,消息被 nack(requeue=false) 或 TTL 过期后自动转入。
  5. 端到端可靠性 = 发布确认 + 消息持久化 + 手动确认 + 死信队列,缺一不可。
  6. 持久化会降低吞吐量,高吞吐场景可权衡可靠性与性能,选择部分关键消息持久化。

要点总结

  • 端到端可靠性需覆盖生产端 → 交换机 → 队列 → 消费端全链路
  • 生产端使用 Publisher Confirm 确保消息到达交换机
  • 消息设置 deliveryMode=2 实现持久化,防止 RabbitMQ 重启丢失
  • 消费端使用手动确认 + 重试机制,失败消息转入死信队列
  • 交换机、队列、消息均需设置 durable 属性才能完整持久化

📝 发现内容有误?点击此处直接编辑

← 上一篇 消费端可靠性
下一篇 → TTL 消息过期
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库