全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

延迟队列插件

传统延迟消息需借助死信队列+TTL实现,配置复杂且存在队头阻塞问题。rabbitmq-delayed-message-exchange 插件提供原生延迟消息支持,简化配置并解决队头阻塞。

插件安装

安装步骤

  1. 下载插件:从 RabbitMQ Community Plugins 仓库获取 rabbitmq_delayed_message_exchange 插件
  2. .ez 文件放入 RabbitMQ 插件目录(通常为 /plugins
  3. 启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  4. 重启 RabbitMQ 服务

注意:插件需在 RabbitMQ 3.6.0 及以上版本使用,集群环境下所有节点均需安装。

延迟交换机类型

x-delayed-message 交换机

Java
// Maven 依赖
// <dependency>
//     <groupId>com.rabbitmq</groupId>
//     <artifactId>amqp-client</artifactId>
//     <version>5.20.0</version>
// </dependency>

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayedExchangeExample {
    
    private static final String DELAYED_EXCHANGE = "delayed_exchange";
    private static final String QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "delayed.key";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // ========== 1. 声明延迟交换机 ==========
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-delayed-type", "direct");  // 底层使用 direct 类型
            channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap);
            
            // ========== 2. 声明队列并绑定 ==========
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, DELAYED_EXCHANGE, ROUTING_KEY);
            
            // ========== 3. 发送延迟消息 ==========
            String message = "Delayed Message";
            
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .headers(Map.of("x-delay", 5000))  // 延迟 5000ms
                    .build();
            
            channel.basicPublish(DELAYED_EXCHANGE, ROUTING_KEY, props,
                    message.getBytes(StandardCharsets.UTF_8));
            
            System.out.println("延迟消息已发送,将在 5 秒后投递到队列");
            
            // 等待消息投递
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

底层交换机类型选择

x-delayed-type 值路由行为适用场景
direct精确匹配路由键单队列延迟投递
topic模式匹配路由键多队列按规则延迟
fanout广播到所有绑定队列全局延迟通知
headers按消息头匹配复杂条件延迟

延迟消息特性

动态延迟时间

每条消息可设置不同的延迟时间:

Java
public class DynamicDelayExample {
    
    public static void sendWithDifferentDelays(Channel channel) 
            throws IOException {
        // 延迟 3 秒
        AMQP.BasicProperties props3s = new AMQP.BasicProperties.Builder()
                .headers(Map.of("x-delay", 3000))
                .build();
        channel.basicPublish("delayed_exchange", "route", props3s,
                "3s delay".getBytes(StandardCharsets.UTF_8));
        
        // 延迟 10 秒
        AMQP.BasicProperties props10s = new AMQP.BasicProperties.Builder()
                .headers(Map.of("x-delay", 10000))
                .build();
        channel.basicPublish("delayed_exchange", "route", props10s,
                "10s delay".getBytes(StandardCharsets.UTF_8));
        
        // 延迟 1 分钟
        AMQP.BasicProperties props1m = new AMQP.BasicProperties.Builder()
                .headers(Map.of("x-delay", 60000))
                .build();
        channel.basicPublish("delayed_exchange", "route", props1m,
                "1m delay".getBytes(StandardCharsets.UTF_8));
    }
}

延迟与 TTL 对比

特性插件延迟消息TTL + 死信队列
配置复杂度简单,仅需声明交换机复杂,需声明队列、TTL、DLX
队头阻塞无,每条消息独立计时有,仅队头消息过期后投递
延迟时间每条消息可不同队列级别,所有消息相同
插件依赖需安装插件原生支持
适用场景灵活延迟、订单超时取消固定延迟、简单场景

完整示例:订单超时取消

Java
public class OrderTimeoutExample {
    
    private static final String DELAY_EXCHANGE = "order_delay_exchange";
    private static final String ORDER_QUEUE = "order_timeout_queue";
    private static final String ORDER_ROUTE = "order.timeout";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明延迟交换机
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-delayed-type", "direct");
            channel.exchangeDeclare(DELAY_EXCHANGE, "x-delayed-message", true, false, argsMap);
            channel.queueDeclare(ORDER_QUEUE, true, false, false, null);
            channel.queueBind(ORDER_QUEUE, DELAY_EXCHANGE, ORDER_ROUTE);
            
            // 消费者:处理超时订单
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("订单超时,执行取消: " + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(ORDER_QUEUE, false, deliverCallback, consumerTag -> {});
            
            // 模拟下单,发送延迟消息(30 秒后超时)
            String orderId = "ORDER-10001";
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .headers(Map.of("x-delay", 30000))
                    .build();
            channel.basicPublish(DELAY_EXCHANGE, ORDER_ROUTE, props,
                    orderId.getBytes(StandardCharsets.UTF_8));
            
            System.out.println("订单已创建,将在 30 秒后检查超时");
        }
    }
}

注意事项

  1. 插件延迟消息存储在内存中,RabbitMQ 重启后未投递的延迟消息会丢失。
  2. x-delay 值为毫秒,最大值受 max_delay 配置限制(默认无限制)。
  3. x-delay 设置为负数或 0,消息会立即投递到队列。
  4. 延迟交换机底层仍需声明 x-delayed-type 指定实际路由类型。
  5. 集群环境下需确保所有节点均安装插件,否则延迟交换机声明会失败。
  6. 插件方案不保证延迟消息持久化,高可靠场景需配合业务层补偿机制。

要点总结

  • rabbitmq-delayed-message-exchange 插件提供原生延迟消息支持,简化配置
  • 延迟时间通过消息头 x-delay 设置,单位为毫秒,每条消息可独立配置
  • 相比 TTL+死信队列方案,插件无队头阻塞问题,支持动态延迟时间
  • 声明延迟交换机时需设置 x-delayed-type 指定底层路由类型
  • 插件延迟消息存储在内存中,重启后丢失,不适用于高可靠持久化场景

📝 发现内容有误?点击此处直接编辑

← 上一篇 延迟队列实现
下一篇 → 死信交换机概念
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库