ZooKeeper Epoch与数据同步机制
理解消息队列、Epoch和三种同步方式。
消息队列设计
Leader发送队列:
Java
提议队列: 待发送的提议(Proposal)
提交队列: 待发送的提交通知(Commit)
Follower接收队列:
Java
待处理提议: 收到但未处理的提议
待提交提议: 已投票但未提交的提议
队列实现:
Java
// Leader队列
LinkedBlockingQueue<Proposal> toSend;
LinkedBlockingQueue<Commit> toCommit;
// Follower队列
LinkedBlockingQueue<Proposal> pendingProposals;
LinkedBlockingQueue<Commit> pendingCommits;
批处理机制:
Java
public void sendBatch(List<Proposal> batch) {
// 打包多个提议
QuorumPacket packet = new QuorumPacket();
packet.setType(Leader.PROPOSAL);
packet.setData(serializeBatch(batch));
// 单次发送多个提议
learner.writePacket(packet);
}
流水线机制:
Java
Leader: propose1 → propose2 → propose3 → ...
Follower: ack1 → ack2 → ack3 → ...
Leader: commit1 → commit2 → commit3 → ...
提议、ACK、提交并行处理
不阻塞等待前一个完成
队列优势:
| 优势 | 说明 |
|---|---|
| 解耦 | 发送和接收异步 |
| 流量控制 | 队列缓冲调节 |
| 批处理 | 减少网络往返 |
提示:队列设计是ZAB高性能的基础。
Epoch纪元管理
Epoch概念:
Java
Epoch标识Leader任期
每次选举Epoch递增
新Epoch的提议优于旧Epoch提议
ZXID结构:
Java
ZXID = Epoch(高32位) + Counter(低32位)
Epoch: Leader纪元,选举后递增
Counter: 同Epoch内事务计数
Epoch生成:
Java
// 新Leader生成Epoch
public long getNewEpoch() {
// 获取所有Follower的Epoch
long maxEpoch = getMaxEpochFromFollowers();
// 新Epoch必须大于旧Epoch
return maxEpoch + 1;
}
Epoch验证:
text
// Follower接收提议时验证Epoch
public void processProposal(Proposal proposal) {
long epoch = getEpoch(proposal.zxid);
if (epoch < currentEpoch) {
// 旧Epoch提议,拒绝
return;
}
if (epoch > currentEpoch) {
// 新Epoch,更新状态
currentEpoch = epoch;
}
// 处理提议
}
Epoch作用:
| 作用 | 说明 |
|---|---|
| 标识任期 | 区分不同Leader |
| 防止假复活 | 旧Leader提议无效 |
| 数据一致性 | 新Epoch覆盖旧数据 |
Epoch变更流程:
text
1. 新Leader选举
2. Leader生成新Epoch
3. Leader发送NEWLEADER提议
4. Follower确认NEWLEADER
5. Epoch更新完成
注意:Epoch机制防止旧Leader"假复活"破坏一致性。
Follower数据同步
三种同步方式:
| 方式 | 触发条件 | 说明 |
|---|---|---|
| DIFF | Follower落后少量 | 发送差异事务 |
| TRUNC | Follower超前 | 截断未提交提议 |
| SNAP | Follower落后大量 | 发送完整快照 |
DIFF同步:
text
public void syncDiff(LearnerHandler learner, long followerZxid) {
// 获取差距范围内的事务
List<Proposal> diffs = getProposalRange(followerZxid, leaderZxid);
for (Proposal p : diffs) {
learner.sendPacket(p);
}
}
TRUNC同步:
text
public void syncTrunc(LearnerHandler learner, long followerZxid) {
// Follower ZXID > Leader committed ZXID
// 说明有旧Leader未提交提议
learner.sendPacket(new Trunc(leaderZxid));
}
SNAP同步:
text
public void syncSnap(LearnerHandler learner) {
// 发送完整快照
DataTree snapshot = zks.getZKDatabase().getDataTree();
learner.sendPacket(new Snap(snapshot));
}
同步方式选择:
text
public void chooseSyncType(long followerZxid) {
if (followerZxid > lastCommitted) {
// 超前,需要TRUNC
syncTrunc(learner, followerZxid);
} else if (followerZxid < minProposal) {
// 落后太多,SNAP
syncSnap(learner);
} else {
// 落后少量,DIFF
syncDiff(learner, followerZxid);
}
}
同步触发时机:
| 时机 | 说明 |
|---|---|
| Follower加入集群 | 初始同步 |
| Leader选举后 | Epoch更新同步 |
| Follower重启 | 数据恢复同步 |
提示:SNAP开销大,定期清理事务日志避免触发。
要点总结
- 发送队列存提议,接收队列存待处理提议
- 批处理打包多个提议减少网络往返
- 流水线并行提议、ACK、提交
- Epoch每次选举递增,标识Leader任期
- 新Epoch提议优先于旧Epoch
- DIFF/TRUNC/SNAP三种同步适配不同场景
- SNAP开销大,落后太多时触发
📝 发现内容有误?点击此处直接编辑