本文记录了Lab 2A: Leader Election 的实验过程

Raft - Part 2A: leader election

Lab 2 的任务是实现 Raft 协议。论文 extended Raft paper 非常重要(请仔细理解图 2),尤其是论文的第 5 节。2A 部分主要是实现 Raft 的 Leader Election,相关讨论在论文的 5.1 和 5.2 节。在开始 Coding 之前,先看一下实验指导书给出的资料。

实现

  • 首先是相关数据的定义,2A 中需要的数据类型基本上在论文的图 2 中给出了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// The three roles a server can be in. Raft.state holds one of these values.
const (
Follower = "Follower"
Candidate = "Candidate"
Leader = "Leader"
)

// Raft holds the per-peer state needed for leader election (2A); the data
// required for log replication is implemented in 2B.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Server role and election state.
// NOTE(review): this group was originally labelled "persistent state on all servers"; what persist() actually saves is not visible here — confirm.
state string // current role of the server (Follower, Candidate, Leader)
currentTerm int // latest term server has seen (initialized to 0)
voteFor int // candidateId that received vote in current term (or -1 if none)
logs []Entry // log entries, each entry contains command and term.

electionTimeout *time.Timer // election timer; reset to a random duration from RandomizedElectionTimeout()
heartbeatsTimeout *time.Timer // heartbeat timer; reset to the fixed interval from FixedHeartBeatTimeout() (100ms)
}

// RequestVoteArgs is the argument of the RequestVote RPC. Its fields follow
// the definition given in Figure 2 of the Raft paper.
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // candidate's term
CandidateId int // candidate requesting vote
LastLogIndex int // index of candidate's last log entry
LastLogTerm int // term of candidate's last log entry
}

// RequestVoteReply is the reply of the RequestVote RPC.
type RequestVoteReply struct {
// Your data here (2A).
Term int // currentTerm of the replying server, so the candidate can update its own term if it is behind
VoteGranted bool // true when the replying server granted its vote to the candidate
}

// AppendEntriesArgs is the argument of the AppendEntries RPC. It is defined
// already in 2A because the Leader uses this RPC to send heartbeats; it
// carries no log-related fields yet.
type AppendEntriesArgs struct {
Term int // leader's term
LeadId int // id of the sending leader, so a follower can redirect clients
}

// AppendEntriesReply is the reply of the AppendEntries RPC.
type AppendEntriesReply struct {
Term int // currentTerm of the replying server, for leader to update itself
Success bool // whether the follower accepted the request; in the Raft paper this also depends on prevLogIndex/prevLogTerm, which AppendEntriesArgs does not carry yet
}
  • 然后补充 Make 函数中的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func Make(peers []*labrpc.ClientEnd, me int,
//...

// Your initialization code here (2A, 2B, 2C).
rf.state = Follower
rf.currentTerm = 0
rf.voteFor = -1
rf.logs = make([]Entry, 1)
rf.electionTimeout = time.NewTimer(FixedHeartBeatTimeout())
rf.heartbeatsTimeout = time.NewTimer(RandomizedElectionTimeout())

//...
}

// FixedHeartBeatTimeout returns the constant interval (100ms) used for the
// heartbeat timer.
func FixedHeartBeatTimeout() time.Duration {
	const heartbeatInterval = 100 * time.Millisecond
	return heartbeatInterval
}

// seedElectionRand guards the one-time seeding of the global math/rand source
// used by RandomizedElectionTimeout.
var seedElectionRand sync.Once

// RandomizedElectionTimeout returns a random election timeout in the range
// [300ms, 450ms), in whole milliseconds.
//
// The global random source is seeded once, on first use, rather than on every
// call: reseeding per call discards the generator's state each time and can
// hand out identical values when two calls observe the same clock reading.
// Seeding explicitly keeps the result unpredictable on Go versions that do not
// seed the global source automatically.
func RandomizedElectionTimeout() time.Duration {
	seedElectionRand.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})
	const lo, hi = 300, 450
	ms := lo + rand.Intn(hi-lo)
	return time.Duration(ms) * time.Millisecond
}
  • 在 ticker 中,需要实现两种类型的定时器任务:1、election timeout 后,需要重新开始新一轮选举。2、heartbeats timeout 后,Leader 需要向其他服务器发送 heartbeat 消息(使用 AppendEntries RPC)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// ticker is the timer loop of a peer. Until killed() reports true it waits for
// one of the two timers to fire:
//
//   - election timer: the peer bumps its term, becomes a Candidate, starts an
//     election and re-arms the timer with a fresh randomized timeout;
//   - heartbeat timer: if the peer is the Leader it broadcasts a heartbeat and
//     re-arms the timer with the fixed interval (a non-leader leaves the timer
//     expired).
//
// rf.mu is held while each timer event is handled.
func (rf *Raft) ticker() {
	for {
		if rf.killed() {
			return
		}

		select {
		case <-rf.heartbeatsTimeout.C:
			rf.mu.Lock()
			if rf.state == Leader {
				rf.BroadcastHeartBeat()
				rf.heartbeatsTimeout.Reset(FixedHeartBeatTimeout())
			}
			rf.mu.Unlock()

		case <-rf.electionTimeout.C:
			rf.mu.Lock()
			rf.currentTerm++
			rf.state = Candidate
			rf.StartElection()
			rf.electionTimeout.Reset(RandomizedElectionTimeout())
			rf.mu.Unlock()
		}
	}
}

