【JRaft Source Code Analysis 03】Membership Changes


This third article covers membership changes. With an understanding of leader election and log replication, this module turns out to be quite straightforward.

Membership changes fall into two cases: adding, removing, or replacing nodes, and transferring leadership.

1、Changing Ordinary Nodes

(Figure: flow of an ordinary membership change)

The diagram looks complicated, but the overall flow is actually simple and quite clear. The method below is the unified entry point for configuration changes.

private void unsafeRegisterConfChange(final Configuration oldConf, final Configuration newConf, final Closure done) {

    Requires.requireTrue(newConf.isValid(), "Invalid new conf: %s", newConf);
    // The new conf entry(will be stored in log manager) should be valid
    Requires.requireTrue(new ConfigurationEntry(null, newConf, oldConf).isValid(), "Invalid conf entry: %s",
        newConf);

    if (this.state != State.STATE_LEADER) {
        // error handling elided: only the leader may change the configuration
        return;
    }
    // another configuration change is already in progress
    if (this.confCtx.isBusy()) {
        if (done != null) {
            Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Doing another configuration change."));
        }
        return;
    }
    if (this.conf.getConf().equals(newConf)) {
        Utils.runClosureInThread(done);
        return;
    }
    this.confCtx.start(oldConf, newConf, done); // ConfigurationCtx: kick off the configuration change workflow
}
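
For context, this method is not invoked by users directly; the public Node API (addPeer / removePeer / changePeers) builds the new Configuration and delegates to it on the leader. A minimal usage sketch, assuming the node is currently the leader (the address and callback below are made up for illustration):

// Hypothetical helper: add a peer through the public Node API; internally this
// ends up in unsafeRegisterConfChange on the current leader.
static void addPeerExample(final Node node) {
    node.addPeer(new PeerId("127.0.0.1", 8083), status -> {
        if (status.isOk()) {
            System.out.println("new peer committed to the configuration");
        } else {
            // e.g. EBUSY while another configuration change is still running
            System.err.println("add peer failed: " + status);
        }
    });
}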

JRaft breaks a configuration change down into four stages:
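
They correspond to the Stage enum nested inside ConfigurationCtx; the sketch below is paraphrased from the source, with my own comments:

enum Stage {
    STAGE_NONE,        // no configuration change in flight
    STAGE_CATCHING_UP, // newly added peers are catching up on the log
    STAGE_JOINT,       // old and new configurations are both in effect
    STAGE_STABLE       // only the new configuration remains
}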

1.1、Log Catch-up (STAGE_CATCHING_UP)

void start(final Configuration oldConf, final Configuration newConf, final Closure done) {
    this.done = done;
    this.stage = Stage.STAGE_CATCHING_UP;
    this.oldPeers = oldConf.listPeers();
    this.newPeers = newConf.listPeers();
    this.oldLearners = oldConf.listLearners();
    this.newLearners = newConf.listLearners();
    final Configuration adding = new Configuration();
    final Configuration removing = new Configuration();
    newConf.diff(oldConf, adding, removing);
    this.nchanges = adding.size() + removing.size();

    addNewLearners();
    if (adding.isEmpty()) { // removals only, nothing to add
        nextStage();
        return;
    }
    addNewPeers(adding);
}

private void addNewPeers(final Configuration adding) {
    this.addingPeers = adding.listPeers();
    for (final PeerId newPeer : this.addingPeers) {
        if (!this.node.replicatorGroup.addReplicator(newPeer)) {
            onCaughtUp(this.version, newPeer, false); // the replicator failed to start, give up catching up immediately
            return;
        }
        final OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version);
        final long dueTime = Utils.nowMs() + this.node.options.getElectionTimeoutMs();
        // register the caughtUp callback and wait for the replicator to catch this peer up
        if (!this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime, caughtUp)) {
            onCaughtUp(this.version, newPeer, false);
            return;
        }
    }
}

Once started, the Replicator carries on with its usual work; every successful replication in onAppendEntriesReturned calls r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false).

private void notifyOnCaughtUp(final int code, final boolean beforeDestroy) {
    if (this.catchUpClosure == null) {
        return;
    }
    // a sizeable chunk dealing with the waitCaughtUp timer is omitted here

    final CatchUpClosure savedClosure = this.catchUpClosure;
    this.catchUpClosure = null;
    Utils.runClosureInThread(savedClosure, savedClosure.getStatus()); // runs the OnCaughtUp closure mentioned above
}

private void onCaughtUp(final PeerId peer, final long term, final long version, final Status st) {
    this.writeLock.lock();
    try {
        // term changed and no longer leader: we were voted out, nothing to do
        if (term != this.currTerm && this.state != State.STATE_LEADER) {
            return;
        }
        if (st.isOk()) {
            // Caught up successfully
            this.confCtx.onCaughtUp(version, peer, true);
            return;
        }
        // timed out, but still within the election timeout window: keep waiting
        if (st.getCode() == RaftError.ETIMEDOUT.getNumber()
            && Utils.monotonicMs() - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options
                .getElectionTimeoutMs()) {
            // ...return
        }
        this.confCtx.onCaughtUp(version, peer, false);
    } finally {
        this.writeLock.unlock();
    }
}

// com.alipay.sofa.jraft.core.NodeImpl.ConfigurationCtx
void onCaughtUp(final long version, final PeerId peer, final boolean success) {
    Requires.requireTrue(this.stage == Stage.STAGE_CATCHING_UP, "Stage is not in STAGE_CATCHING_UP");
    if (success) {
        this.addingPeers.remove(peer);
        if (this.addingPeers.isEmpty()) {
            nextStage(); // next stage: STAGE_JOINT
            return;
        }
        return;
    }
    reset(new Status(RaftError.ECATCHUP, "Peer %s failed to catch up.", peer));
}
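
For completeness, the OnCaughtUp closure that notifyOnCaughtUp eventually runs is just a thin bridge back into NodeImpl#onCaughtUp above; roughly (a paraphrased sketch, field names may differ slightly from the source):

static class OnCaughtUp extends CatchUpClosure {
    private final NodeImpl node;
    private final long     term;
    private final PeerId   peer;
    private final long     version;

    OnCaughtUp(final NodeImpl node, final long term, final PeerId peer, final long version) {
        this.node = node;
        this.term = term;
        this.peer = peer;
        this.version = version;
    }

    @Override
    public void run(final Status status) {
        // hand the catch-up result back to NodeImpl#onCaughtUp shown above
        this.node.onCaughtUp(this.peer, this.term, this.version, status);
    }
}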

1.2、Synchronizing the Configuration (STAGE_JOINT & STAGE_STABLE)

This process has two steps. The first replicates the joint old+new configuration; while it is in effect, every write must be accepted under both configurations. The second notifies the Followers to drop the old configuration.

Before moving on to the second step, the Leader can only commit an entry once both the new and the old quorum counters drop to <= 0, i.e. the entry has been acknowledged by a majority of each configuration.
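
In other words, during the joint stage the BallotBox counts votes against both configurations at once. The class below is a simplified, self-contained sketch of that commit rule; it is not the actual com.alipay.sofa.jraft.core.Ballot class, just an illustration of the "new & old quorum <= 0" condition:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class JointBallot {
    private final Set<String> newPeers;
    private final Set<String> oldPeers;
    private int quorum;    // votes still needed from the new configuration
    private int oldQuorum; // votes still needed from the old configuration

    JointBallot(final List<String> newConf, final List<String> oldConf) {
        this.newPeers = new HashSet<>(newConf);
        this.oldPeers = new HashSet<>(oldConf);
        this.quorum = newConf.size() / 2 + 1;
        this.oldQuorum = oldConf.isEmpty() ? 0 : oldConf.size() / 2 + 1;
    }

    void grant(final String peer) {
        // a peer that appears in both configurations counts toward both quorums
        if (this.newPeers.contains(peer)) { this.quorum--; }
        if (this.oldPeers.contains(peer)) { this.oldQuorum--; }
    }

    boolean isGranted() {
        // the entry commits only once a majority of BOTH configurations have acked it
        return this.quorum <= 0 && this.oldQuorum <= 0;
    }
}

With that rule in mind, nextStage drives the transition between the stages: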

//com.alipay.sofa.jraft.core.NodeImpl.ConfigurationCtx
void nextStage() {
    Requires.requireTrue(isBusy(), "Not in busy stage");
    switch (this.stage) {
        case STAGE_CATCHING_UP:
            if (this.nchanges > 1) {
                this.stage = Stage.STAGE_JOINT;
                this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners),
                    new Configuration(this.oldPeers), false);// new and old
                return;
            }
            // If only a single node changes, then even under a network partition an added
            // node either joins the cluster or stays isolated, and a removed node either
            // leaves cleanly or stays isolated; availability is unaffected either way,
            // so we can skip the joint stage that keeps both old and new configurations.
        case STAGE_JOINT:
            this.stage = Stage.STAGE_STABLE;
            this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), null, false); // only new
            break;
        case STAGE_STABLE:
            final boolean shouldStepDown = !this.newPeers.contains(this.node.serverId);
            reset(new Status());
            if (shouldStepDown) {
                this.node.stepDown(this.node.currTerm, true, new Status(RaftError.ELEADERREMOVED,
                    "This node was removed."));
            }
            break;
        case STAGE_NONE:
            // noinspection ConstantConditions
            Requires.requireTrue(false, "Can't reach here");
            break;
    }
}
// Writes the configuration log entry; once it has been replicated to the Followers,
// configurationChangeDone is invoked and we move on to the next stage, e.g. STAGE_STABLE.
private void unsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf,
                                          final boolean leaderStart) {
    Requires.requireTrue(this.confCtx.isBusy(), "ConfigurationContext is not busy");
    final LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
    entry.setId(new LogId(0, this.currTerm));
    entry.setPeers(newConf.listPeers());
    entry.setLearners(newConf.listLearners());
    if (oldConf != null) {
        entry.setOldPeers(oldConf.listPeers());
        entry.setOldLearners(oldConf.listLearners());
    }
    final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart);
    // Use the new_conf to deal the quorum of this very log
    if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) {
        Utils.runClosureInThread(configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task."));
        return;
    }
    final List<LogEntry> entries = new ArrayList<>();
    entries.add(entry);
    this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
    checkAndSetConfiguration(false);
}

However, STAGE_JOINT carries a hidden risk: if the Leader crashes before it manages to replicate the new&old configuration to all Followers, the cluster ends up with one group of nodes holding only old and another holding new&old. Each group may well elect a Leader with the same term, and a Follower that appears in the new&old configuration can then receive messages from two different Leaders.

As we saw earlier, handleAppendEntriesRequest deals with this by simply bumping the term to currTerm+1, forcing both Leaders to step down and triggering a new election that produces a single new Leader.

[1] The new Leader can then use the log to append the new configuration to the nodes that only hold old, but the cluster configuration stays in an unstable state for a while. When the next membership change happens, it simply overwrites the original old configuration, and the cluster only returns to a stable state once the process reaches STAGE_NONE.

STAGE_STABLE still has a risk of its own: after the Leader crashes there will be one group of nodes holding only new and another holding new&old. Again two Leaders can appear, and Followers in either the new or the new&old configuration may receive messages from different Leaders.

This is again handled in handleAppendEntriesRequest: if the Leader elected from the new group prevails, bumping the term to currTerm+1 lets the new configuration (with old dropped) be replicated to the remaining nodes; otherwise we are back in the situation described in [1] above.
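
For reference, the fragment of NodeImpl#handleAppendEntriesRequest that performs this currTerm+1 step-down when a second Leader with the same term shows up looks roughly like this (paraphrased from the source, not the complete method):

// a Follower that already recognizes a leader for this term receives an
// AppendEntries request from a *different* peer claiming to be leader
if (!serverId.equals(this.leaderId)) {
    // Increase the term by 1 so that both leaders step down, forcing a fresh
    // election and ending the split brain as quickly as possible
    stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
        "More than one leader in the same term."));
    return AppendEntriesResponse.newBuilder() //
        .setSuccess(false) //
        .setTerm(request.getTerm() + 1) //
        .build();
}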

2、Leader Transfer

The logic here is even simpler, so I won't explain it at length; the source is easy to read on your own.

public Status transferLeadershipTo(final PeerId peer) {
    Requires.requireNonNull(peer, "Null peer");
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_LEADER) {
            return new Status(this.state == State.STATE_TRANSFERRING ? RaftError.EBUSY : RaftError.EPERM,
                    "Not a leader");
        }
        if (this.confCtx.isBusy()) {
            // nope! a configuration change is in flight and things are messy enough already, refuse to transfer leadership
            return new Status(RaftError.EBUSY, "Changing the configuration");
        }

        PeerId peerId = peer.copy();
        // if peer_id is ANY_PEER(0.0.0.0:0:0), the peer with the largest
        // last_log_id will be selected.
        if (peerId.equals(PeerId.ANY_PEER)) {
            if ((peerId = this.replicatorGroup.findTheNextCandidate(this.conf)) == null) {
                return new Status(-1, "Candidate not found for any peer");
            }
        }
        // ...

        final long lastLogIndex = this.logManager.getLastLogIndex();
        if (!this.replicatorGroup.transferLeadershipTo(peerId, lastLogIndex)) {
            return new Status(RaftError.EINVAL, "No such peer %s", peer);
        }
        this.state = State.STATE_TRANSFERRING;
        final Status status = new Status(RaftError.ETRANSFERLEADERSHIP,
            "Raft leader is transferring leadership to %s", peerId);
        onLeaderStop(status);
        final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId);
        this.stopTransferArg = stopArg;
        this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg),
            this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS); // the leadership transfer has a timeout

    } finally {
        this.writeLock.unlock();
    }
    return Status.OK();
}

// 1 Leader: replicatorGroup.transferLeadershipTo -> Replicator.transferLeadership
// 2 Follower: com.alipay.sofa.jraft.core.NodeImpl#handleTimeoutNowRequest
// 3 Leader: com.alipay.sofa.jraft.core.Replicator#onTimeoutNowReturned
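
As a quick illustration of step 1, an application would normally trigger the transfer through the public Node API. A minimal hypothetical helper (the target address is made up for illustration):

// Hypothetical helper: ask the current leader to hand leadership to a peer.
static void moveLeaderTo(final Node node, final String ip, final int port) {
    final Status st = node.transferLeadershipTo(new PeerId(ip, port));
    if (!st.isOk()) {
        // EBUSY: a configuration change or another transfer is in flight
        // EPERM: this node is not the leader
        System.err.println("transfer leadership failed: " + st);
    }
}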