跨机房容灾部署
RabbitMQ 通过 Federation 与 Shovel 插件实现跨机房消息同步,保障异地容灾能力。
跨机房架构
部署拓扑
Java
机房 A 机房 B
[RabbitMQ-A] ──同步──→ [RabbitMQ-B]
↓ ↓
生产者/消费者 生产者/消费者
跨机房同步方案:
| 方案 | 同步方式 | 延迟 | 适用场景 |
|---|---|---|---|
| Shovel | 拉取或推送 | 低 | 点对点同步 |
| Federation | 上游拉取 | 中 | 多对一汇聚 |
| Mirror Queue | 同步复制 | 极低 | 同城低延迟 |
Shovel 插件
工作原理
Shovel 将源队列的消息转发到目标队列:
ini
源队列 → Shovel 连接 → 目标交换器 → 目标队列
Shovel 工作流程:
- 建立到源 RabbitMQ 的连接
- 从源队列消费消息
- 发布到目标 RabbitMQ 的交换器
- 确认源队列消息(ack)
Shovel 配置
Java
import com.rabbitmq.client.*;
public class ShovelExample {
public static void main(String[] args) throws Exception {
// 目标机房连接
ConnectionFactory targetFactory = new ConnectionFactory();
targetFactory.setHost("rabbitmq-room-b");
targetFactory.setPort(5672);
try (Connection targetConn = targetFactory.newConnection();
Channel ch = targetConn.createChannel()) {
// 在目标机房声明队列(接收端)
ch.queueDeclare("replicated.queue", true, false, false, null);
ch.exchangeDeclare("replicated.exchange", BuiltinExchangeType.DIRECT, true);
ch.queueBind("replicated.queue", "replicated.exchange", "replicate.key");
System.out.println("目标队列已就绪");
System.out.println("Shovel 插件配置通过 rabbitmqctl 或 management UI 完成");
}
}
}
Shovel 插件配置(rabbitmq.conf):
Java
# 定义 Shovel
rabbitmq_shovel.direct.conf = [{"name": "room-a-to-b",
"source-uri": "amqp://rabbitmq-room-a/%2f",
"source-queue": "source.queue",
"dest-uri": "amqp://rabbitmq-room-b/%2f",
"dest-exchange": "replicated.exchange",
"dest-publish-properties": {"delivery_mode": 2}}]
动态 Shovel
ini
import com.rabbitmq.client.*;
import java.util.*;
public class DynamicShovelExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 通过 HTTP API 创建动态 Shovel
// PUT /api/parameters/shovel/%2f/shovel-room-a-to-b
// Body: {"src-uri": "amqp://room-a/",
// "src-queue": "source.queue",
// "dest-uri": "amqp://room-b/",
// "dest-exchange": "replicated.exchange"}
System.out.println("动态 Shovel 通过 Management API 配置");
System.out.println("无需修改配置文件,运行时生效");
}
}
}
Federation 插件
工作原理
Federation 上游从下游拉取消息:
Java
上游队列(本地)← Federation ← 下游队列(远程)
与 Shovel 的区别:
| 特性 | Shovel | Federation |
|---|---|---|
| 同步方向 | 源 → 目标 | 上游 ← 下游 |
| 连接方式 | 推送或拉取 | 上游拉取 |
| 适用场景 | 点对点 | 多对一汇聚 |
| 配置复杂度 | 低 | 中 |
Federation 配置
Java
import com.rabbitmq.client.*;
import java.util.*;
public class FederationExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq-room-a");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 声明交换器(启用 Federation)
Map<String, Object> args_map = new HashMap<>();
args_map.put("federation-upstream", "room-b-upstream");
ch.exchangeDeclare("federated.exchange",
BuiltinExchangeType.DIRECT, true, false, args_map);
ch.queueDeclare("federated.queue", true, false, false, null);
ch.queueBind("federated.queue", "federated.exchange", "federation.key");
System.out.println("Federation 交换器已声明");
System.out.println("消息将从上游机房自动拉取");
}
}
}
Federation 上游配置(rabbitmq.conf):
text
# 定义上游
rabbitmq_federation.direct.conf = [{"name": "room-b-upstream",
"uri": "amqp://rabbitmq-room-b/%2f"}]
# 策略绑定
rabbitmqctl set_policy federate "^federated\." \
'{"federation-upstream-set":"all"}' --apply-to exchanges
跨机房双向同步
架构设计
text
机房 A ←─Shovel─→ 机房 B
↓ ↓
生产者/消费者 生产者/消费者
双向同步配置:
text
import com.rabbitmq.client.*;
public class BidirectionalSyncExample {
public static void main(String[] args) throws Exception {
// 机房 A 连接
ConnectionFactory factoryA = new ConnectionFactory();
factoryA.setHost("rabbitmq-room-a");
// 机房 B 连接
ConnectionFactory factoryB = new ConnectionFactory();
factoryB.setHost("rabbitmq-room-b");
try (Connection connA = factoryA.newConnection();
Connection connB = factoryB.newConnection();
Channel chA = connA.createChannel();
Channel chB = connB.createChannel()) {
// 两个机房各自声明本地队列
chA.queueDeclare("room-a.local", true, false, false, null);
chB.queueDeclare("room-b.local", true, false, false, null);
// Shovel 配置双向同步
// A → B: Shovel 将 room-a.local 转发到 B
// B → A: Shovel 将 room-b.local 转发到 A
System.out.println("双向同步需配置两个 Shovel 实例");
System.out.println("注意避免消息循环转发");
}
}
}
双向同步需配置消息路由规则,避免同一消息在两个机房之间无限循环转发。
容灾切换
故障场景
text
机房 A 故障 → 生产者/消费者切换到机房 B
切换流程:
- 检测机房 A 不可达
- 生产者重新连接到机房 B
- 消费者从机房 B 消费
- 机房 A 恢复后,通过 Shovel 同步缺失消息
客户端切换
text
import com.rabbitmq.client.*;
import java.util.*;
public class RoomFailoverExample {
public static void main(String[] args) throws Exception {
// 配置多机房地址
List<Address> addresses = Arrays.asList(
new Address("rabbitmq-room-a", 5672),
new Address("rabbitmq-room-b", 5672)
);
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
try (Connection conn = factory.newConnection(addresses);
Channel ch = conn.createChannel()) {
ch.queueDeclare("failover.queue", true, false, false, null);
ch.basicPublish("", "failover.queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
"Cross-room message".getBytes());
System.out.println("消息已发布到当前可用机房");
System.out.println("机房故障时自动切换到备用地址");
}
}
}
注意事项
Shovel 和 Federation 是异步同步,跨机房延迟取决于网络带宽与距离,不保证强一致性。
Federation 适合多对一汇聚场景,Shovel 适合点对点同步,根据业务需求选择。
跨机房同步消息不保证顺序,网络抖动可能导致消息到达顺序与发送顺序不同。
双向同步需避免消息循环,可通过设置
x-shovel-destination-queue或使用不同的路由键。
要点总结
- 跨机房容灾使用 Shovel 或 Federation 插件实现消息异步同步
- Shovel 将源队列消息转发到目标交换器,适合点对点同步
- Federation 上游从下游拉取消息,适合多对一汇聚
- 双向同步需配置两个 Shovel 实例,注意避免消息循环
- 跨机房同步不保证强一致性与消息顺序
- 客户端通过多地址配置实现机房故障自动切换
文章存放路径:D:\git2\jwdev\articles\RABBITMQ\专家\高可用与容灾\跨机房容灾部署.md
📝 发现内容有误?点击此处直接编辑