全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

Shovel 插件与消息桥接

Shovel 插件将消息从一个队列持续转发到另一个队列(可在不同集群),实现跨集群消息桥接。

安装与启用

Shovel 为官方内置插件,启用即可使用。

Bash
# 启用 Shovel 插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

# 重启服务
systemctl restart rabbitmq-server

# 验证插件状态
rabbitmq-plugins list | grep shovel

rabbitmq_shovel:核心转发引擎 rabbitmq_shovel_management:Management UI 管理界面支持

Shovel 插件会建立持久连接并持续拉取消息,需确保网络稳定。

Shovel 工作原理

Shovel 工作流程:

  1. 连接到源队列所在 Broker
  2. 从源队列拉取消息
  3. 发布到目标队列所在 Broker
  4. 确认源消息消费完成
Bash
[源集群]                  [目标集群]
  Queue A --Shovel拉取--> Queue B
  (确认)                  (接收)

关键特性:

  • 消息确认:目标队列接收后才确认源消息
  • 断线重连:网络断开后自动重连并继续
  • 消息持久化:支持持久化消息不丢失

静态 Shovel 配置

在配置文件中定义 Shovel,随 Broker 启动自动运行。

Bash
# 编辑 rabbitmq.conf
# Linux: /etc/rabbitmq/rabbitmq.conf

# 定义 Shovel 名称
shovel.my_shovel.uri = amqp://admin:admin@source-host:5672
shovel.my_shovel.source-queue = source_queue
shovel.my_shovel.source-prefetch-count = 100
shovel.my_shovel.destination-uri = amqp://admin:admin@dest-host:5672
shovel.my_shovel.destination-queue = dest_queue
shovel.my_shovel.destination-publish-properties = true

配置参数说明:

参数说明示例值
uri源 Broker 连接amqp://user:pass@host:5672
source-queue源队列名称source_queue
source-prefetch-count拉取预取数量100
destination-uri目标 Broker 连接amqp://user:pass@host:5672
destination-queue目标队列名称dest_queue
destination-publish-properties保留消息属性true

动态 Shovel 配置

通过 Policy 或 Management API 运行时创建 Shovel。

Bash
# 通过 Policy 创建动态 Shovel
rabbitmqctl set_parameter shovel my_dynamic_shovel \
  '{"src-uri":"amqp://source-host:5672","src-queue":"source_queue",\
    "dest-uri":"amqp://dest-host:5672","dest-queue":"dest_queue"}'

# 查看 Shovel 列表
rabbitmqctl list_shovels

# 删除 Shovel
rabbitmqctl clear_parameter shovel my_dynamic_shovel

通过 Management API:

Java
curl -u admin:admin -X PUT \
  http://localhost:15672/api/parameters/shovel/%2F/my_shovel \
  -H "Content-Type: application/json" \
  -d '{
    "src-uri": "amqp://source-host:5672",
    "src-queue": "source_queue",
    "dest-uri": "amqp://dest-host:5672",
    "dest-queue": "dest_queue"
  }'

Java Client 操作 Shovel

通过 HTTP API 管理 Shovel。

text
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.Base64;

public class ShovelManager {
    private static final String API_URL = "http://localhost:15672/api/parameters/shovel/%2F/";
    private static final String AUTH = "Basic " + Base64.getEncoder()
        .encodeToString("admin:admin".getBytes());
    
    private final HttpClient client = HttpClient.newHttpClient();
    
    // 创建 Shovel
    public void createShovel(String name, String srcUri, String srcQueue, 
                            String destUri, String destQueue) throws Exception {
        String body = String.format(
            "{\"src-uri\":\"%s\",\"src-queue\":\"%s\"," +
            "\"dest-uri\":\"%s\",\"dest-queue\":\"%s\"}",
            srcUri, srcQueue, destUri, destQueue
        );
        
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(API_URL + name))
            .header("Content-Type", "application/json")
            .header("Authorization", AUTH)
            .PUT(HttpRequest.BodyPublishers.ofString(body))
            .build();
        
        HttpResponse<String> response = client.send(request, 
            HttpResponse.BodyHandlers.ofString());
        
        if (response.statusCode() != 204) {
            throw new RuntimeException("创建 Shovel 失败: " + response.body());
        }
    }
    
    // 删除 Shovel
    public void deleteShovel(String name) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(API_URL + name))
            .header("Authorization", AUTH)
            .DELETE()
            .build();
        
        client.send(request, HttpResponse.BodyHandlers.ofString());
    }
}

Shovel 与 Federation 对比

特性ShovelFederation
拓扑单向推送双向同步
连接持续连接按需连接
适用场景数据迁移、异地备份多活集群
消息确认目标确认后确认源独立确认
配置复杂度

单向消息同步选 Shovel,多活双向同步选 Federation。

注意事项

  1. Shovel 会持续拉取消息,源队列消息被消费后不再可转发
  2. 网络不稳定时 Shovel 会重连,期间消息可能重复
  3. 静态 Shovel 随 Broker 启动,动态 Shovel 需手动创建
  4. Shovel 不适用于高吞吐实时同步场景,建议使用 Federation

要点总结

  • Shovel 插件实现跨集群消息单向同步,从源队列拉取到目标队列
  • 支持静态配置(随 Broker 启动)和动态配置(运行时创建)
  • 消息在目标队列确认后才确认源消息,保证不丢失
  • 通过 Management API 或 rabbitmqctl 管理 Shovel 生命周期
  • 单向同步选 Shovel,多活双向同步选 Federation

📝 发现内容有误?点击此处直接编辑

← 上一篇 连接池配置
下一篇 → 延迟消息插件
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库