全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

跨机房容灾部署

RabbitMQ 通过 Federation 与 Shovel 插件实现跨机房消息同步,保障异地容灾能力。

跨机房架构

部署拓扑

Java
机房 A                    机房 B
[RabbitMQ-A] ──同步──→ [RabbitMQ-B]
      ↓                       ↓
  生产者/消费者           生产者/消费者

跨机房同步方案:

方案同步方式延迟适用场景
Shovel拉取或推送点对点同步
Federation上游拉取多对一汇聚
Mirror Queue同步复制极低同城低延迟

Shovel 插件

工作原理

Shovel 将源队列的消息转发到目标队列:

ini
源队列 → Shovel 连接 → 目标交换器 → 目标队列

Shovel 工作流程:

  1. 建立到源 RabbitMQ 的连接
  2. 从源队列消费消息
  3. 发布到目标 RabbitMQ 的交换器
  4. 确认源队列消息(ack)

Shovel 配置

Java
import com.rabbitmq.client.*;

public class ShovelExample {
    public static void main(String[] args) throws Exception {
        // 目标机房连接
        ConnectionFactory targetFactory = new ConnectionFactory();
        targetFactory.setHost("rabbitmq-room-b");
        targetFactory.setPort(5672);
        
        try (Connection targetConn = targetFactory.newConnection();
             Channel ch = targetConn.createChannel()) {
            
            // 在目标机房声明队列(接收端)
            ch.queueDeclare("replicated.queue", true, false, false, null);
            ch.exchangeDeclare("replicated.exchange", BuiltinExchangeType.DIRECT, true);
            ch.queueBind("replicated.queue", "replicated.exchange", "replicate.key");
            
            System.out.println("目标队列已就绪");
            System.out.println("Shovel 插件配置通过 rabbitmqctl 或 management UI 完成");
        }
    }
}

Shovel 插件配置(rabbitmq.conf):

Java
# 定义 Shovel
rabbitmq_shovel.direct.conf = [{"name": "room-a-to-b",
  "source-uri": "amqp://rabbitmq-room-a/%2f",
  "source-queue": "source.queue",
  "dest-uri": "amqp://rabbitmq-room-b/%2f",
  "dest-exchange": "replicated.exchange",
  "dest-publish-properties": {"delivery_mode": 2}}]

动态 Shovel

ini
import com.rabbitmq.client.*;
import java.util.*;

public class DynamicShovelExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            // 通过 HTTP API 创建动态 Shovel
            // PUT /api/parameters/shovel/%2f/shovel-room-a-to-b
            // Body: {"src-uri": "amqp://room-a/", 
            //        "src-queue": "source.queue",
            //        "dest-uri": "amqp://room-b/",
            //        "dest-exchange": "replicated.exchange"}
            
            System.out.println("动态 Shovel 通过 Management API 配置");
            System.out.println("无需修改配置文件,运行时生效");
        }
    }
}

Federation 插件

工作原理

Federation 上游从下游拉取消息:

Java
上游队列(本地)← Federation ← 下游队列(远程)

与 Shovel 的区别:

特性ShovelFederation
同步方向源 → 目标上游 ← 下游
连接方式推送或拉取上游拉取
适用场景点对点多对一汇聚
配置复杂度

Federation 配置

Java
import com.rabbitmq.client.*;
import java.util.*;

public class FederationExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq-room-a");
        
        try (Connection conn = factory.newConnection();
             Channel ch = conn.createChannel()) {
            
            // 声明交换器(启用 Federation)
            Map<String, Object> args_map = new HashMap<>();
            args_map.put("federation-upstream", "room-b-upstream");
            
            ch.exchangeDeclare("federated.exchange", 
                BuiltinExchangeType.DIRECT, true, false, args_map);
            
            ch.queueDeclare("federated.queue", true, false, false, null);
            ch.queueBind("federated.queue", "federated.exchange", "federation.key");
            
            System.out.println("Federation 交换器已声明");
            System.out.println("消息将从上游机房自动拉取");
        }
    }
}