// StartElection runs one round of voting after an election timeout.
//
// The peer votes for itself, persists that vote, and sends a RequestVote RPC
// to every other peer in its own goroutine. Each reply is processed under
// rf.mu and only while the peer is still a Candidate in the term the request
// was issued for. On a majority of granted votes the peer becomes Leader,
// re-arms the heartbeat timer and broadcasts a heartbeat. If a reply carries a
// higher term, the peer adopts that term and falls back to Follower.
//
// It reads and writes rf fields without locking, so the caller is expected to
// hold rf.mu (ticker does).
func (rf *Raft) StartElection() {
	lastIndex := len(rf.logs) - 1
	args := RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: lastIndex,
		LastLogTerm:  rf.logs[lastIndex].Term,
	}

	// Vote for ourselves first; votes starts at 1 to account for it.
	rf.voteFor = rf.me
	rf.persist()
	votes := 1

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		// Ask every other server for its vote in parallel.
		go func(peer int) {
			var reply RequestVoteReply
			if !rf.sendRequestVote(peer, &args, &reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()

			// Ignore replies that arrive after the term moved on or after
			// this peer stopped being a candidate.
			if rf.currentTerm != args.Term || rf.state != Candidate {
				return
			}

			switch {
			case reply.VoteGranted:
				votes++
				// A majority makes this peer the leader; announce it to the
				// followers right away with a heartbeat.
				if votes > len(rf.peers)/2 {
					rf.state = Leader
					rf.heartbeatsTimeout.Reset(FixedHeartBeatTimeout())
					rf.BroadcastHeartBeat()
				}
			case reply.Term > rf.currentTerm:
				rf.state = Follower
				rf.currentTerm, rf.voteFor = reply.Term, -1
				rf.persist()
			}
		}(peer)
	}
}

// BroadcastHeartBeat sends a heartbeat (an AppendEntries RPC) to every other
// peer to establish this peer's authority and prevent new elections.
//
// It also re-arms this peer's own election timer so the sender does not time
// out while it keeps sending heartbeats. The RPCs are issued asynchronously,
// one goroutine per peer. If a reply carries a term higher than ours, this
// peer adopts that term, clears its vote, steps down to Follower and persists
// the change, matching how StartElection handles the same transition.
//
// It reads rf fields without locking, so the caller is expected to hold rf.mu
// (both ticker and StartElection call it with the lock held).
func (rf *Raft) BroadcastHeartBeat() {
	args := AppendEntriesArgs{
		Term:   rf.currentTerm,
		LeadId: rf.me,
	}

	// The sender never receives its own heartbeat, so reset its election
	// timer here instead.
	rf.electionTimeout.Reset(RandomizedElectionTimeout())

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			var reply AppendEntriesReply
			if !rf.sendAppendEntries(peer, &args, &reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()
			if reply.Term > rf.currentTerm {
				rf.currentTerm, rf.voteFor = reply.Term, -1
				rf.state = Follower
				// currentTerm and voteFor changed, so save them like every
				// other place that modifies them.
				rf.persist()
			}
		}(peer)
	}
}
  • 处理 RPC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// RequestVote is the RPC handler for vote requests (see Figure 2 of the Raft
// paper, "RequestVote RPC").
//
// The vote is refused when the candidate's term is older than ours, or when
// the terms are equal and we already voted for a different candidate. In every
// other case the vote is granted: a newer term is adopted first (stepping down
// to Follower), then the vote is recorded and the election timer is re-armed.
// The peer's state is persisted on every return path.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	staleCandidate := args.Term < rf.currentTerm
	votedForOther := args.Term == rf.currentTerm &&
		rf.voteFor != -1 &&
		rf.voteFor != args.CandidateId
	if staleCandidate || votedForOther {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}

	// A newer term always wins: adopt it and forget any earlier vote.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.voteFor = -1
		rf.state = Follower
	}

	// Grant the vote and give the candidate time to win before we time out.
	rf.voteFor = args.CandidateId
	rf.electionTimeout.Reset(RandomizedElectionTimeout())
	reply.Term = rf.currentTerm
	reply.VoteGranted = true
}

// AppendEntries is the RPC handler a peer runs when it receives an
// AppendEntries request; in 2A the request is only a heartbeat.
//
// Following Figure 2 of the Raft paper:
//   - a request from an older term is rejected (Success = false) and does NOT
//     reset the election timer, so a stale leader cannot suppress elections;
//   - a request from a newer term makes the receiver adopt that term, clear
//     its vote and persist the change;
//   - any accepted request makes the receiver a Follower and re-arms its
//     election timer.
//
// reply.Term always carries the receiver's (possibly updated) current term so
// the sender can step down if it is behind.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm {
		reply.Term, reply.Success = rf.currentTerm, false
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm, rf.voteFor = args.Term, -1
		rf.persist()
	}

	// The sender is a legitimate leader for the current term: a Candidate (or
	// an outdated Leader) must step down.
	rf.state = Follower
	rf.electionTimeout.Reset(RandomizedElectionTimeout())
	reply.Term, reply.Success = rf.currentTerm, true
}

// sendAppendEntries invokes the "Raft.AppendEntries" RPC on peer number server
// with the given args, filling in reply. It returns the boolean result of the
// underlying Call.
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}
  • 最后一个 GetState 没啥好说的

测试

1
go test -run 2A -race