集群加入与离开
集群节点的加入与离开涉及元数据同步、队列迁移和连接重定向,规范操作可避免服务中断与数据丢失。
定义
集群加入指将独立运行的RabbitMQ节点加入已有集群,共享元数据与队列;集群离开指将节点从集群中安全移除,确保其承载的队列正确迁移。
Maven依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
节点加入集群
操作流程
Bash
# 1. 确保目标节点与集群网络互通,erlang cookie一致
# /var/lib/rabbitmq/.erlang.cookie 必须相同
# 2. 停止目标节点应用(保留配置)
rabbitmqctl stop_app
# 3. 重置节点状态(清除本地元数据)
rabbitmqctl reset
# 4. 加入集群(以磁盘节点身份)
rabbitmqctl join_cluster rabbit@node-a
# 或以内存节点身份加入
# rabbitmqctl join_cluster rabbit@node-a --ram
# 5. 启动应用
rabbitmqctl start_app
# 6. 验证集群状态
rabbitmqctl cluster_status
验证集群状态
Bash
# 输出示例
Cluster status of node rabbit@node-b ...
[{nodes,[{disc,[rabbit@node-a,rabbit@node-b,rabbit@node-c]}]},
{running_nodes,[rabbit@node-a,rabbit@node-b,rabbit@node-c]},
{cluster_name,"rabbitmq-cluster"},
{partitions,[]}]
节点优雅离开集群
Bash
# 1. 确认要移除的节点承载的队列
rabbitmqctl list_queues name slave_pids synchronised_slave_pids
# 2. 在目标节点上执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
# 3. 或在其他节点上强制移除
rabbitmqctl forget_cluster_node rabbit@node-b
# 4. 验证集群状态
rabbitmqctl cluster_status
Java客户端连接集群
Java
import com.rabbitmq.client.*;
public class ClusterConnectionExample {
private static final String QUEUE_NAME = "cluster_ops_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 配置多个节点地址,客户端自动尝试连接
Address[] addresses = new Address[]{
new Address("node-a.rabbitmq.local", 5672),
new Address("node-b.rabbitmq.local", 5672),
new Address("node-c.rabbitmq.local", 5672)
};
// 连接可用节点(自动选择第一个成功)
Connection connection = factory.newConnection(addresses);
Channel channel = connection.createChannel();
// 声明队列(集群元数据自动同步)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送消息
String message = "cluster-ops-msg";
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("发送: " + message);
// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("收到: " + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
集群重新平衡
Bash
# 查看队列分布
rabbitmqctl list_queues name slave_pids master_slaves
# 手动触发队列重新平衡(将队列Master迁移到不同节点)
rabbitmqctl rebalance_queues
# 或指定策略重新平衡
rabbitmqctl rebalance_queues --force
节点加入集群前必须确保
erlang cookie一致,否则节点间无法建立Erlang分布式连接。该文件通常位于/var/lib/rabbitmq/.erlang.cookie或$HOME/.erlang.cookie。
注意事项
erlang cookie同步:加入集群前必须同步
~/.erlang.cookie文件,权限设为400,否则节点间无法通信。重置风险:
rabbitmqctl reset会清除节点所有元数据,已声明的队列和消息将丢失,仅在加入新集群前执行。离开集群影响:若离开的节点是某些队列的唯一Master,队列会不可用。需先迁移队列或确保有Slave可接管。
连接地址配置:客户端应配置多个节点地址,单个节点宕机时自动尝试下一个。
分区处理:网络恢复后若出现分区,使用
rabbitmqctl clear_cluster_status或重启分区节点重新加入集群。负载均衡:加入新节点后需更新负载均衡器配置,使流量分发到新节点。
要点总结
- 节点加入集群需执行
stop_app → reset → join_cluster → start_app流程 - 加入前必须同步
erlang cookie,确保节点间可建立分布式连接 - 节点离开需先确认其承载队列,避免Master丢失导致队列不可用
- 客户端配置多个节点地址,单点故障时自动尝试其他节点
- 集群扩缩容后可通过
rebalance_queues重新分布队列
📝 发现内容有误?点击此处直接编辑