延迟队列插件
传统延迟消息需借助死信队列+TTL实现,配置复杂且存在队头阻塞问题。rabbitmq-delayed-message-exchange 插件提供原生延迟消息支持,简化配置并解决队头阻塞。
插件安装
安装步骤
- 下载插件:从 RabbitMQ Community Plugins 仓库获取
rabbitmq_delayed_message_exchange插件 - 将
.ez文件放入 RabbitMQ 插件目录(通常为/plugins) - 启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange - 重启 RabbitMQ 服务
注意:插件需在 RabbitMQ 3.6.0 及以上版本使用,集群环境下所有节点均需安装。
延迟交换机类型
x-delayed-message 交换机
Java
// Maven 依赖
// <dependency>
// <groupId>com.rabbitmq</groupId>
// <artifactId>amqp-client</artifactId>
// <version>5.20.0</version>
// </dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayedExchangeExample {
private static final String DELAYED_EXCHANGE = "delayed_exchange";
private static final String QUEUE_NAME = "delayed_queue";
private static final String ROUTING_KEY = "delayed.key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// ========== 1. 声明延迟交换机 ==========
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-delayed-type", "direct"); // 底层使用 direct 类型
channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap);
// ========== 2. 声明队列并绑定 ==========
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, DELAYED_EXCHANGE, ROUTING_KEY);
// ========== 3. 发送延迟消息 ==========
String message = "Delayed Message";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 5000)) // 延迟 5000ms
.build();
channel.basicPublish(DELAYED_EXCHANGE, ROUTING_KEY, props,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("延迟消息已发送,将在 5 秒后投递到队列");
// 等待消息投递
Thread.sleep(6000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
底层交换机类型选择
| x-delayed-type 值 | 路由行为 | 适用场景 |
|---|---|---|
direct | 精确匹配路由键 | 单队列延迟投递 |
topic | 模式匹配路由键 | 多队列按规则延迟 |
fanout | 广播到所有绑定队列 | 全局延迟通知 |
headers | 按消息头匹配 | 复杂条件延迟 |
延迟消息特性
动态延迟时间
每条消息可设置不同的延迟时间:
Java
public class DynamicDelayExample {
public static void sendWithDifferentDelays(Channel channel)
throws IOException {
// 延迟 3 秒
AMQP.BasicProperties props3s = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 3000))
.build();
channel.basicPublish("delayed_exchange", "route", props3s,
"3s delay".getBytes(StandardCharsets.UTF_8));
// 延迟 10 秒
AMQP.BasicProperties props10s = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 10000))
.build();
channel.basicPublish("delayed_exchange", "route", props10s,
"10s delay".getBytes(StandardCharsets.UTF_8));
// 延迟 1 分钟
AMQP.BasicProperties props1m = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 60000))
.build();
channel.basicPublish("delayed_exchange", "route", props1m,
"1m delay".getBytes(StandardCharsets.UTF_8));
}
}
延迟与 TTL 对比
| 特性 | 插件延迟消息 | TTL + 死信队列 |
|---|---|---|
| 配置复杂度 | 简单,仅需声明交换机 | 复杂,需声明队列、TTL、DLX |
| 队头阻塞 | 无,每条消息独立计时 | 有,仅队头消息过期后投递 |
| 延迟时间 | 每条消息可不同 | 队列级别,所有消息相同 |
| 插件依赖 | 需安装插件 | 原生支持 |
| 适用场景 | 灵活延迟、订单超时取消 | 固定延迟、简单场景 |
完整示例:订单超时取消
Java
public class OrderTimeoutExample {
private static final String DELAY_EXCHANGE = "order_delay_exchange";
private static final String ORDER_QUEUE = "order_timeout_queue";
private static final String ORDER_ROUTE = "order.timeout";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明延迟交换机
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-delayed-type", "direct");
channel.exchangeDeclare(DELAY_EXCHANGE, "x-delayed-message", true, false, argsMap);
channel.queueDeclare(ORDER_QUEUE, true, false, false, null);
channel.queueBind(ORDER_QUEUE, DELAY_EXCHANGE, ORDER_ROUTE);
// 消费者:处理超时订单
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("订单超时,执行取消: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(ORDER_QUEUE, false, deliverCallback, consumerTag -> {});
// 模拟下单,发送延迟消息(30 秒后超时)
String orderId = "ORDER-10001";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 30000))
.build();
channel.basicPublish(DELAY_EXCHANGE, ORDER_ROUTE, props,
orderId.getBytes(StandardCharsets.UTF_8));
System.out.println("订单已创建,将在 30 秒后检查超时");
}
}
}
注意事项
- 插件延迟消息存储在内存中,RabbitMQ 重启后未投递的延迟消息会丢失。
x-delay值为毫秒,最大值受max_delay配置限制(默认无限制)。- 若
x-delay设置为负数或 0,消息会立即投递到队列。- 延迟交换机底层仍需声明
x-delayed-type指定实际路由类型。- 集群环境下需确保所有节点均安装插件,否则延迟交换机声明会失败。
- 插件方案不保证延迟消息持久化,高可靠场景需配合业务层补偿机制。
要点总结
rabbitmq-delayed-message-exchange插件提供原生延迟消息支持,简化配置- 延迟时间通过消息头
x-delay设置,单位为毫秒,每条消息可独立配置 - 相比 TTL+死信队列方案,插件无队头阻塞问题,支持动态延迟时间
- 声明延迟交换机时需设置
x-delayed-type指定底层路由类型 - 插件延迟消息存储在内存中,重启后丢失,不适用于高可靠持久化场景
📝 发现内容有误?点击此处直接编辑