Raft 实现从0-1 源码篇

Raft 实现

Posted by WR on November 12, 2022

前言

​ Raft 基本概念在 raft 从 0-1 基础篇 已经介绍过,这里不再重复概述,感兴趣的可以移步基础篇。

功能

​ Raft 有选举和日志复制两大核心模块,接下来主要围绕着两大模块一步步实现一个简化版的 Raft 协议,本文参考的是 ETCD Raft 实现,旨在帮助大家更好的理解 Raft 协议,更多的细节可参考 ETCD Raft 实现,或者其它优秀的 Raft 实现。

Leader 选举

首先定义一个 Raft 结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// None is a placeholder node ID used when there is no leader.
const None uint64 = 0

// Possible values for StateType.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
	StatePreCandidate
	numStates
)

// StateType represents the role of a node in a cluster.
type StateType uint64

type raft struct {

  Term uint64
  
	state StateType
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee uint64

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int

	// 检查选举
	checkQuorum bool

	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	randomizedElectionTimeout int

	tick func()
}

步骤一:计时器功能实现

心跳计时器

​ raft 里面有心跳计时器和选举计时器,心跳计时器是 leader 结点向follower节点发送心跳的时间,选举计时器是follower 节点没收到心跳后,多久发起选举的时间,这个时间是一个随机时间区间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// tickHeartbeat 心跳计时器,只有主节点才有这个计时器,心跳计时器超时后发送心跳消息
func (r *raft) tickHeartbeat() {
	// 对心跳计时器进行加一
	r.heartbeatElapsed++
	// 对选举计时器进行加一
	r.electionElapsed++

	// 如果选举计数器大于选举超时时间
	if r.electionElapsed >= r.electionTimeout {
		// 选举计时器重置
		r.electionElapsed = 0
		// 主节点会检查集群中的节点是否满足半数以上,否则自动切换成成 follower
		/*if r.checkQuorum {

		}*/

		// 如果当前节点状态是主节点并且为节点转移状态,则停止集群节点转移
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	// 如果非主节点则直接返回
	if r.state != StateLeader {
		return
	}

	// 如果心跳计时器超时
	if r.heartbeatElapsed >= r.heartbeatTimeout {
		// 重置心跳计数器
		r.heartbeatElapsed = 0
		// 发起心跳消息
		// r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

// 停止节点转移-有时候需要做一些手动节点转移的操作
func (r *raft) abortLeaderTransfer() {
	r.leadTransferee = None
}
选举计时器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// tickElection 选举计时器被 follower 和 Candidate 驱动
func (r *raft) tickElection() {
	// 选举计数器加一
	r.electionElapsed++

	// promotable 方法会检查 prs 字段中是否还存在当前节 点对应的 Progress 实例 ,这是为了检测当前
	// 节点是否被从集群中移除了
	// 这里暂且不关心 r.promotable()
	/*if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}*/

	// 如果超时,重置选举计数器并且则发起选举
	if r.pastElectionTimeout() {
		r.electionElapsed = 0
		// r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

// pastElectionTimeout 返回选举计数器是否已经超时
// randomizedElectionTimeout = [electiontimeout, 2 * electiontimeout - 1].
func (r *raft) pastElectionTimeout() bool {
	return r.electionElapsed >= r.randomizedElectionTimeout
}

​ 至此计时器功能已经完成,详细说明看一参见代码注释。

步骤二:选举功能实现

​ 集群节点初始角色都是 follower,随着选举计时器到期,开始有 follower 尝试成为 preCandidate、Candidate 角色,随后发起投票操作尝试成为 Leader。

raft 初始化

​ 初始化 raft 结构,因为节点初始化的时候都是 follower 角色,所以在 raft 结构初始化的时候给节点分配 follower 角色,初始化代码如下:

1
2
3
4
5
6
7
func NewRaft() *raft {
  // 正常情况还有大量其他操作和初始化其它参数,这里先略去不管
	r := raft{}
  // 调用 becomeFollower 成为 follower 角色
	r.becomeFollower(r.Term, None)
	return &r
}
becomeFollower

​ 接下来看一下 becomeFollower 做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// becomeFollower 成为 follower 角色
func (r *raft) becomeFollower(term uint64, lead uint64) {
	// 分配 follower 角色消息处理函数,用于处理不同类型的消息
	r.step = stepFollower
	// 做一些重置工作
	r.reset(term)
	// follower 节点设置选举计时器
	r.tick = r.tickElection
	// 设置 lead
	r.lead = lead
	// 设置状态为 follower
	r.state = StateFollower
}

// stepFollower
func stepFollower(r *raft, m Message) error {
	switch m.Type {
	// 这里暂时只关心心跳消息,其它暂时略去
	case MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)

	}
	return nil
}

func (r *raft) handleHeartbeat(m Message) {
	//r.raftLog.commitTo(m.Commit)
	//r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

func (r *raft) reset(term uint64) {
	// 重置任期和投票
	if r.Term != term {
		r.Term = term
		r.Vote = None
	}

	r.lead = None

	r.electionElapsed = 0
	r.heartbeatElapsed = 0
	// 重置随机选举计数器时间-选举超时时间是个随机数,目的是为了避免瓜分选票
	r.resetRandomizedElectionTimeout()

	// 停止节点转移
	r.abortLeaderTransfer()

	// 重置选票
	/*r.prs.ResetVotes()

	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
		// 在 raft 基础篇讲过,matchIndex 重置为0,NextIndex 重置为自己最后一条log index 位置+1
		*pr = tracker.Progress{
			Match:     0,
			Next:      r.raftLog.lastIndex() + 1,
			Inflights: tracker.NewInflights(r.prs.MaxInflight),
			IsLearner: pr.IsLearner,
		}
		// 如果是自己,matchIndex 设置为最后一条log的位置
		if id == r.id {
			pr.Match = r.raftLog.lastIndex()
		}
	})*/

	/*r.pendingConfIndex = 0
	r.uncommittedSize = 0
	r.readOnly = newReadOnly(r.readOnly.option)*/
}

​ 随着选举计数器 electionElapsed 不断推进,最终会有一个节点进入选举环节,这次给前文介绍的选举时钟加上 step 功能,即上文给 follower 节点注册的 stepFollower 函数,由 stepFollower 函数处理心跳消息:

​ tickElection 新增 r.Step(Message{From: r.id, Type: MsgHup}) 环节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tickElection 选举计时器被 follower 和 Candidate 驱动
func (r *raft) tickElection() {
	// 选举计数器加一
	r.electionElapsed++

	// promotable 方法会检查 prs 字段中是否还存在当前节 点对应的 Progress 实例 ,这是为了检测当前
	// 节点是否被从集群中移除了
	// 这里暂且不关心 r.promotable()
	/*if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}*/

	// 如果超时,重置选举计数器并且则发起选举
	if r.pastElectionTimeout() {
		r.electionElapsed = 0
    // 进入选举流程
		r.Step(Message{From: r.id, Type: MsgHup})
	}
}
stepFollower - 选举流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *raft) Step(m Message) error {
  // 这里主要分为任期和消息类型两种类型判断
  // 本次暂时直接进入 case MsgHup: 分支,进入 r.preVote 环节

	switch m.Type {
  // 选举类型消息  
	case MsgHup:
    // 如果是预选举则进入预选举环节即preCandidate,否则进入 Candidate环节
		if r.preVote {
			r.hup(campaignPreElection)
		} else {
			r.hup(campaignElection)
		}


	return nil
}
r.hup() 选举:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (r *raft) hup(t CampaignType) {
	if r.state == StateLeader {
		return
	}

	// 进行节点状态等信息检查
	if !r.promotable() {
		return
	}

  // 暂时先不管 raftlog 相关实现
	/*ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
	if err != nil {
		r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
	}
	if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
		r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
		return
	}*/

	r.campaign(t)
}
r.campaign():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
	if !r.promotable() {
		// This path should not be hit (callers are supposed to check), but
		// better safe than sorry.
	}
	//var term uint64
	var voteMsg MessageType
	// 如果是预投票则进入 becomePreCandidate 阶段,并且给当前任期+1
  // 否则进入 becomeCandidate 阶段,
	if t == campaignPreElection {
		r.becomePreCandidate()
    // 消息类型为预投票
		voteMsg = MsgPreVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1
	} else {
		r.becomeCandidate()
		voteMsg = MsgVote
		term = r.Term
	}
	// 统计选票
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == VoteWon {
		// 如果消息类型是预选举并且赢得多数节点的选票则进入选举环节,否则成为主节点
    // 这里是递归的不断统计选票
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			//r.becomeLeader()
		}
		return
	}
	var ids []uint64
	{
		idMap := r.prs.Voters.IDs()
		ids = make([]uint64, 0, len(idMap))
		for id := range idMap {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	for _, id := range ids {
		if id == r.id {
			continue
		}

		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		// 发送预投票或者投票请求消息给集群节点
		r.send(Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

​ 如果是与投票消息,则其它节点收到消息后会进入前文介绍的 r.Step 函数

r.Step()

​ 以下只保留和本次预选举相关的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message
    // 前文介绍的时候,预选举的时候会给自己的消息+1,所以这里消息任期大于节点的任期
	case m.Term > r.Term:
    // 这里主要做一些校验
    // 检查消息类型
    // 检查是否是集群节点转移
    // 检查是否满足 checkQuorum,避免瓜分选票
    // 检查是否是 lead 节点
    // 检查选举计时器是否还未超时
		if m.Type == MsgVote || m.Type == MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
  }
		

	switch m.Type {
	// 消息类型为预投票
	case pb.MsgVote, pb.MsgPreVote:
		// 如果已经给当前节点投过票或者还未投票并且没有主节点或者消息任期比当前节点任期更大则满足投票条件
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// 满足选举条件则回复投票消息,否则回复投票拒绝
    // isUpToDate 判断发起选举的节点任期是否比当前节点大或者任期相等但最后一条消息索引大于等于当前节点,主要还		// 是确保选举的节点包含所有消息
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
      // 发送投票消息
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			// 拒绝投票
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}

​ 回到 stepCandidate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// 收到预选票消息回复 	myVoteRespType = pb.MsgPreVoteResp
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case myVoteRespType:
    // 统计选票
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		switch res {
      // 如果赢得预选
		case quorum.VoteWon:
      // 当前节点是预选状态,进入campaign 
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
     //  选举失败重新变成 follower
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	}
	return nil
}

func (r *raft) campaign(t CampaignType) {
	if !r.promotable() {
		// This path should not be hit (callers are supposed to check), but
		// better safe than sorry.
		r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
	}
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1
	} else {
    // 成为候选人,并发起投票
		r.becomeCandidate()
		voteMsg = pb.MsgVote
		term = r.Term
	}
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
		// We won the election after voting for ourselves (which must mean that
		// this is a single-node cluster). Advance to the next state.
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
	var ids []uint64
	{
		idMap := r.prs.Voters.IDs()
		ids = make([]uint64, 0, len(idMap))
		for id := range idMap {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	for _, id := range ids {
		if id == r.id {
			continue
		}
		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
    // 发起投票
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

候选人发起投票

当前节点已经变成 candidate,候选人在选举计时器到期后发起选举,消息类型为 MsgVote, 再次回到 r.Step()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
		switch {
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
			// We send pre-vote requests with a term in our future. If the
			// pre-vote is granted, we will increment our term when we get a
			// quorum. If it is not, the term comes from the node that
			// rejected our vote so we should become a follower at the new
			// term.
		default:
			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
        // 如果 candidate 收到比自己任期大的消息,并且还未投过票会把自己转为为 follower 角色
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} else if m.Type == pb.MsgPreVote {
			// Before Pre-Vote enable, there may have candidate with higher term,
			// but less log. After update to Pre-Vote, the cluster may deadlock if
			// we drop messages with a lower term.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
		} else {
			// ignore other cases
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		}
		return nil
	}

	switch m.Type {
	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			// 回复 MsgVoteResp 消息
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}
候选人处理 MsgVoteResp 消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case myVoteRespType:
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
        // 赢得选举则成为 leader 节点并且发送广播
				r.becomeLeader()
				r.bcastAppend()
			}
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	case pb.MsgTimeoutNow:
		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
	}
	return nil
}

主节点会周期性给其它节点发送心跳 ,follower 节点收到消息后会重置选举计时器,更新 lead 角色,更新 raftLog 已提交位置,并且回复心跳响应。

leader 节点心跳响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgHeartbeatResp:
    // 更新 follower 节点的健康状态
		pr.RecentActive = true
		pr.ProbeSent = false

		// 结点限流,避免网络中的消息堆积过多
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
    
    // 如果 follower 节点已匹配的消息小于当前最后一条消息索引则追加消息
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
				r.send(resp)
			}
		}
	return nil
}

步骤三:日志复制

​ 客户端向集群发送消息为 prop 类型,只有主节点才能响应写入消息类型,follower 收到 prop 消息会转发给 leader, leader 收到消息后会先 WAL ,然后向其它节点发送 log,当收到一半以上的回复后,会更新 commmited位置,并且应用到本地状态机,同时也会告诉 follower 节点把消息应用到状态机。

leader 节点 prop 消息处理流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgProp:
		// 检测 MsgProp 消息是否携带了 Entry 记录,如果未携带,则输出异常日志并终止 程序 (略)
		// 检测 当前节点是否被移出 集群 ,如果当前节点以 Leader 状态被移出集群,则不再处理 MsgProp 消息(咯)
		// 检测当前是否正在进行 Leader 节点的转移,不再处理 MsgProp 消息(略)
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		for i := range m.Entries {
			e := &m.Entries[i]
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange {
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 {
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		// 将上述 Entry记录追加到当前节点的 raftLog 中
		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		// 通过 MsgApp 消息向集群中其他节点复制 Entry记录, bcastAppend()方法
		r.bcastAppend()
		return nil
	}
	
	return nil
}
follower 收到 leader MsgApp类型 消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func stepFollower(r *raft, m pb.Message) error {
  case pb.MsgApp:
  	// 重置选举计数器
		r.electionElapsed = 0
  	// 更新 lead 节点
		r.lead = m.From
  	// 追加消息
		r.handleAppendEntries(m)
}

func (r *raft) handleAppendEntries(m pb.Message) {
  // 消息已经提交,告诉leader已提交的位置,leader 下次从这个位置之后开始发送或同步
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

   // 回复 MsgAppResp 类型消息
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

		hintIndex := min(m.Index, r.raftLog.lastIndex())
		hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
		hintTerm, err := r.raftLog.term(hintIndex)
		if err != nil {
			panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
		}
    
    // 回复拒绝消息
		r.send(pb.Message{
			To:         m.From,
			Type:       pb.MsgAppResp,
			Index:      m.Index,
			Reject:     true,
			RejectHint: hintIndex,
			LogTerm:    hintTerm,
		})
	}
}

leader 处理 follower 节点的 MsgAppResp 消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func stepLeader(r *raft, m pb.Message) error {
case pb.MsgAppResp:
		pr.RecentActive = true

  	// 如果 follower 拒绝本次消息
		if m.Reject {
			r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
				r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
			nextProbeIdx := m.RejectHint
			if m.LogTerm > 0 {
				nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
			}
      // 减少 index 位置,重新同步
      // 场景:集群重新选主后,新主节点不知道从节点的复制位置,会把 nextIndex[] 置为自己的 lastIndex,					// matchIndex[] 全部置为0
      // 所以一开始发送的同步消息index位置可能并不满足,然后开始慢慢递减 index 位置,直至满足。
			if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				if pr.State == tracker.StateReplicate {
					pr.BecomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			oldPaused := pr.IsPaused()
			if pr.MaybeUpdate(m.Index) {
				switch {
				case pr.State == tracker.StateProbe:
					pr.BecomeReplicate()
				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
					// TODO(tbg): we should also enter this branch if a snapshot is
					// received that is below pr.PendingSnapshot but which makes it
					// possible to use the log again.
					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					// Transition back to replicating state via probing state
					// (which takes the snapshot into account). If we didn't
					// move to replicating state, that would only happen with
					// the next round of appends (but there may not be a next
					// round for a while, exposing an inconsistent RaftStatus).
					pr.BecomeProbe()
					pr.BecomeReplicate()
				case pr.State == tracker.StateReplicate:
					pr.Inflights.FreeLE(m.Index)
				}

				if r.maybeCommit() {
					// committed index has progressed for the term, so it is safe
					// to respond to pending read index requests
					releasePendingReadIndexMessages(r)
					r.bcastAppend()
				} else if oldPaused {
					// If we were paused before, this node may be missing the
					// latest commit index, so send it.
					r.sendAppend(m.From)
				}
				// We've updated flow control information above, which may
				// allow us to send multiple (size-limited) in-flight messages
				// at once (such as when transitioning from probe to
				// replicate, or when freeTo() covers multiple messages). If
				// we have more entries to send, send as many messages as we
				// can (without sending empty messages for the commit index)
				for r.maybeSendAppend(m.From, false) {
				}
				// Transfer leadership is in progress.
				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
					r.sendTimeoutNow(m.From)
				}
			}
		}
}