privatevoidunsafeRegisterConfChange(final Configuration oldConf, final Configuration newConf, final Closure done){
Requires.requireTrue(newConf.isValid(), "Invalid new conf: %s", newConf); // The new conf entry(will be stored in log manager) should be valid Requires.requireTrue(new ConfigurationEntry(null, newConf, oldConf).isValid(), "Invalid conf entry: %s", newConf);
if (this.state != State.STATE_LEADER) { // error return; } // 配置正在更改中 if (this.confCtx.isBusy()) { if (done != null) { Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Doing another configuration change.")); } return; } if (this.conf.getConf().equals(newConf)) { Utils.runClosureInThread(done); return; } this.confCtx.start(oldConf, newConf, done);//ConfigurationCtx,启动更新配置流程。 }
//com.alipay.sofa.jraft.core.NodeImpl.ConfigurationCtx voidnextStage(){ Requires.requireTrue(isBusy(), "Not in busy stage"); switch (this.stage) { case STAGE_CATCHING_UP: if (this.nchanges > 1) { this.stage = Stage.STAGE_JOINT; this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), new Configuration(this.oldPeers), false);// new and old return; } // 如果只更改一个节点,网络出现故障的情况下,新增节点要么被孤立要么融入 // 删除节点要么成功要么被孤立,不影响系统可用性 // 可以跳过同时保留新旧配置 case STAGE_JOINT: this.stage = Stage.STAGE_STABLE; this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), null, false); // only new break; case STAGE_STABLE: finalboolean shouldStepDown = !this.newPeers.contains(this.node.serverId); reset(new Status()); if (shouldStepDown) { this.node.stepDown(this.node.currTerm, true, new Status(RaftError.ELEADERREMOVED, "This node was removed.")); } break; case STAGE_NONE: // noinspection ConstantConditions Requires.requireTrue(false, "Can't reach here"); break; } } //写日志,等复制给Follower后,调用configurationChangeDone,进入下一步STAGE_STABLE privatevoidunsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf, finalboolean leaderStart){ Requires.requireTrue(this.confCtx.isBusy(), "ConfigurationContext is not busy"); final LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); entry.setId(new LogId(0, this.currTerm)); entry.setPeers(newConf.listPeers()); entry.setLearners(newConf.listLearners()); if (oldConf != null) { entry.setOldPeers(oldConf.listPeers()); entry.setOldLearners(oldConf.listLearners()); } final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart); // Use the new_conf to deal the quorum of this very log if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) { Utils.runClosureInThread(configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task.")); return; } final List<LogEntry> entries = new ArrayList<>(); entries.add(entry); this.logManager.appendEntries(entries, new LeaderStableClosure(entries)); checkAndSetConfiguration(false); }
public Status transferLeadershipTo(final PeerId peer){ Requires.requireNonNull(peer, "Null peer"); this.writeLock.lock(); try { if (this.state != State.STATE_LEADER) { returnnew Status(this.state == State.STATE_TRANSFERRING ? RaftError.EBUSY : RaftError.EPERM, "Not a leader"); } if (this.confCtx.isBusy()) { //nope!更改配置过程非常混乱,拒绝转移Leader returnnew Status(RaftError.EBUSY, "Changing the configuration"); }
PeerId peerId = peer.copy(); // if peer_id is ANY_PEER(0.0.0.0:0:0), the peer with the largest // last_log_id will be selected. if (peerId.equals(PeerId.ANY_PEER)) { if ((peerId = this.replicatorGroup.findTheNextCandidate(this.conf)) == null) { returnnew Status(-1, "Candidate not found for any peer"); } } // ...
finallong lastLogIndex = this.logManager.getLastLogIndex(); if (!this.replicatorGroup.transferLeadershipTo(peerId, lastLogIndex)) { returnnew Status(RaftError.EINVAL, "No such peer %s", peer); } this.state = State.STATE_TRANSFERRING; final Status status = new Status(RaftError.ETRANSFERLEADERSHIP, "Raft leader is transferring leadership to %s", peerId); onLeaderStop(status); final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId); this.stopTransferArg = stopArg; this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg), this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);//转移Leader有超时时间