消息去重方案
在消息重试、网络重传或生产者重发等场景下,消费者可能收到重复消息。消息去重确保同一消息仅被处理一次,保障业务幂等性。
为什么需要消息去重
重复消息来源
| 来源 | 说明 |
|---|---|
| 生产者重发 | 网络超时未收到 Confirm,生产者重新发送 |
| 消息重新入队 | 消费者超时未确认,消息重新入队被再次消费 |
| RabbitMQ 重启 | 持久化消息在极端情况下可能重复投递 |
注意:RabbitMQ 本身不提供去重机制,需在业务层实现幂等性校验。
基于消息 ID 去重
发送端设置消息 ID
Java
// Maven 依赖
// <dependency>
// <groupId>com.rabbitmq</groupId>
// <artifactId>amqp-client</artifactId>
// <version>5.20.0</version>
// </dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class MessageIdPublisher {
private static final String EXCHANGE_NAME = "dedup_exchange";
private static final String ROUTING_KEY = "dedup.key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
String message = "Order-Payment-12345";
String messageId = UUID.randomUUID().toString();
// 设置 message_id
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息已发送, messageId: " + messageId);
}
}
}
消费端去重逻辑
Java
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
public class DedupConsumer {
// 本地缓存已处理的消息 ID(生产环境应使用 Redis/DB)
private static final Set<String> processedIds = ConcurrentHashMap.newKeySet();
public static void consume(Channel channel) throws IOException {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String messageId = delivery.getProperties().getMessageId();
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// 检查是否已处理
if (messageId != null && processedIds.contains(messageId)) {
System.out.println("重复消息,已跳过: " + messageId);
channel.basicAck(deliveryTag, false);
return;
}
try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
processMessage(message);
// 处理成功后记录 messageId
if (messageId != null) {
processedIds.add(messageId);
}
channel.basicAck(deliveryTag, false);
System.out.println("消息处理成功: " + messageId);
} catch (Exception e) {
System.err.println("消息处理失败: " + e.getMessage());
channel.basicNack(deliveryTag, false, true);
}
};
channel.basicConsume("dedup_queue", false, deliverCallback, consumerTag -> {});
}
private static void processMessage(String message) {
// 业务处理逻辑
System.out.println("处理消息: " + message);
}
}
基于业务唯一键去重
适用场景
当消息本身没有 messageId 时,可从消息内容中提取业务唯一键(如订单号、流水号)进行去重。
Java
public class BusinessKeyDedup {
public static void consume(Channel channel) throws IOException {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// 从消息内容提取业务唯一键
String businessKey = extractBusinessKey(message);
if (isProcessed(businessKey)) {
System.out.println("业务键重复,已跳过: " + businessKey);
channel.basicAck(deliveryTag, false);
return;
}
try {
processMessage(message);
markProcessed(businessKey);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
};
channel.basicConsume("biz_queue", false, deliverCallback, consumerTag -> {});
}
private static String extractBusinessKey(String message) {
// 解析消息提取业务唯一键,如订单号
// 示例:{"orderId":"10001","amount":100}
return message.split("\"orderId\":\"")[1].split("\"")[0];
}
private static boolean isProcessed(String businessKey) {
// 查询数据库或缓存判断是否已处理
return false;
}
private static void markProcessed(String businessKey) {
// 标记为已处理
}
private static void processMessage(String message) {
// 业务处理
}
}
基于数据库唯一约束去重
方案说明
利用数据库唯一索引实现幂等性,是最可靠的去重方式。
Java
import java.sql.*;
public class DatabaseDedup {
private static final String INSERT_SQL =
"INSERT INTO message_log (message_id, content, status) VALUES (?, ?, ?)";
public static boolean tryInsert(Connection conn, String messageId, String content) {
try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
ps.setString(1, messageId);
ps.setString(2, content);
ps.setString(3, "PROCESSED");
ps.executeUpdate();
return true;
} catch (SQLException e) {
// 唯一约束冲突,说明已处理过
if (e.getSQLState().equals("23505")) {
System.out.println("消息已处理,跳过: " + messageId);
return false;
}
throw new RuntimeException(e);
}
}
}
注意:数据库唯一约束需提前在
message_id字段上创建 UNIQUE INDEX。
注意事项
- 本地内存缓存(如 ConcurrentHashMap)在服务重启后失效,仅适用于单机短期场景。
- 生产环境推荐使用 Redis(SETNX)或数据库唯一约束实现分布式去重。
- 去重逻辑应在消息确认之前执行,避免确认后再处理失败导致重复消费。
- 消息 ID 由生产者设置,若未设置则需从业务内容中提取唯一键。
- 去重缓存需设置过期时间,避免内存无限增长,过期时间应大于消息重试窗口。
要点总结
- RabbitMQ 本身不提供去重机制,需在消费端实现幂等性校验
- 基于
messageId去重需生产者在发送时设置唯一消息 ID - 无 messageId 时可从消息内容提取业务唯一键(如订单号)进行去重
- 数据库唯一约束是最可靠的去重方式,适用于分布式场景
- 去重状态需设置过期时间,避免内存持续增长
📝 发现内容有误?点击此处直接编辑