死信消息处理
死信队列中积累了无法被正常消费的消息,需要建立监控、分析和补偿机制。
监控死信队列
通过 HTTP API 或管理插件监控死信队列消息堆积情况:
Bash
# 查看死信队列消息数量
rabbitmqctl list_queues name messages | grep dlx
# 或通过 Management API
curl -u guest:guest http://localhost:15672/api/queues/%2F/dlx.queue
分析死信原因
死信消息的 headers 中携带 x-death 信息,可用于排查原因。
Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DlxAnalysisExample {
private static final String DLX_QUEUE = "dlx.queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.basicConsume(DLX_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
// 获取 x-death 信息
Map<String, Object> headers = properties.getHeaders();
if (headers != null && headers.containsKey("x-death")) {
List<Map<String, Object>> xDeaths =
(List<Map<String, Object>>) headers.get("x-death");
for (Map<String, Object> xDeath : xDeaths) {
String reason = (String) xDeath.get("reason");
String queue = (String) xDeath.get("queue");
Long count = (Long) xDeath.get("count");
System.out.println("死信分析:");
System.out.println(" 原因: " + reason);
System.out.println(" 原队列: " + queue);
System.out.println(" 死信次数: " + count);
System.out.println(" 消息内容: " + message);
}
} else {
System.out.println("无 x-death 信息: " + message);
}
// 手动确认,避免消息再次进入死信循环
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.out.println("死信监控消费者已启动");
}
}
}
死信补偿处理
根据不同死因实现补偿策略:
Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DlxCompensationExample {
private static final String DLX_QUEUE = "dlx.queue";
private static final String RETRY_EXCHANGE = "retry.exchange";
private static final String MANUAL_EXCHANGE = "manual.review.exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明重试交换机和人工审核交换机
channel.exchangeDeclare(RETRY_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.exchangeDeclare(MANUAL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 消费死信队列,根据原因路由到不同处理队列
channel.basicConsume(DLX_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
Map<String, Object> headers = properties.getHeaders();
String reason = extractReason(headers);
if ("rejected".equals(reason)) {
// 消费者主动拒绝:可能是业务逻辑异常,尝试重试
channel.basicPublish(RETRY_EXCHANGE, "retry.key", properties, body);
System.out.println("消息已转入重试队列");
} else if ("expired".equals(reason)) {
// TTL 过期:可能是延迟业务,正常处理
processMessage(body);
} else if ("overflow".equals(reason)) {
// 队列满:发送人工审核告警
channel.basicPublish(MANUAL_EXCHANGE, "review.key", properties, body);
System.out.println("队列溢出,消息已转入人工审核");
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.out.println("死信补偿处理器已启动");
}
}
private static String extractReason(Map<String, Object> headers) {
if (headers == null || !headers.containsKey("x-death")) {
return "unknown";
}
List<Map<String, Object>> xDeaths =
(List<Map<String, Object>>) headers.get("x-death");
if (xDeaths.isEmpty()) {
return "unknown";
}
return (String) xDeaths.get(0).get("reason");
}
private static void processMessage(byte[] body) {
System.out.println("处理消息: " + new String(body));
}
}
补偿策略汇总
| 死信原因 | 建议策略 |
|---|---|
| rejected | 转入重试队列,限制最大重试次数 |
| expired | 正常消费(延迟业务场景) |
| overflow | 转入人工审核队列,发送告警 |
死信队列消费必须调用
basicAck,否则消息会再次进入死信循环。
要点总结
- 通过
x-deathheaders 分析死信原因、原队列、死信次数 - 按死信原因分类补偿:rejected 重试、expired 正常消费、overflow 人工审核
- 死信消费者必须
basicAck,避免死信循环 - 建议配合监控告警,及时发现死信堆积问题
- 生产环境建议记录死信日志,便于后续排查
📝 发现内容有误?点击此处直接编辑