全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-24 15 分钟 ✍️ juanwangdev

ZooKeeper Epoch与数据同步机制

理解消息队列、Epoch和三种同步方式。

消息队列设计

Leader发送队列

Java
提议队列: 待发送的提议(Proposal)
提交队列: 待发送的提交通知(Commit)

Follower接收队列

Java
待处理提议: 收到但未处理的提议
待提交提议: 已投票但未提交的提议

队列实现

Java
// Leader队列
LinkedBlockingQueue<Proposal> toSend;
LinkedBlockingQueue<Commit> toCommit;

// Follower队列
LinkedBlockingQueue<Proposal> pendingProposals;
LinkedBlockingQueue<Commit> pendingCommits;

批处理机制

Java
public void sendBatch(List<Proposal> batch) {
    // 打包多个提议
    QuorumPacket packet = new QuorumPacket();
    packet.setType(Leader.PROPOSAL);
    packet.setData(serializeBatch(batch));
    
    // 单次发送多个提议
    learner.writePacket(packet);
}

流水线机制

Java
Leader: propose1 → propose2 → propose3 → ...
Follower: ack1 → ack2 → ack3 → ...
Leader: commit1 → commit2 → commit3 → ...

提议、ACK、提交并行处理
不阻塞等待前一个完成

队列优势

优势说明
解耦发送和接收异步
流量控制队列缓冲调节
批处理减少网络往返

提示:队列设计是ZAB高性能的基础。

Epoch纪元管理

Epoch概念

Java
Epoch标识Leader任期
每次选举Epoch递增
新Epoch的提议优于旧Epoch提议

ZXID结构

Java
ZXID = Epoch(高32位) + Counter(低32位)

Epoch: Leader纪元,选举后递增
Counter: 同Epoch内事务计数

Epoch生成

Java
// 新Leader生成Epoch
public long getNewEpoch() {
    // 获取所有Follower的Epoch
    long maxEpoch = getMaxEpochFromFollowers();
    // 新Epoch必须大于旧Epoch
    return maxEpoch + 1;
}

Epoch验证

text
// Follower接收提议时验证Epoch
public void processProposal(Proposal proposal) {
    long epoch = getEpoch(proposal.zxid);
    if (epoch < currentEpoch) {
        // 旧Epoch提议,拒绝
        return;
    }
    if (epoch > currentEpoch) {
        // 新Epoch,更新状态
        currentEpoch = epoch;
    }
    // 处理提议
}

Epoch作用

作用说明
标识任期区分不同Leader
防止假复活旧Leader提议无效
数据一致性新Epoch覆盖旧数据

Epoch变更流程

text
1. 新Leader选举
2. Leader生成新Epoch
3. Leader发送NEWLEADER提议
4. Follower确认NEWLEADER
5. Epoch更新完成

注意:Epoch机制防止旧Leader"假复活"破坏一致性。

Follower数据同步

三种同步方式

方式触发条件说明
DIFFFollower落后少量发送差异事务
TRUNCFollower超前截断未提交提议
SNAPFollower落后大量发送完整快照

DIFF同步

text
public void syncDiff(LearnerHandler learner, long followerZxid) {
    // 获取差距范围内的事务
    List<Proposal> diffs = getProposalRange(followerZxid, leaderZxid);
    
    for (Proposal p : diffs) {
        learner.sendPacket(p);
    }
}

TRUNC同步

text
public void syncTrunc(LearnerHandler learner, long followerZxid) {
    // Follower ZXID > Leader committed ZXID
    // 说明有旧Leader未提交提议
    learner.sendPacket(new Trunc(leaderZxid));
}

SNAP同步

text
public void syncSnap(LearnerHandler learner) {
    // 发送完整快照
    DataTree snapshot = zks.getZKDatabase().getDataTree();
    learner.sendPacket(new Snap(snapshot));
}

同步方式选择

text
public void chooseSyncType(long followerZxid) {
    if (followerZxid > lastCommitted) {
        // 超前,需要TRUNC
        syncTrunc(learner, followerZxid);
    } else if (followerZxid < minProposal) {
        // 落后太多,SNAP
        syncSnap(learner);
    } else {
        // 落后少量,DIFF
        syncDiff(learner, followerZxid);
    }
}

同步触发时机

时机说明
Follower加入集群初始同步
Leader选举后Epoch更新同步
Follower重启数据恢复同步

提示:SNAP开销大,定期清理事务日志避免触发。

要点总结

  • 发送队列存提议,接收队列存待处理提议
  • 批处理打包多个提议减少网络往返
  • 流水线并行提议、ACK、提交
  • Epoch每次选举递增,标识Leader任期
  • 新Epoch提议优先于旧Epoch
  • DIFF/TRUNC/SNAP三种同步适配不同场景
  • SNAP开销大,落后太多时触发

📝 发现内容有误?点击此处直接编辑

← 上一篇 ZooKeeper Curator框架实战
下一篇 → ZooKeeper ZAB协议源码实现
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库