全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息拒绝与重新入队

消息拒绝是消费者处理异常时的标准操作,可选择将消息丢弃或放回队列重新消费。

定义

拒绝消息指消费者收到消息后,因业务逻辑无法处理而显式通知 RabbitMQ 丢弃或重新投递该消息。RabbitMQ 提供 basicReject(单条)和 basicNack(批量)两种拒绝方式,通过 requeue 参数控制是否重新入队。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

配置示例

basicReject 单条拒绝

Java
import com.rabbitmq.client.*;

import java.io.IOException;

public class RejectConsumer {
    private static final String QUEUE_NAME = "reject_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("收到消息: " + message);

                    try {
                        processMessage(message);
                        // 处理成功,确认消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        System.err.println("处理失败,拒绝消息: " + e.getMessage());
                        // requeue=true 重新入队
                        channel.basicReject(envelope.getDeliveryTag(), true);
                        // requeue=false 丢弃消息
                        // channel.basicReject(envelope.getDeliveryTag(), false);
                    }
                }
            });

            Thread.sleep(Long.MAX_VALUE);
        }
    }

    private static void processMessage(String message) {
        if (message.contains("error")) {
            throw new RuntimeException("模拟处理异常");
        }
        System.out.println("消息处理成功");
    }
}

basicNack 批量拒绝

Java
import com.rabbitmq.client.*;

import java.io.IOException;

public class NackConsumer {
    private static final String QUEUE_NAME = "nack_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("收到消息: " + message);

                    try {
                        processMessage(message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        // multiple=false 仅拒绝当前消息, requeue=true 重新入队
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }
            });

            Thread.sleep(Long.MAX_VALUE);
        }
    }

    private static void processMessage(String message) {
        System.out.println("处理消息: " + message);
    }
}

死信队列配合(丢弃消息的标准做法)

Java
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadLetterConsumer {
    private static final String DLX_QUEUE = "dlx_queue";
    private static final String BUSINESS_QUEUE = "business_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明死信队列
            channel.queueDeclare(DLX_QUEUE, true, false, false, null);

            // 声明业务队列并绑定 DLX
            Map<String, Object> args_map = new HashMap<>();
            args_map.put("x-dead-letter-exchange", "");       // 死信交换机(空为默认)
            args_map.put("x-dead-letter-routing-key", DLX_QUEUE);
            channel.queueDeclare(BUSINESS_QUEUE, true, false, false, args_map);

            boolean autoAck = false;
            channel.basicConsume(BUSINESS_QUEUE, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    try {
                        processMessage(message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        // requeue=false 消息进入死信队列
                        channel.basicReject(envelope.getDeliveryTag(), false);
                    }
                }
            });

            Thread.sleep(Long.MAX_VALUE);
        }
    }

    private static void processMessage(String message) {
        System.out.println("处理业务消息: " + message);
    }
}

注意事项

basicReject 仅支持单条消息拒绝,basicNack 支持通过 multiple 参数批量拒绝。

requeue=true 时消息会重新回到队列尾部,若消费者持续失败,可能导致消息反复重投形成死循环。

requeue=false 时消息被丢弃,若未配置死信队列,该消息将永久丢失。

建议配合死信队列使用,将 requeue=false 的消息路由到 DLX,便于后续排查。

要点总结

  • 单条拒绝:channel.basicReject(deliveryTag, requeue),requeue=true 重新入队,false 丢弃
  • 批量拒绝:channel.basicNack(deliveryTag, multiple, requeue),功能更强支持批量操作
  • requeue=true 可能导致消息反复重投,需控制重试次数或配合重试机制
  • 生产环境建议配置死信队列,将拒绝的消息集中存储便于排查

📝 发现内容有误?点击此处直接编辑

← 上一篇 手动确认模式
下一篇 → 消息持久化配置
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库