MIT 6.824 Distributed Systems Lab-2 Notes

This post continues the previous lab1 notes and records some takeaways from lab2 of the MIT 6.824 course.

lab2 is mainly about implementing Raft.

1. lab2 Overview

lab2 is split into three parts, 2A, 2B and 2C, which implement Raft's leader election, log replication and persistence respectively.

It is considerably harder than lab1: you have to think much more about locks, threads and race conditions, because the processes are no longer neatly divided into master and worker; every process keeps switching between the three roles.

2. Raft Overview

Raft is an algorithm for keeping logs consistent across multiple instances. The Raft paper splits it into two parts, leader election and log replication, and also covers snapshots and changes to the cluster membership.

This post walks through the implementation of leader election and log replication via the lab2 part of MIT's 6.824 course. A closer look at the underlying principles and a reading of the paper will follow in the next post.

My implementation is on github; I tried to capture the important points from the paper in the code comments.

A full test run takes about 3 minutes; I ran it 1000 times and all runs passed. The test script is test.sh.

3. lab2 in Practice

Even though Raft really is far more understandable than Paxos, passing all of lab2's tests still did not feel easy to me.

The student guide's advice for getting started goes like this:

At first, you might be tempted to treat Figure 2 as sort of an informal guide; you read it once, and then start coding up an implementation that follows roughly what it says to do. Doing this, you will quickly get up and running with a mostly working Raft implementation. And then the problems start.

My advice is the same: first implement what 2A asks for, namely leader election, according to your own understanding as quickly as possible; read and understand the tests, then keep improving.

Overall, pay attention to the three states, leader / candidate / follower, and to the implementation of the two RPCs: AppendEntries and RequestVote.

In code, type Raft struct is the data structure we have to implement, and it must follow Figure 2 to the letter.

3.1 Overall Structure

The following methods are used by the external tests:

Make constructs a *Raft instance and returns it
Start is called by the tester to submit a new command, i.e. a new log entry
GetState returns the instance's current term and whether it believes it is the leader
Kill shuts the instance down; the lab uses it to simulate a server exiting
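
For reference, the skeleton's signatures look roughly like this (written from memory of the lab code, so double-check against the stubs in your raft.go):

func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft
func (rf *Raft) Start(command interface{}) (int, int, bool) //returns index, term, isLeader
func (rf *Raft) GetState() (int, bool) //returns currentTerm, isLeader
func (rf *Raft) Kill()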

The test code lives in test_test.go; for example the first test, TestInitialElection2A:

func TestInitialElection2A(t *testing.T) {
    servers := 3
    //calls Make to start all the Raft instances
    cfg := make_config(t, servers, false)
    defer cfg.cleanup()

    // is a leader elected?
    //check that a leader gets elected
    cfg.checkOneLeader()

    // does the leader+term stay the same if there is no network failure?
    //record the current term
    term1 := cfg.checkTerms()
    //wait for a while
    time.Sleep(2 * RaftElectionTimeout)
    //check that the term has not changed (catches needless elections while the network is healthy)
    term2 := cfg.checkTerms()
    if term1 != term2 {
        fmt.Printf("warning: term changed even though there were no failures")
    }

    fmt.Printf("  ... Passed\n")
}

It is therefore well worth fully understanding test_test.go and config.go.

For 2C, Figure 2 shows clearly which data must be persisted, so call persist at every point where you modify that data. Note the rule:

Updated on stable storage before responding to RPCs
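
Following that rule, here is a minimal persist() sketch, assuming this lab version's Persister plus gob encoding (needs the bytes and encoding/gob imports; check the stubs in your skeleton for the exact API):

func (rf *Raft) persist() {
    //Figure 2 marks currentTerm, votedFor and log[] as persistent state
    w := new(bytes.Buffer)
    e := gob.NewEncoder(w)
    e.Encode(rf.CurrentTerm)
    e.Encode(rf.VotedFor)
    e.Encode(rf.Logs)
    rf.persister.SaveRaftState(w.Bytes())
}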

Drive the implementation forward test by test: for 2A, for instance, you can ignore the election restriction from the paper at first.

The tests also get stricter and stricter and manage to cover quite a few corner cases (they use a lot of randomized sleep/disconnect/latency behavior, so run them many times to be sure); I guess this is TDD :sweat_smile:

3.2 Raft Roles

Raft has three roles in total:

  1. Leader: interacts with clients and replicates data to the other instances.
  2. Follower: the passive role; it does not initiate RPCs, it only responds to RPCs from the other instances: leader election, log replication and so on.
  3. Candidate: the intermediate role between follower and leader. A follower that receives no heartbeat from a leader for some time becomes a candidate; a candidate that wins votes from a majority of the other instances becomes the leader.

Every Raft process may move between these three roles.
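
The snippets below use a set of role constants plus a ChangeToFollower message; here is a sketch of how I define them (these are my own types, inferred from the snippets, not part of the lab skeleton):

const (
    follower = iota
    candidate
    leader
)

//pushed to rf.changeToFollower whenever an RPC with a higher term shows up
type ChangeToFollower struct {
    term             int
    votedFor         int
    shouldResetTimer bool
}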

3.3 Follower

Follower is the default role of every Raft instance at startup.

The follower's behavior is comparatively simple; it watches for two things:

  1. New communication: if any arrives, reset the timeout timer. If an RPC carries a higher term, update the vote to the instance id in that RPC.
  2. A timeout: if the timer fires, become a candidate.

Note: be extra careful about when the follower's timer is reset:

  1. A candidate sends RequestVote and finds reply.Term larger than its own term: it converts to follower, but must NOT reset the timer.
  2. On receiving RequestVote with request.Term larger than the local term: convert to follower, but only after the log inconsistency check decides to grant the vote may the timer be reset.
  3. On receiving AppendEntries with request.Term larger than the local term: convert to follower and reset the timer.

func (rf *Raft) BeFollower() {
    DPrintf("[BeFollower] me:%d before for looooooooop", rf.me)
    rf.role = follower

    for {
        DPrintf("[BeFollower] me:%d begin wait select", rf.me)

        select {
        case v := <- rf.changeToFollower:
            //A server remains in follower state as long as it receives valid RPCs from a leader or candidate.
            // continue BeFollower with another leader(maybe)
            DPrintf("[BeFollower] me:%d CurrentTerm:%v changeToFollower:%v", rf.me, rf.CurrentTerm, v)
            if v.term > rf.CurrentTerm {
                go rf.TransitionToFollower(v)
                return
            }
            rf.changeToFollowerDone <- true
            if v.shouldResetTimer {
                rf.timeout.Reset(time.Duration(rf.ElectionTimeout()) * time.Millisecond)
            }
        case <- rf.timeout.C:
            //If a follower receives no communication over a period of time called the election timeout,
            //then it assumes there is no viable leader and begins an election to choose a new leader.
            DPrintf("[BeFollower] me:%d timeout", rf.me)
            go rf.BeCandidate()
            return
        case <- rf.receivedQuit:
            DPrintf("[BeFollower] me:%d quit", rf.me)
            return
        }
    }
}
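
ElectionTimeout() above simply returns a randomized timeout in milliseconds; a minimal sketch (needs math/rand; the exact range is a tuning choice, the numbers here are an assumption):

func (rf *Raft) ElectionTimeout() int {
    //randomized on every reset so that split votes are rare
    return 300 + rand.Intn(300)
}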

3.4 Candidate

Candidate is an intermediate role: when a follower times out it becomes a candidate and runs an election; if it wins the election it becomes leader.

It watches for three outcomes:

  1. Election result: if it wins, it becomes leader.
  2. Election result: if it times out, it assumes a split vote and starts a new election.
  3. If an RPC with a higher term arrives, it becomes a follower.

Note: StartElection launches the election; a sketch of it follows the loop below.

func (rf *Raft) BeCandidate() {
    DPrintf("[BeCandidate] me:%v begin.", rf.me)
    rf.role = candidate
    for {
        vote_ended := make(chan bool, len(rf.peers))
        go rf.StartElection(vote_ended)
        rf.timeout.Reset(time.Duration(rf.ElectionTimeout()) * time.Millisecond)

        select {
        case v := <- rf.changeToFollower:
            //If AppendEntries RPC received from new leader:convert to follower
            DPrintf("[BeCandidate] me:%d changeToFollower:%v", rf.me, v)
            go rf.TransitionToFollower(v)
            return
        case <- rf.receivedQuit:
            DPrintf("[BeCandidate] me:%d quit", rf.me)
            return
        case win := <- vote_ended:
            DPrintf("[BeCandidate] me:%d CurrentTerm:%v win:%v", rf.me, rf.CurrentTerm, win)
            //If vote received from majority of servers:become leader
            if win {
                go rf.BeLeader()
                return
            }
        case <- rf.timeout.C:
            //If election timeout elapses:start new election
            DPrintf("[BeCandidate] election timeout, start new election. me:%v CurrentTerm:%v", rf.me, rf.CurrentTerm)
        }
    }
}
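
StartElection itself is not shown above; here is a simplified sketch consistent with my structure (handling of a higher reply.Term is trimmed; sendRequestVote is the skeleton's RPC wrapper, and the counter needs sync/atomic):

func (rf *Raft) StartElection(vote_ended chan bool) {
    rf.mu.Lock()
    //on conversion to candidate: increment currentTerm, vote for self
    rf.CurrentTerm++
    rf.VotedFor = rf.me
    rf.persist()
    lastLogIndex := len(rf.Logs) - 1
    args := RequestVoteArgs{
        Term:         rf.CurrentTerm,
        CandidateId:  rf.me,
        LastLogIndex: lastLogIndex,
        LastLogTerm:  rf.Logs[lastLogIndex].Term,
    }
    rf.mu.Unlock()

    var votes int32 = 1 //the vote for self
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(server int) {
            var reply RequestVoteReply
            if rf.sendRequestVote(server, &args, &reply) && reply.VoteGranted {
                if int(atomic.AddInt32(&votes, 1)) == len(rf.peers)/2+1 {
                    vote_ended <- true //majority reached: report the win exactly once
                }
            }
        }(i)
    }
}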

3.5 Leader

The leader is responsible for replicating data to the other servers and keeping it consistent. When a candidate receives votes from more than half of the servers, it wins and becomes leader.

The leader's main tasks:

  1. Accept data from clients and append it to its own log.
  2. Periodically send LogEntry messages to the other servers, whether or not they carry log data.
  3. Based on the other servers' replies to those messages, track which log entries have been committed (see the sketch after the code below).

Notes:

  • the leader initializes every nextIndex to its own len(Logs)

    When a leader first comes to power, it initializes all nextIndex values to the index just after the last one in its log.

  • in my implementation, matchIndex[rf.me] takes the place of lastApplied

func (rf *Raft) BeLeader() {
    //run this asynchronously: the AE/RV handlers take the lock and then push to the channel,
    //while this code would take the lock first and only then pop the channel
    go func() {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.role != candidate {
            return
        }
        //When a leader first comes to power
        //it initializes all nextIndex values to the index just after the last one in its log.
        for i := 0; i < len(rf.nextIndex); i++ {
            rf.nextIndex[i] = len(rf.Logs)
        }
        rf.matchIndex[rf.me] = rf.nextIndex[rf.me] - 1

        //set the role as the very last step: the loop below checks for the leader role before calling rf.SendLogEntryMessageToAll
        rf.role = leader
    }()

    for {
        select {
        case v := <- rf.changeToFollower:
            //turn to follower
            DPrintf("[BeLeader] me:%d changeToFollower:%v", rf.me, v)
            go rf.TransitionToFollower(v)
            return
        case <- rf.receivedQuit:
            DPrintf("[BeLeader] me:%d quit", rf.me)
            return
        default:
            DPrintf("[BeLeader] me:%d default. rf.role:%v", rf.me, rf.role)
            //wait until the leader-state initialization above has finished
            if rf.role == leader {
                //Upon election: send initial empty AppendEntries RPCs(heartbeat) to each server;
                //repeat during idle periods to prevent election timeouts.
                rf.SendLogEntryMessageToAll()
                //Hint: The tester requires that the leader send heartbeat RPCs no more than ten times per second
                time.Sleep(heart_beat_interval_ms * time.Millisecond)
            }
        }
    }
}
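
Task 3 from the list above, advancing commitIndex from the replies, is not shown in BeLeader; here is a hedged sketch of the rule from Figure 2 (the helper name is mine):

func (rf *Raft) TryUpdateCommitIndex() {
    //If there exists an N > commitIndex such that a majority of matchIndex[i] >= N
    //and log[N].term == currentTerm: set commitIndex = N
    for n := len(rf.Logs) - 1; n > rf.commitIndex; n-- {
        if rf.Logs[n].Term != rf.CurrentTerm {
            break //entries from older terms are never committed by counting replicas
        }
        count := 0
        for _, matched := range rf.matchIndex {
            if matched >= n {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            break
        }
    }
}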

3.6 RequestVote RPC

To my mind the hardest part of the implementation is the two RPCs, because the RPC threads are outside our control.

Notes:

  1. For the up-to-date check on logs, compare the last terms first, then the lengths.
  2. On receiving an RPC with a higher term, set currentTerm and convert to follower. This was the most intricate part for me: to finish elections quickly, a candidate sends RequestVote to the other instances in parallel using currentTerm, so it must avoid building RequestVote arguments from a currentTerm that was modified in the meantime. As a noob goer, I used an extra channel to guarantee that no other thread can take the lock between the two threads involved; see PushChangeToFollower, sketched right after this list.
  3. For the 2A tests you can ignore the up-to-date log check at first.
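
At its core, PushChangeToFollower is a push followed by a wait; the real code is in my repo, this is just the shape of the idea:

func (rf *Raft) PushChangeToFollower(v ChangeToFollower) {
    //called with rf.mu held: hand the transition to the role loop and block
    //until it has been consumed, so no third thread can take the lock in between
    rf.changeToFollower <- v
    <-rf.changeToFollowerDone
}

My RequestVote implementation:
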
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    DPrintf("[RequestVote] me:%v currentTerm:%v args:%v", rf.me, rf.CurrentTerm, args)
    rf.mu.Lock()
    defer rf.mu.Unlock()

    //1. Reply false if term < currentTerm
    if args.Term < rf.CurrentTerm {
        reply.Term = rf.CurrentTerm
        reply.VoteGranted = false
        return
    }

    //2. if votedFor is null or candidateId, and candidate's log is at least as up-to-date as receiver's log, grant vote.
    //Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
    //If the logs have last entries with different terms, then the log with the later term is more up-to-date.
    //If the logs end with the same term, then whichever log is longer is more up-to-date.
    lastLogIndex := len(rf.Logs) - 1
    lastLogTerm := rf.Logs[lastLogIndex].Term

    if args.LastLogTerm < lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex < lastLogIndex) {
        reply.Term = args.Term
        reply.VoteGranted = false
        DPrintf("[RequestVote] not up-to-date me:%d args:%v", rf.me, args)
    } else if rf.CurrentTerm < args.Term {
        reply.Term = args.Term
        reply.VoteGranted = true
        DPrintf("[RequestVote] me:%d votedFor:%d VoteGranted:true", rf.me, rf.VotedFor)
    } else {
        reply.Term = rf.CurrentTerm
        reply.VoteGranted = false
    }

    //If RPC request or response contains term T > currentTerm,
    //set currentTerm = T, convert to follower
    if rf.CurrentTerm < args.Term {
        var changeToFollower ChangeToFollower = ChangeToFollower{args.Term, args.CandidateId, reply.VoteGranted}
        DPrintf("[RequestVote] me:%d changeToFollower:%v", rf.me, changeToFollower)
        rf.PushChangeToFollower(changeToFollower)
    }
}

3.7 AppendEntries RPC

AppendEntries is more complicated to implement.

The complexity shows up in two places:

  1. Detecting log inconsistencies: the follower's log may even contain more entries than the leader's, and the inconsistency check decides how to respond to the leader's entries.
  2. Updating the follower's own commitIndex.

Note: the paper mentions an optimization for resynchronizing with the leader's log quickly after an inconsistency, but doubts that it is really necessary. In lab2 it must be implemented. Our AppendEntries reply therefore gains two fields compared with Figure 2:

type AppendEntriesReply struct {
    //2A
    Term int
    Success bool

    ConflictIndex int //index the leader should back nextIndex up to
    ConflictTerm int //term of the conflicting entry; -1 if the follower's log is too short
}
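
On the leader side, these two fields can drive the nextIndex rollback roughly like this, following the accelerated backtracking described in the student guide (the helper name is mine):

func (rf *Raft) StepBackNextIndex(server int, reply *AppendEntriesReply) {
    if reply.ConflictTerm == -1 {
        //the follower's log is shorter than PrevLogIndex
        rf.nextIndex[server] = reply.ConflictIndex
        return
    }
    //if the leader also has entries of ConflictTerm, retry from just past its last one
    for i := len(rf.Logs) - 1; i > 0; i-- {
        if rf.Logs[i].Term == reply.ConflictTerm {
            rf.nextIndex[server] = i + 1
            return
        }
    }
    //otherwise skip the follower's entire run of ConflictTerm entries
    rf.nextIndex[server] = reply.ConflictIndex
}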

The implementation of the AppendEntries RPC:

func (rf *Raft) AppendEntries(request *AppendEntriesArgs, response *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    DPrintf("[AppendEntries] me:%d currentTerm:%v received AppendEntriesArgs:%v", rf.me, rf.CurrentTerm, request)

    //1. Reply false if term < currentTerm
    if rf.CurrentTerm > request.Term {
        response.Term = rf.CurrentTerm
        response.Success = false
    } else {
        log_len := len(rf.Logs)
        //2. Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
        log_is_less := log_len < request.PrevLogIndex + 1
        log_dismatch := !log_is_less && request.PrevLogIndex > 0 && (rf.Logs[request.PrevLogIndex].Term != request.PrevLogTerm)
        if log_is_less || log_dismatch {
            response.Term = rf.CurrentTerm
            response.Success = false

            if log_is_less {
                response.ConflictTerm = -1
                response.ConflictIndex = log_len
            } else if log_dismatch {
                response.ConflictTerm = rf.Logs[request.PrevLogIndex].Term
                for i := 0; i < log_len; i++ {
                    if rf.Logs[i].Term == response.ConflictTerm {
                        response.ConflictIndex = i
                        break
                    }
                }
            }
        } else {
            //3. If an existing entry conflicts with a new one(same index but different terms), delete the existing entry and all that follow it.
            if len(rf.Logs) - 1 != request.PrevLogIndex {
                rf.Logs = rf.Logs[:request.PrevLogIndex + 1]
            }
            //4. Append any new entries not already in the log
            rf.Logs = append(rf.Logs, request.Entries...)

            last_commit_index := rf.commitIndex
            //5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
            last_new_entry := len(rf.Logs) - 1
            if last_new_entry > request.LeaderCommitIndex {
                rf.commitIndex = request.LeaderCommitIndex
            } else {
                rf.commitIndex = last_new_entry
            }
            rf.NotifyApplyCh(last_commit_index)

            response.Term = rf.CurrentTerm
            response.Success = true
        }

        //if RPC request or response contains term T > currentTerm:
        //set currentTerm = T, convert to follower
        var changeToFollower ChangeToFollower = ChangeToFollower{request.Term, request.LeaderId, true}
        DPrintf("[AppendEntries] me:%v changeToFollower:%v response:%v log_is_less:%v log_dismatch:%v", rf.me, changeToFollower, response, log_is_less, log_dismatch)
        rf.PushChangeToFollower(changeToFollower)
    }
    DPrintf("[AppendEntries] me:%d currentTerm:%d votedFor:%d", rf.me, rf.CurrentTerm, rf.VotedFor)
}

Note the call to NotifyApplyCh, which pushes the committed data onto applyCh as ApplyMsg:

func (rf *Raft) NotifyApplyCh(last_commit int) {
    commitIndex := rf.commitIndex
    //persist here; note that we deliberately do NOT restrict the persist to commitIndex != last_commit:
    //for a follower, this AppendEntriesArgs may carry new entries plus a commitIndex that predates them;
    //if the follower replies true without persisting and then restarts, those new entries are lost,
    //and after the leader restarts too, this follower may help elect another leader with fewer logs,
    //and the new leader could overwrite previously committed entries.
    rf.persist()

    //the push to rf.applyCh must not be made asynchronous:
    //the same server may call this twice in a row, and the checks in config.go require the Index values in order
    for i := last_commit + 1; i <= commitIndex; i++ {
        DPrintf("[NotifyApplyCh] me:%d push to applyCh, Index:%v Command:%v", rf.me, i, rf.Logs[i].Command)
        rf.applyCh <- ApplyMsg{Index:i, Command:rf.Logs[i].Command}
    }
}

4. Tips

As a rookie in distributed systems and a beginner in go, I surely still misunderstand parts of the paper, and I believe that even 1000 runs of go test leave some corner cases uncovered.

The implementation also contains plenty of hacky workarounds :shit:, and I kept fighting the urge to refactor everything; for lack of time I instead patched the original version bit by bit. Below are the pitfalls I hit; comments and discussion are welcome.

  • Keep the logging legible: since a process may switch between all three roles, the log volume gets huge (it is worth checking whether some golang package offers a better way to trace). I print rf.me in every log line, so grep me:1 pulls out the log of a single instance for focused analysis.

  • Caveats when sending LogEntry in parallel:

go rf.SendLogEntryMessageToAll()

func (rf *Raft) SendLogEntryMessageToAll() {
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go rf.SendLogEntryMessageToSingleServer(i)
    }
}

Note that with this structure, the term may already have been modified by the time the asynchronous send runs (for example, another candidate started an election with a higher term). In that case you should either use the original term or stop sending the LogEntry: otherwise you are effectively sending LogEntry with a term you never won, which can force other servers to become your followers (AppendEntries does no up-to-date check on logs) even though your own log may not be up-to-date and may even be missing committed entries. A sketch of the term-capture pattern follows.
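
One way to guard against this, sketched under my structure (sendAppendEntries is assumed to mirror the skeleton's sendRequestVote wrapper):

func (rf *Raft) SendLogEntryMessageToSingleServer(server int) {
    rf.mu.Lock()
    if rf.role != leader {
        rf.mu.Unlock()
        return
    }
    termWhenSent := rf.CurrentTerm //capture the term under the lock
    prev := rf.nextIndex[server] - 1
    entries := make([]LogEntry, len(rf.Logs)-(prev+1))
    copy(entries, rf.Logs[prev+1:]) //copy so the slice cannot race with later appends
    args := AppendEntriesArgs{
        Term:              termWhenSent,
        LeaderId:          rf.me,
        PrevLogIndex:      prev,
        PrevLogTerm:       rf.Logs[prev].Term,
        Entries:           entries,
        LeaderCommitIndex: rf.commitIndex,
    }
    rf.mu.Unlock()

    var reply AppendEntriesReply
    if !rf.sendAppendEntries(server, &args, &reply) {
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.CurrentTerm != termWhenSent || rf.role != leader {
        return //the term moved on while the RPC was in flight: drop this stale reply
    }
    //handle reply.Success / reply.ConflictIndex here
}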

  • Channels can block

For example, this pattern leads to a deadlock:

//routine-1
rf.mu.Lock()
defer rf.mu.Unlock()

rf.changeToFollower <- true

//routine-2
rf.mu.Lock()
defer rf.mu.Unlock()

select {
    case <- rf.changeToFollower:
        ...
}

Changing the order does not necessarily fix it either:

//routine-1
rf.mu.Lock()
defer rf.mu.Unlock()

rf.followerAppendEntries <- true

//routine-2
select {
    case <- rf.followerAppendEntries:
    ...
}

rf.mu.Lock()
rf.mu.Unlock()

Consider this interleaving:

  1. routine-1 takes the lock and executes rf.followerAppendEntries <- true
  2. routine-2 fires case <- rf.followerAppendEntries, then tries to take the lock
  3. routine-1 gets scheduled again, takes the lock, and executes rf.followerAppendEntries <- true, while routine-2 must first take the lock before it can loop back to case <- rf.followerAppendEntries
    We are now back to the problem of the first version.
  • When defer actually runs

At first I wrote the following, assuming defer works like C++'s scoped_ptr and fires on leaving the enclosing scope; it actually runs when the function returns.

{
    rf.mu.Lock()
    defer rf.mu.Unlock() //runs at function return, NOT at this closing brace

    ...
}

...
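
If you do want scope-based unlocking, one idiom is to wrap the critical section in a function literal so the deferred call runs at its return:

func() {
    rf.mu.Lock()
    defer rf.mu.Unlock() //runs here, at the end of the function literal
    //critical section
}()
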
  • What ApplyMsg is for

Once log replication lands in 2B, committed entries must be pushed as ApplyMsg promptly; config.go uses them to learn which entries a Raft instance has committed. It does not read log[] directly because it wants to detect whether the same index ever commits different data at different times.
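
For reference, the ApplyMsg shape in this version of the skeleton (as far as I remember; the snapshot fields are not used in lab2):

type ApplyMsg struct {
    Index       int
    Command     interface{}
    UseSnapshot bool   //ignore for lab2
    Snapshot    []byte //ignore for lab2
}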

  • Capitalize RPC fields

I had noticed this point from the very start:

Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names (e.g. fields of log records in an array). Forgetting to capitalize field names sent by RPC is the single most frequent source of bugs in these labs.

but I still stumbled, because I missed the part about Sub-structures. For example, with AppendEntriesArgs defined as:

//AppendEntries
type AppendEntriesArgs struct {
    //2A
    Term int
    LeaderId int

    //2B
    PrevLogIndex int
    PrevLogTerm int
    Entries []LogEntry
    LeaderCommitIndex int
}

every field of LogEntry must then start with a capital letter as well:

//LogEntry is a member of AppendEntriesArgs, so its fields must be capitalized
type LogEntry struct {
    Command interface{}
    Term int
}

  • Ordering of RPCs

There is a lot to watch out for here. For example, server1 sends AppendEntries at term=1, is elected leader again at term=3, and sends AppendEntries again; both replies may arrive while it is at term=3, and the reply belonging to term=1 must be ignored.
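
The termWhenSent guard in the parallel-send sketch earlier handles exactly this; compressed to its essence, every reply handler should start with (args being the request that was sent):

rf.mu.Lock()
defer rf.mu.Unlock()
if rf.CurrentTerm != args.Term {
    return //reply to a request from an older term: ignore it
}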