Federation 上游配置(rabbitmq.conf):

text
# 定义上游
rabbitmq_federation.direct.conf = [{"name": "room-b-upstream",
  "uri": "amqp://rabbitmq-room-b/%2f"}]

# 策略绑定
rabbitmqctl set_policy federate "^federated\." \
  '{"federation-upstream-set":"all"}' --apply-to exchanges

跨机房双向同步

架构设计

text
机房 A ←─Shovel─→ 机房 B
   ↓                  ↓
 生产者/消费者     生产者/消费者

双向同步配置:

text
import com.rabbitmq.client.*;

public class BidirectionalSyncExample {
    public static void main(String[] args) throws Exception {
        // 机房 A 连接
        ConnectionFactory factoryA = new ConnectionFactory();
        factoryA.setHost("rabbitmq-room-a");
        
        // 机房 B 连接
        ConnectionFactory factoryB = new ConnectionFactory();
        factoryB.setHost("rabbitmq-room-b");
        
        try (Connection connA = factoryA.newConnection();
             Connection connB = factoryB.newConnection();
             Channel chA = connA.createChannel();
             Channel chB = connB.createChannel()) {
            
            // 两个机房各自声明本地队列
            chA.queueDeclare("room-a.local", true, false, false, null);
            chB.queueDeclare("room-b.local", true, false, false, null);
            
            // Shovel 配置双向同步
            // A → B: Shovel 将 room-a.local 转发到 B
            // B → A: Shovel 将 room-b.local 转发到 A
            
            System.out.println("双向同步需配置两个 Shovel 实例");
            System.out.println("注意避免消息循环转发");
        }
    }
}

双向同步需配置消息路由规则,避免同一消息在两个机房之间无限循环转发。

容灾切换

故障场景

text
机房 A 故障 → 生产者/消费者切换到机房 B

切换流程:

  1. 检测机房 A 不可达
  2. 生产者重新连接到机房 B
  3. 消费者从机房 B 消费
  4. 机房 A 恢复后,通过 Shovel 同步缺失消息

客户端切换

text
import com.rabbitmq.client.*;
import java.util.*;

public class RoomFailoverExample {
    public static void main(String[] args) throws Exception {
        // 配置多机房地址
        List<Address> addresses = Arrays.asList(
            new Address("rabbitmq-room-a", 5672),
            new Address("rabbitmq-room-b", 5672)
        );
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);
        
        try (Connection conn = factory.newConnection(addresses);
             Channel ch = conn.createChannel()) {
            
            ch.queueDeclare("failover.queue", true, false, false, null);
            ch.basicPublish("", "failover.queue", 
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "Cross-room message".getBytes());
            
            System.out.println("消息已发布到当前可用机房");
            System.out.println("机房故障时自动切换到备用地址");
        }
    }
}

注意事项

Shovel 和 Federation 是异步同步,跨机房延迟取决于网络带宽与距离,不保证强一致性。

Federation 适合多对一汇聚场景,Shovel 适合点对点同步,根据业务需求选择。

跨机房同步消息不保证顺序,网络抖动可能导致消息到达顺序与发送顺序不同。

双向同步需避免消息循环,可通过设置 x-shovel-destination-queue 或使用不同的路由键。

要点总结

  • 跨机房容灾使用 Shovel 或 Federation 插件实现消息异步同步
  • Shovel 将源队列消息转发到目标交换器,适合点对点同步
  • Federation 上游从下游拉取消息,适合多对一汇聚
  • 双向同步需配置两个 Shovel 实例,注意避免消息循环
  • 跨机房同步不保证强一致性与消息顺序
  • 客户端通过多地址配置实现机房故障自动切换

文章存放路径:D:\git2\jwdev\articles\RABBITMQ\专家\高可用与容灾\跨机房容灾部署.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 灾难恢复演练
下一篇 → TLS/SSL 加密通信
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库