全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

延迟队列实现

延迟队列通过组合 TTL 和死信交换机实现:消息在业务队列中等待 TTL 过期后,自动转发到死信队列供消费者处理。

实现原理

Java
生产者 -> 延迟交换机 -> 延迟队列(设置TTL+DLX) --TTL过期--> DLX -> 实际消费队列 -> 消费者

核心思路:

  1. 生产者将消息发送到延迟队列,不直接发送到消费队列
  2. 延迟队列设置 TTL,消息在队列中等待
  3. 延迟队列配置 DLX,TTL 过期后消息自动转发到目标队列
  4. 消费者绑定目标队列,只消费过期后的消息

Java 示例:订单超时取消

text
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayQueueExample {

    // 延迟队列相关
    private static final String DELAY_EXCHANGE = "delay.exchange";
    private static final String DELAY_QUEUE = "delay.queue";
    private static final String DELAY_ROUTING_KEY = "order.delay";

    // 实际消费队列(死信队列)
    private static final String ORDER_QUEUE = "order.cancel.queue";
    private static final String ORDER_ROUTING_KEY = "order.cancel";

    // 死信交换机
    private static final String DLX_EXCHANGE = "dlx.exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1. 声明死信交换机
            channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true);

            // 2. 声明实际消费队列(接收死信)
            channel.queueDeclare(ORDER_QUEUE, true, false, false, null);
            channel.queueBind(ORDER_QUEUE, DLX_EXCHANGE, ORDER_ROUTING_KEY);

            // 3. 声明延迟交换机
            channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, true);

            // 4. 声明延迟队列(设置 TTL + DLX)
            Map<String, Object> delayArgs = new HashMap<>();
            delayArgs.put("x-message-ttl", 30000);         // 30秒后过期
            delayArgs.put("x-dead-letter-exchange", DLX_EXCHANGE);
            delayArgs.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
            channel.queueDeclare(DELAY_QUEUE, true, false, false, delayArgs);
            channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);

            // 5. 发送延迟消息
            String orderId = "ORDER-20260522-001";
            channel.basicPublish(DELAY_EXCHANGE, DELAY_ROUTING_KEY, null,
                    orderId.getBytes(StandardCharsets.UTF_8));
            System.out.println("订单延迟消息已发送,30秒后处理取消");

            // 6. 消费实际队列中的消息
            channel.basicConsume(ORDER_QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("收到延迟订单: " + msg);
                    // 执行业务逻辑:取消订单、释放库存等
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            Thread.sleep(35000); // 等待延迟消息到达
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

注意事项

  • RabbitMQ 仅按队列头部检查 TTL,队列尾部消息即使先过期也要等待前面消息过期
  • 如果需要精确延迟,建议使用 RabbitMQ 官方插件 rabbitmq-delayed-message-exchange
  • 不同 TTL 的消息放入同一个延迟队列时,可能出现队头阻塞问题
  • 解决方案:按 TTL 分组,每种延迟时间使用独立延迟队列

对于固定延迟时间(如统一30分钟),使用 TTL+DLX 方案足够;对于动态延迟时间,建议使用延迟插件。

要点总结

  • 延迟队列 = 队列 TTL + 死信交换机 + 目标消费队列
  • 消息在延迟队列中等待 TTL 过期后,自动转发到目标队列
  • 适用于固定延迟时间的场景(订单超时、定时提醒等)
  • 队头阻塞问题:RabbitMQ 只检查队列头部消息的 TTL
  • 精确延迟需求建议使用 rabbitmq-delayed-message-exchange 插件

📝 发现内容有误?点击此处直接编辑

← 上一篇 TTL 消息过期
下一篇 → 延迟队列插件
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库