延迟队列实现
延迟队列通过组合 TTL 和死信交换机实现:消息在业务队列中等待 TTL 过期后,自动转发到死信队列供消费者处理。
实现原理
Java
生产者 -> 延迟交换机 -> 延迟队列(设置TTL+DLX) --TTL过期--> DLX -> 实际消费队列 -> 消费者
核心思路:
- 生产者将消息发送到延迟队列,不直接发送到消费队列
- 延迟队列设置 TTL,消息在队列中等待
- 延迟队列配置 DLX,TTL 过期后消息自动转发到目标队列
- 消费者绑定目标队列,只消费过期后的消息
Java 示例:订单超时取消
text
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayQueueExample {
// 延迟队列相关
private static final String DELAY_EXCHANGE = "delay.exchange";
private static final String DELAY_QUEUE = "delay.queue";
private static final String DELAY_ROUTING_KEY = "order.delay";
// 实际消费队列(死信队列)
private static final String ORDER_QUEUE = "order.cancel.queue";
private static final String ORDER_ROUTING_KEY = "order.cancel";
// 死信交换机
private static final String DLX_EXCHANGE = "dlx.exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 1. 声明死信交换机
channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 2. 声明实际消费队列(接收死信)
channel.queueDeclare(ORDER_QUEUE, true, false, false, null);
channel.queueBind(ORDER_QUEUE, DLX_EXCHANGE, ORDER_ROUTING_KEY);
// 3. 声明延迟交换机
channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// 4. 声明延迟队列(设置 TTL + DLX)
Map<String, Object> delayArgs = new HashMap<>();
delayArgs.put("x-message-ttl", 30000); // 30秒后过期
delayArgs.put("x-dead-letter-exchange", DLX_EXCHANGE);
delayArgs.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
channel.queueDeclare(DELAY_QUEUE, true, false, false, delayArgs);
channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);
// 5. 发送延迟消息
String orderId = "ORDER-20260522-001";
channel.basicPublish(DELAY_EXCHANGE, DELAY_ROUTING_KEY, null,
orderId.getBytes(StandardCharsets.UTF_8));
System.out.println("订单延迟消息已发送,30秒后处理取消");
// 6. 消费实际队列中的消息
channel.basicConsume(ORDER_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("收到延迟订单: " + msg);
// 执行业务逻辑:取消订单、释放库存等
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Thread.sleep(35000); // 等待延迟消息到达
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
注意事项
- RabbitMQ 仅按队列头部检查 TTL,队列尾部消息即使先过期也要等待前面消息过期
- 如果需要精确延迟,建议使用 RabbitMQ 官方插件
rabbitmq-delayed-message-exchange - 不同 TTL 的消息放入同一个延迟队列时,可能出现队头阻塞问题
- 解决方案:按 TTL 分组,每种延迟时间使用独立延迟队列
对于固定延迟时间(如统一30分钟),使用 TTL+DLX 方案足够;对于动态延迟时间,建议使用延迟插件。
要点总结
- 延迟队列 = 队列 TTL + 死信交换机 + 目标消费队列
- 消息在延迟队列中等待 TTL 过期后,自动转发到目标队列
- 适用于固定延迟时间的场景(订单超时、定时提醒等)
- 队头阻塞问题:RabbitMQ 只检查队列头部消息的 TTL
- 精确延迟需求建议使用
rabbitmq-delayed-message-exchange插件
📝 发现内容有误?点击此处直接编辑