MIT 6.824 Distributed Systems Lab-1 笔记

#Distributed-Systems #6.824 #mapreduce

MIT的6.824这门课程放在收藏夹里很久了,最近终于趁着周末学习了下。

这门课主要是讲分布式系统的,课程主页在https://pdos.csail.mit.edu/6.824/,我学习的是2017版本。

网上全部课程由23节课,4个lab组成,校内课程还包括exam和project.

课程主要目的主要是跟进各节课,看论文,完成实验,在动手过程中了解分布式系统。

每个lab都会有对应的几节课,课上讲一些论文、go语言、分布式相关的知识,以帮助我们更好的完成lab。这个笔记系列主要是分享做lab的一些心得。

1. 如何开始

日历里的第一个lab就是mapreduce。

建议的姿势是首先看下上面链接里的mapreduce论文 by Jeffrey Dean and Sanjay Ghemawa,这两位G家大神的名字如雷贯耳,在protobuf源码里也经常看到。论文完整的介绍了mapreduce的原理、实现方案、优化点以及一些示例等。lab的目标也是跑通mapreduce的流程并实现部分示例。因此读几遍还是很有必要的,搞清楚一些疑问:例如mapper产生了多少个中间文件?task有任务失败怎么处理?哪些操作要求是原子的等,这些疑问都可以自行网上找到答案。

然后需要了解下Go语言,这门课程接下来的lab全部使用go语言实现,建议的文档是这篇An Introduction to Programming In GO,篇幅比较短小,有些语法、Go特性等都可以用到时再查询。

接下来就是lab1了,分为五个part。实际上每个part我们都只需要实现指定的几个函数就可以了。原始代码可以从链接里通过git获取,完成后按照提示执行测试,能够通过就表明函数写的没有问题。

lab1的要求是在第四节课之后完成,不过目前mr理论已经足够成熟了,因此做起来不会太费劲。如果对rpc、并发这些不太理解,也可以先了解下lec2里的两个例子:cralwer(并发) kv(rpc)。

2. mapreduce简介

注:简介内容全部摘抄自上述论文

mapreduce的想法实际上是来自于Lisp语言里的map reduce。目的是为了简化分布式系统代码的门槛,例如对于单词计数这个需求,用户可以不关心有多少台服务器在运行这个程序,只需要实现map reduce函数:

map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
    result += ParseInt(v);
Emit(AsString(result));

其中map产生key value的中间文件,同一个key只会交给一个reduce任务完成。这些任务分发的工作由master完成。

Figure 1: Execution overview

从网上找了一个论文里的图:Figure 1: Execution overview,其中worker封装了文件的读写,以及用户map/reduce函数的调用,搞清楚整个数据流后,lab完成的会轻松些。

3. lab

lab1分为5个part,代码放在里我的github上,这里逐个part说下思路和一些注意点。建议自己动手完成,作者还是很用心的,毕竟看了论文再实现一遍的机会并不多。

lab做的过程可以完全参考教材,其中也有很多有用的go语法的提示,对我这种go新手来说也足够了。

好,废话了一大堆,只是想说明这个过程并不难,有兴趣的读者可以动手自己试下,有问题也可以留言。

源码介绍

git clone下来的源码与lab1相关的主要在src/mapreduce src/main下,实现了一个小型的mapreduce系统:

整个mapreduce流程由SequentialDistributed两种方式,其中Sequential串行执行每个任务,不需要引入rpc,复杂度低,用来了解流程用,前两个part都使用的这种方式。RPC则通过调用worker.DoTask分发并调用任务,

  1. 为了使用mapreduce,应用方提供:input文件、map/reduce函数、map/reduce个数等
  2. master启动一个RPC-server,等待worker注册,schedule负责分发map or reduce任务到注册的worker,并处理失败的任务。
  3. master调用doMap(直接调用或者rpc方式)读取传入的文件、调用map函数、产出key/value格式的中间文件,每个doMap产生nReduce个中间文件。因此一共产出nMap * nReduce个中间文件,形如mrtmp.xxx-x-x。通过简单的对key hash得到该key/value写到哪个文件。
  4. master接下来调用doReduce,读取nMap个文件,得到形如key [value1, value2, ...]的数据调用用户的reduce函数
  5. mr.merge merge nReduce个产出的文件
  6. 流程结束后,程序退出。

3.1. part I: Map/Reduce input and output

实现doMap doReduce两个函数:

  1. doMap:读取参数传入的文件,调用用户的mapF函数返回typedef struct KeyValue的数组,按照key hash后写到nReduce个文件,json格式内容。
  2. doReduce:读取nMap个文件,得到一组{key, [value…]}的数据,即相同key的所有value数组,调用用户的reduceF函数写入到1个文件,json格式内容。

