全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

延迟消息插件

延迟消息插件提供原生延迟交换机类型,消息在指定时间后才投递到队列,简化延迟场景实现。

安装插件

插件需与 RabbitMQ 版本匹配,安装后重启服务生效。

Bash
# 下载插件(以 3.10.x 为例)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez

# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.10.2.ez /usr/lib/rabbitmq/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启 RabbitMQ
systemctl restart rabbitmq-server

# 验证插件状态
rabbitmq-plugins list | grep delayed

插件版本必须与 RabbitMQ 主版本一致,否则无法加载。

延迟交换机类型

延迟交换机类型为 x-delayed-message,支持以下路由策略:

路由类型x-delayed-type 值说明
Directdirect精确匹配路由键
Fanoutfanout广播到所有绑定队列
Topictopic通配符匹配路由键
Headersheaders基于消息头匹配

Java Client 声明延迟交换机

通过 x-delayed-type 参数指定底层路由策略。

Java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明延迟交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底层使用 direct 路由
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);

// 声明队列并绑定
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_key");

x-delayed-message 是交换机类型,x-delayed-type 指定底层路由策略。

发送延迟消息

通过消息头 x-delay 指定延迟时间(毫秒)。

Java
// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(Map.of("x-delay", 5000)) // 延迟 5 秒
    .contentType("text/plain")
    .build();

channel.basicPublish(
    "delayed_exchange",    // 交换机
    "delayed_key",         // 路由键
    props,                 // 消息属性(含延迟时间)
    "延迟消息内容".getBytes()
);

System.out.println("延迟消息已发送,5秒后投递");

消息流转过程:

  1. 消息到达延迟交换机
  2. 交换机读取 x-delay 值,暂存消息
  3. 延迟时间到达后,消息路由到目标队列

动态延迟时间

每条消息可设置不同的延迟时间。

Java
// 不同延迟时间
int[] delays = {1000, 3000, 5000, 10000};
for (int delay : delays) {
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .headers(Map.of("x-delay", delay))
        .build();
    
    channel.basicPublish(
        "delayed_exchange",
        "delayed_key",
        props,
        ("延迟 " + delay + "ms").getBytes()
    );
}

x-delay 值为负数或 0 时,消息立即投递,不延迟。

延迟消息与死信队列对比

传统方案使用 TTL + DLX 实现延迟,对比如下:

特性延迟插件TTL + DLX
实现复杂度高(需多个队列+DLX)
动态延迟支持不支持(队列TTL固定)
消息排序按延迟时间排序按过期时间排序
插件依赖需安装插件原生支持

延迟插件适合动态延迟场景,TTL+DLX 适合固定延迟且无需插件的场景。

注意事项

  1. 延迟消息存储在内存中,Broker 重启后未到期消息丢失
  2. 延迟时间最大值受 delayed_message.max_delay 配置限制
  3. 延迟交换机不支持 auto-delete,否则消息无法暂存
  4. 大量延迟消息会占用内存,建议设置合理的延迟上限

要点总结

  • 延迟插件提供 x-delayed-message 交换机类型,需安装后启用
  • 声明交换机时通过 x-delayed-type 指定底层路由策略
  • 发送消息时通过 x-delay 消息头设置延迟时间(毫秒)
  • 每条消息可设置不同延迟时间,支持动态延迟
  • 延迟消息暂存内存,Broker 重启丢失,不适合持久化场景

📝 发现内容有误?点击此处直接编辑

← 上一篇 Shovel 插件与消息桥接
下一篇 → 插件安装与管理
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库