MIT的6.824这门课程放在收藏夹里很久了,最近终于趁着周末学习了下。
这门课主要是讲分布式系统的,课程主页在https://pdos.csail.mit.edu/6.824/,我学习的是2017版本。
网上全部课程由23节课,4个lab组成,校内课程还包括exam和project.
课程主要目的主要是跟进各节课,看论文,完成实验,在动手过程中了解分布式系统。
每个lab都会有对应的几节课,课上讲一些论文、go语言、分布式相关的知识,以帮助我们更好的完成lab。这个笔记系列主要是分享做lab的一些心得。
1. 如何开始
在日历里的第一个lab就是mapreduce。
建议的姿势是首先看下上面链接里的mapreduce论文 by Jeffrey Dean and Sanjay Ghemawa,这两位G家大神的名字如雷贯耳,在protobuf源码里也经常看到。论文完整的介绍了mapreduce的原理、实现方案、优化点以及一些示例等。lab的目标也是跑通mapreduce的流程并实现部分示例。因此读几遍还是很有必要的,搞清楚一些疑问:例如mapper产生了多少个中间文件?task有任务失败怎么处理?哪些操作要求是原子的等,这些疑问都可以自行网上找到答案。
然后需要了解下Go语言,这门课程接下来的lab全部使用go语言实现,建议的文档是这篇An Introduction to Programming In GO,篇幅比较短小,有些语法、Go特性等都可以用到时再查询。
接下来就是lab1了,分为五个part。实际上每个part我们都只需要实现指定的几个函数就可以了。原始代码可以从链接里通过git
获取,完成后按照提示执行测试,能够通过就表明函数写的没有问题。
lab1的要求是在第四节课之后完成,不过目前mr理论已经足够成熟了,因此做起来不会太费劲。如果对rpc、并发这些不太理解,也可以先了解下lec2里的两个例子:cralwer(并发) kv(rpc)。
2. mapreduce简介
注:简介内容全部摘抄自上述论文
mapreduce的想法实际上是来自于Lisp语言里的map reduce
。目的是为了简化分布式系统代码的门槛,例如对于单词计数这个需求,用户可以不关心有多少台服务器在运行这个程序,只需要实现map reduce
函数:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
其中map产生key value
的中间文件,同一个key只会交给一个reduce任务完成。这些任务分发的工作由master完成。
从网上找了一个论文里的图:Figure 1: Execution overview,其中worker封装了文件的读写,以及用户map/reduce
函数的调用,搞清楚整个数据流后,lab完成的会轻松些。
3. lab
lab1分为5个part,代码放在里我的github上,这里逐个part说下思路和一些注意点。建议自己动手完成,作者还是很用心的,毕竟看了论文再实现一遍的机会并不多。
lab做的过程可以完全参考教材,其中也有很多有用的go语法的提示,对我这种go新手来说也足够了。
好,废话了一大堆,只是想说明这个过程并不难,有兴趣的读者可以动手自己试下,有问题也可以留言。
源码介绍
git clone
下来的源码与lab1相关的主要在src/mapreduce src/main
下,实现了一个小型的mapreduce系统:
整个mapreduce流程由Sequential
和Distributed
两种方式,其中Sequential
串行执行每个任务,不需要引入rpc,复杂度低,用来了解流程用,前两个part都使用的这种方式。RPC则通过调用worker.DoTask
分发并调用任务,
- 为了使用mapreduce,应用方提供:input文件、map/reduce函数、map/reduce个数等
- master启动一个RPC-server,等待worker注册,
schedule
负责分发map or reduce任务到注册的worker,并处理失败的任务。 - master调用
doMap
(直接调用或者rpc方式)读取传入的文件、调用map
函数、产出key/value
格式的中间文件,每个doMap
产生nReduce个中间文件。因此一共产出nMap * nReduce
个中间文件,形如mrtmp.xxx-x-x
。通过简单的对key hash得到该key/value
写到哪个文件。 - master接下来调用
doReduce
,读取nMap个文件,得到形如key [value1, value2, ...]
的数据调用用户的reduce
函数 mr.merge
mergenReduce
个产出的文件- 流程结束后,程序退出。
3.1. part I: Map/Reduce input and output
实现doMap doReduce
两个函数:
doMap
:读取参数传入的文件,调用用户的mapF
函数返回typedef struct KeyValue
的数组,按照key hash后写到nReduce
个文件,json格式内容。doReduce
:读取nMap
个文件,得到一组{key, [value…]}的数据,即相同key的所有value数组,调用用户的reduceF
函数写入到1个文件,json格式内容。
go关键字:ioutil map slice json
mr关键字:
- map/reduce输入、输出的内容,文件个数
- map如何判断某个key写到哪个中间文件
3.2. Part II: Single-worker word count
实现用于单词统计的mapF reduceF
,这也是论文里提到的经典题目
I里会调用到mapF reduceF
,输入输出都比较明确,这个实现就比较简单了。
mapF
:参数分别为文件名和文件内容,遍历内容生成typedef struct KeyValue
的数组返回即可,Key为单词,Value为统计的次数。判断单词可以使用FiledsFunc,我没有注意逐个下标判断的,也PASS了。reduceF
:参数为单词和各文件出现次数的数组,返回一个sum就好。
go关键字:strings unicode strconv
3.3. Part III: Distributing MapReduce tasks
I II的test都是Sequential的方式,这个part修改为分布式。单机运行,采用rpc模拟的方式。rpc的内容比较多,建议也还是看下前面文章里server相关的一点就好,再把这个hint里提到的关键字再了解下。
III主要目的是修复schedule
函数,这个函数负责分发map or task
任务给空闲的worker。
文档里介绍了很多,我这里说下调用的流程。
- 入口在
TestBasic
,启动了master和两个worker setup
启动master,将输入文件split为nMap
个文件,创建socket文件,调用Distributed
产生newMasterDistributed
启动rpc并调用newMaster.run
,调用的行数较多,单独说一下RunWorker
启动后注册到master
,并在监听端口后等待master
rpc调用DoTask
传入任务,DoTask
则根据参数调用I里实现的DoMap or DoReduce
//启动一个线程函数newMaster.run
//参数分别为:
//jobName: [in] "test"
//files: [in] split后的nMap个文件
//nreduce: [in] reduce后的文件个数
//func: 匿名函数,输入是phase(map or reduce)
//func: 匿名函数,
| go mr.run(jobName, files, nreduce,
| | func(phase jobPhase) {
| | | ch := make(chan string)
| | | go mr.forwardRegistrations(ch)
| | | schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
| | },
| | func() {
| | | mr.stats = mr.killWorkers()
| | | mr.stopRPCServer()
| | })
newMaster.run
则会调用这两个匿名函数
func (mr *Master) run(jobName string, files []string, nreduce int,
| schedule func(phase jobPhase),
| finish func(),
) {
| mr.jobName = jobName
| mr.files = files
| mr.nReduce = nreduce
| fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)
| schedule(mapPhase)
| schedule(reducePhase)
| finish()
| mr.merge()
| fmt.Printf("%s: Map/Reduce task completed\n", mr.address)
| mr.doneChannel <- true
}
可以看到调用流程上是先schedule(mapPhase)
再schedule(reducePhase)
,这里的schedule
实际上就是Distribute
参数里第一个匿名函数,注意我们要是实现的是匿名函数func里的schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
schedule
主要就是分发任务,调用Worker.DoTask
通知worker要做的任务(同步rpc call?),DoTask
则实际调用了doMap doReduce
。前面提到了启动两个worker,因此注意分发任务时不能hold。
go关键字:go WaitGroup channel rpc
mr关键字:master schedule async
registerChan
初始化ch := make(chan string)
,大小为0,因此注意异步写入。master
创建socket是在/var/tmp/
下,注意权限问题,这个因为不懂go的rpc折腾了好久。
这里我有一个go test -race -run TestBasic > out
的测试一直没有通过,时间原因也没有研究go的race detector,这块可能的原因有兴趣的同学可以分享下。
3.4. Part IV: Handling worker failures
这个part主要接着III处理,注意call(..., "Worker.DoTask", ...)
可能会失败,需要处理这种情况。
mr关键字:参考论文Fault Tolerance -> Worker Failure一节,简单讲就是重试o(╯□╰)o
3.5 Part V: Inverted index generation (optional for extra credit)
这也是论文提到的倒排的一个例子,因此跟II很像,需要实现ii.go
里的mapF reduceF
。
mapF
:输出typedef struct KeyValue
格式的数组,key为单词,value为文档名。reduceF
:输入为(key string, values []string),其中key文档次,value为所有的文档名,注意可能重复,因此需要去重,同时文档需要按照字母序排序。
go关键字:map strings strconv
运行sh ./test-mr.sh
可以跑下lab1所有part的测试。
4. 总结
这个lab从schedule
分发任务,到doMap/doReduce
调用用户函数,到mapF/reduceF
实现用户函数,整个做下来对mapreduce流程可以有一个更详细的了解,对go语言也有一个入门。