go关键字:ioutil map slice json
mr关键字:

  1. map/reduce输入、输出的内容,文件个数
  2. map如何判断某个key写到哪个中间文件

3.2. Part II: Single-worker word count

实现用于单词统计的mapF reduceF,这也是论文里提到的经典题目

I里会调用到mapF reduceF,输入输出都比较明确,这个实现就比较简单了。

  1. mapF:参数分别为文件名和文件内容,遍历内容生成typedef struct KeyValue的数组返回即可,Key为单词,Value为统计的次数。判断单词可以使用FiledsFunc,我没有注意逐个下标判断的,也PASS了。
  2. reduceF:参数为单词和各文件出现次数的数组,返回一个sum就好。

go关键字:strings unicode strconv

3.3. Part III: Distributing MapReduce tasks

I II的test都是Sequential的方式,这个part修改为分布式。单机运行,采用rpc模拟的方式。rpc的内容比较多,建议也还是看下前面文章里server相关的一点就好,再把这个hint里提到的关键字再了解下。

III主要目的是修复schedule函数,这个函数负责分发map or task任务给空闲的worker。

文档里介绍了很多,我这里说下调用的流程。

  1. 入口在TestBasic,启动了master和两个worker
  2. setup启动master,将输入文件split为nMap个文件,创建socket文件,调用Distributed产生newMaster
  3. Distributed启动rpc并调用newMaster.run,调用的行数较多,单独说一下
  4. RunWorker启动后注册到master,并在监听端口后等待master rpc调用DoTask传入任务,DoTask则根据参数调用I里实现的DoMap or DoReduce
//启动一个线程函数newMaster.run
//参数分别为:
//jobName: [in] "test"
//files: [in] split后的nMap个文件
//nreduce: [in] reduce后的文件个数
//func: 匿名函数,输入是phase(map or reduce)
//func: 匿名函数,
|   go mr.run(jobName, files, nreduce,
|   |   func(phase jobPhase) {
|   |   |   ch := make(chan string)
|   |   |   go mr.forwardRegistrations(ch)
|   |   |   schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
|   |   },
|   |   func() {
|   |   |   mr.stats = mr.killWorkers()
|   |   |   mr.stopRPCServer()
|   |   })

newMaster.run则会调用这两个匿名函数

func (mr *Master) run(jobName string, files []string, nreduce int,
|   schedule func(phase jobPhase),
|   finish func(),
) {
|   mr.jobName = jobName
|   mr.files = files
|   mr.nReduce = nreduce

|   fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

|   schedule(mapPhase)
|   schedule(reducePhase)
|   finish()
|   mr.merge()

|   fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

|   mr.doneChannel <- true
}

可以看到调用流程上是先schedule(mapPhase)schedule(reducePhase),这里的schedule实际上就是Distribute参数里第一个匿名函数,注意我们要是实现的是匿名函数func里的schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)

schedule主要就是分发任务,调用Worker.DoTask通知worker要做的任务(同步rpc call?),DoTask则实际调用了doMap doReduce。前面提到了启动两个worker,因此注意分发任务时不能hold。

go关键字:go WaitGroup channel rpc mr关键字:master schedule async

  1. registerChan初始化ch := make(chan string),大小为0,因此注意异步写入。
  2. master创建socket是在/var/tmp/下,注意权限问题,这个因为不懂go的rpc折腾了好久。

这里我有一个go test -race -run TestBasic > out的测试一直没有通过,时间原因也没有研究go的race detector,这块可能的原因有兴趣的同学可以分享下。

3.4. Part IV: Handling worker failures

这个part主要接着III处理,注意call(..., "Worker.DoTask", ...)可能会失败,需要处理这种情况。

mr关键字:参考论文Fault Tolerance -> Worker Failure一节,简单讲就是重试o(╯□╰)o

3.5 Part V: Inverted index generation (optional for extra credit)

这也是论文提到的倒排的一个例子,因此跟II很像,需要实现ii.go里的mapF reduceF

  1. mapF:输出typedef struct KeyValue格式的数组,key为单词,value为文档名。
  2. reduceF:输入为(key string, values []string),其中key文档次,value为所有的文档名,注意可能重复,因此需要去重,同时文档需要按照字母序排序。

go关键字:map strings strconv

运行sh ./test-mr.sh可以跑下lab1所有part的测试。

4. 总结

这个lab从schedule分发任务,到doMap/doReduce调用用户函数,到mapF/reduceF实现用户函数,整个做下来对mapreduce流程可以有一个更详细的了解,对go语言也有一个入门。