消息拒绝与重新入队
消息拒绝是消费者处理异常时的标准操作,可选择将消息丢弃或放回队列重新消费。
定义
拒绝消息指消费者收到消息后,因业务逻辑无法处理而显式通知 RabbitMQ 丢弃或重新投递该消息。RabbitMQ 提供 basicReject(单条)和 basicNack(批量)两种拒绝方式,通过 requeue 参数控制是否重新入队。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
配置示例
basicReject 单条拒绝
Java
import com.rabbitmq.client.*;
import java.io.IOException;
public class RejectConsumer {
private static final String QUEUE_NAME = "reject_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
try {
processMessage(message);
// 处理成功,确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("处理失败,拒绝消息: " + e.getMessage());
// requeue=true 重新入队
channel.basicReject(envelope.getDeliveryTag(), true);
// requeue=false 丢弃消息
// channel.basicReject(envelope.getDeliveryTag(), false);
}
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
private static void processMessage(String message) {
if (message.contains("error")) {
throw new RuntimeException("模拟处理异常");
}
System.out.println("消息处理成功");
}
}
basicNack 批量拒绝
Java
import com.rabbitmq.client.*;
import java.io.IOException;
public class NackConsumer {
private static final String QUEUE_NAME = "nack_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
try {
processMessage(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// multiple=false 仅拒绝当前消息, requeue=true 重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
private static void processMessage(String message) {
System.out.println("处理消息: " + message);
}
}
死信队列配合(丢弃消息的标准做法)
Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DeadLetterConsumer {
private static final String DLX_QUEUE = "dlx_queue";
private static final String BUSINESS_QUEUE = "business_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明死信队列
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
// 声明业务队列并绑定 DLX
Map<String, Object> args_map = new HashMap<>();
args_map.put("x-dead-letter-exchange", ""); // 死信交换机(空为默认)
args_map.put("x-dead-letter-routing-key", DLX_QUEUE);
channel.queueDeclare(BUSINESS_QUEUE, true, false, false, args_map);
boolean autoAck = false;
channel.basicConsume(BUSINESS_QUEUE, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
processMessage(message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// requeue=false 消息进入死信队列
channel.basicReject(envelope.getDeliveryTag(), false);
}
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
private static void processMessage(String message) {
System.out.println("处理业务消息: " + message);
}
}
注意事项
basicReject仅支持单条消息拒绝,basicNack支持通过 multiple 参数批量拒绝。
requeue=true时消息会重新回到队列尾部,若消费者持续失败,可能导致消息反复重投形成死循环。
requeue=false时消息被丢弃,若未配置死信队列,该消息将永久丢失。
建议配合死信队列使用,将
requeue=false的消息路由到 DLX,便于后续排查。
要点总结
- 单条拒绝:
channel.basicReject(deliveryTag, requeue),requeue=true 重新入队,false 丢弃 - 批量拒绝:
channel.basicNack(deliveryTag, multiple, requeue),功能更强支持批量操作 - requeue=true 可能导致消息反复重投,需控制重试次数或配合重试机制
- 生产环境建议配置死信队列,将拒绝的消息集中存储便于排查
📝 发现内容有误?点击此处直接编辑