本文记录了Lab 1: MapReduce 的实验过程

Lab 记录

  • Coordinator(也就是论文中的 Master ):负责将 Map 和 Reduce 任务分配给 Worker 进程,同时还需要处理 Worker 进程在执行任务失败的情况
  • Worker:worker 会通过 RPC 向 coordinator 请求一个 Task,然后 coordinator 会相应这个请求,带回该任务的相关信息(例如,在 wc 程序中会返回需要读取的文件名等信息),然后根据该该任务的类型执行 Map 或者 Reduce 函数或是结束运行(任务全部完成)
  • RPC:Coordinator 进程和 Worker 进程进行通信的工具,我们需要定义二者交换的信息

要求:

  • Map 阶段:将每个词存放在对应的 buckets 中(已提供哈希函数),一共 nReduce 个 reduce 任务
  • Worker 应该将第 X 个 reduce 任务的输出结果保存到文件 mr-out-X
  • mr-out-X 文件每一行包含一个 Reduce 函数的输出,格式为 %v %v。可以参考文件 main/mrsequential.go
  • worker 在执行 Map 程序时应该将中间结果输出到当前目录的文件中,之后 worker 会读取这些文件作为 Reduce 任务的输入
  • 实现 Done() 方法,当 MapReduce 任务结束的时候返回 true,此时 mrcoordinator.go 会终止
  • 使用 call() 方法的返回值来判断 worker 进程是否结束。如果 worker 进程无法和 coordinator 进程联系(例如 coordinator 进程已经退出了),那么 worker 进程也应该终止。在设计中,我们或许可以为 worker 进程设置一个完成任务

实现

  • mr/coordinator.go:给 worker 分配 Map 或 Reduce 任务
  • mr/worker.go:执行 Map 或者 Reduce 任务
  • mr/rpc.go:Coordinator 和 Worker 之间的通信

RPC

​ 在 mr/rpc.go 文件中,我们首先需要定义每个任务的结构体,Worker 向 Coordinator 发送的参数,Coordinator 向 Worker 返回的参数;最后就是 mapreduce 任务产生的中间文件或者最终文件的格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// MapReduce 任务的三个阶段
const (
MAP = "Map"
REDUCE = "Reduce"
COMPLETE = "Complete"
)

type Task struct {
TaskType string // 任务的类型
TaskIndex int // 任务的编号
Filename string // map 需要读取的文件名
WorkerId string // 线程的 ID,即哪个线程执行了这个任务
Deadline time.Time // 该任务分配之后,经过多少秒之后还未完成的话便需要将这个任务重新分配
}

type Args struct {
WorkerId string // 线程的 ID,即哪个线程向 Coordinator 请求任务
LastTaskType string // 该线程上一个执行的任务类型,如果没有的话 LastTaskType = ""
LastTaskIndex int // 该线程上一个执行的任务编号
}

type Reply struct {
Filename string // map 任务需要读取的文件名
TaskType string // 任务的类型
TaskIndex int // 任务的编号
NMap int // 一共 NMap 个 map 任务
NReduce int // 一共 NReduce 个 reduce 任务
}

func TempMapOutFile(workerId string, mapIndex, reduceIndex int) string {
return fmt.Sprintf("tmp-worker-%s-%d-%d", workerId, mapIndex, reduceIndex)
}
func FinalMapOutFile(mapIndex, reduceIndex int) string {
return fmt.Sprintf("map-out-%d-%d", mapIndex, reduceIndex)
}
func TempReduceOutFile(workerId string, reduceIndex int) string {
return fmt.Sprintf("tmp-worker-%s-out-%d", workerId, reduceIndex)
}
func FinalReduceOutFile(reduceIndex int) string {
return fmt.Sprintf("mr-out-%d", reduceIndex)
}

Worker

​ 在 Worker 函数中,我们需要不断地向 Coordinator 请求任务,然后根据获得的任务类型来调用 mapTask() 或者 reduceTask() 函数来完成这个任务,同时我们还需要记录自己上一个完成的任务以便 Coordinator 将临时文件更新为最终文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// KeyValue
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}

// for sorting by key.
type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
var lastTaskType string
var lastTaskIndex int
id := strconv.Itoa(os.Getpid())
for {
// Worker 向 coordinator 请求一个任务
args := Args{
WorkerId: id,
LastTaskType: lastTaskType,
LastTaskIndex: lastTaskIndex,
}
reply := Reply{}
call("Coordinator.RpcHandler", &args, &reply)
log.Printf("Worker: receive Coordinator's reply %v\n", reply)
switch reply.TaskType {
case MAP:
mapTask(mapf, &reply, id)
case REDUCE:
reduceTask(reducef, &reply, id)
case WAIT:
time.Sleep(1 * time.Second)
case COMPLETE:
log.Printf("MR Task finished! Worker exit.\n")
return
}
// 记录上一个完成的任务
lastTaskType = reply.TaskType
lastTaskIndex = reply.TaskIndex
}
}

func mapTask(mapf func(string, string) []KeyValue, reply *Reply, workerId string) {
filename := reply.Filename
log.Printf("Worker-%s get a map task, read file content from: %v\n", workerId, filename)
//
// 读取文件的内容(参考 mrsequential.go)
//
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Map Task: cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("Map Task: cannot read %v", filename)
}
err = file.Close()
if err != nil {
log.Fatalf("Map Task: cannot close %v", filename)
}
//
// 调用 mapf 函数,产生中间数据,然后把这些数据划分到 nReduce 个桶中
//
kva := mapf(filename, string(content))
buckets := make(map[int][]KeyValue)
for _, kv := range kva {
index := ihash(kv.Key) % reply.NReduce
buckets[index] = append(buckets[index], kv)
}
//
// 这个采用 lab 给的提示
// 将中间数据写入到对应的的临时文件上,在该 map 任务完成后会将临时的 map 文件原子地命名为最终 map 产生的文件
//
for i := 0; i < reply.NReduce; i++ {
ofile, _ := os.Create(TempMapOutFile(workerId, reply.TaskIndex, i))
for _, kv := range buckets[i] {
_, err := fmt.Fprintf(ofile, "%v\t%v\n", kv.Key, kv.Value)
if err != nil {
log.Printf("Map Task: can't write some data into file " + ofile.Name())
}
}
err := ofile.Close()
if err != nil {
log.Printf("Map Task: can't close file " + ofile.Name())
}
}
}

func reduceTask(reducef func(string, []string) string, reply *Reply, workerId string) {
var lines []string
//
// 读取 map 任务产生的所有中间数据
//
for i := 0; i < reply.NMap; i++ {
filename := FinalMapOutFile(i, reply.TaskIndex)
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Reduce Task: cannot read %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("Reduce Task: cannot read %v", filename)
}
err = file.Close()
if err != nil {
log.Fatalf("Reduce Task: cannot close %v", filename)
}
lines = append(lines, strings.Split(string(content), "\n")...)
}
//
// 添加每个 key 同时对它们进行排序
//
var kva []KeyValue
for _, line := range lines {
if strings.TrimSpace(line) == "" {
continue
}
seg := strings.Split(line, "\t")
kva = append(kva, KeyValue{Key: seg[0], Value: seg[1]})
}
sort.Sort(ByKey(kva))
//
// 这段代码参考自 mrsequential,统计每个唯一的 key 出现的次数,然后将它们写入到临时的 reduce 文件中
//
ofile, _ := os.Create(TempReduceOutFile(workerId, reply.TaskIndex))
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
_, err := fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
if err != nil {
log.Printf("Map Task: can't write some data into file " + ofile.Name())
}
i = j
}
err := ofile.Close()
if err != nil {
log.Printf("Map Task: can't close file " + ofile.Name())
}
}

Coordinator

对于 Coordinator 来说,首先它需要初始化,例如设置初始的状态,设置 map 任务等;然后还需要处理来自 Worker 的请求,在mapreduce 任务完成之后需要对相应的临时文件进行原子地重命名;在 map 阶段全部完成后切换到 reduce 阶段等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// 首先定义 Coordinator 需要的信息
type Coordinator struct {
// Your definitions here.
mux sync.Mutex
stage string
nMap int
nReduce int
tasks map[string]Task
availableTask chan Task
}

// Coordinator 是否可以结束
func (c *Coordinator) Done() bool {
c.mux.Lock()
defer c.mux.Unlock()
return c.stage == COMPLETE
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}

// Your code here.
c.stage = MAP
c.nMap = len(files)
c.nReduce = nReduce
c.tasks = make(map[string]Task)
c.availableTask = make(chan Task, int(math.Max(float64(len(files)), float64(nReduce))))
// 产生 map 任务
for index, file := range files {
task := Task{
TaskType: MAP,
TaskIndex: index,
Filename: file,
}
c.tasks[fmt.Sprintf("%s-%d", task.TaskType, index)] = task
c.availableTask <- task
}
log.Printf("Coordinator start\n")

c.server()
//
// 为了避免 Worker 发生崩溃而导致某个任务无法完成,我们需要检测任务的状态
//
go func() {
for {
time.Sleep(500 * time.Millisecond)
c.mux.Lock()
for _, task := range c.tasks {
if task.WorkerId != "" && time.Now().After(task.Deadline) {
// 重新分配该任务
log.Printf("Find time-out %s task %d previously running on worker %s\n",
task.TaskType, task.TaskIndex, task.WorkerId)
task.WorkerId = ""
c.availableTask <- task
}
}
c.mux.Unlock()
}
}()
return &c
}

// 处理来自 Worker 的请求
func (c *Coordinator) RpcHandler(args *Args, reply *Reply) error {
// 如果上一个任务完成了(如果有的话),那么处理它
if args.LastTaskType != COMPLETE {
// 注意需要上锁
c.mux.Lock()
lastTaskId := fmt.Sprintf("%s-%d", args.LastTaskType, args.LastTaskIndex)
if task, existed := c.tasks[lastTaskId]; existed && task.WorkerId == args.WorkerId {
log.Printf("Mark %s task %d as finished on worker %s\n",
task.TaskType, task.TaskIndex, args.WorkerId)
// 重命名 Map 或 Reduce 任务产生的临时文件
if args.LastTaskType == MAP {
for i := 0; i < c.nReduce; i++ {
err := os.Rename(
TempMapOutFile(args.WorkerId, args.LastTaskIndex, i),
FinalMapOutFile(args.LastTaskIndex, i))
if err != nil {
log.Fatalf("Failed to mark map output file `%s` as final: %e",
TempMapOutFile(args.WorkerId, args.LastTaskIndex, i), err)
}
}
} else if args.LastTaskType == REDUCE {
err := os.Rename(
TempReduceOutFile(args.WorkerId, args.LastTaskIndex),
FinalReduceOutFile(args.LastTaskIndex))
if err != nil {
log.Fatalf("Failed to mark reduce output file `%s` as final: %e",
TempReduceOutFile(args.WorkerId, args.LastTaskIndex), err)
}
}
// 将该任务从字典中删除,如果 map 任务全部完成后,则开始 reduce 阶段的任务,或者到达完成阶段
delete(c.tasks, lastTaskId)
if len(c.tasks) == 0 {
c.changeStage()
}
}
c.mux.Unlock()
}

// 给 Worker 分配一个可用的任务
task, ok := <- c.availableTask
if !ok {
reply.TaskType = COMPLETE
return nil
}
c.mux.Lock()
defer c.mux.Unlock()
log.Printf("Assign %s task %d to worker %s\n", task.TaskType, task.TaskIndex, args.WorkerId)
task.WorkerId = args.WorkerId
task.Deadline = time.Now().Add(10 * time.Second)
c.tasks[fmt.Sprintf("%s-%d", task.TaskType, task.TaskIndex)] = task
reply.Filename = task.Filename
reply.TaskType = task.TaskType
reply.TaskIndex = task.TaskIndex
reply.NMap = c.nMap
reply.NReduce = c.nReduce
return nil
}

// 切换 Coordinator 的阶段
func (c *Coordinator) changeStage() {
if c.stage == MAP {
log.Printf("All Map tasks finished. Transit into Reduce stage.\n")
c.stage = REDUCE
// 产生 reduce 任务
for i := 0; i < c.nReduce; i++ {
task := Task{
TaskType: REDUCE,
TaskIndex: i,
}
c.tasks[fmt.Sprintf("%s-%d", task.TaskType, task.TaskIndex)] = task
c.availableTask <- task
}
} else if c.stage == REDUCE {
log.Printf("All Reduce tasks finished. Prepare to exit.\n")
c.stage = COMPLETE
close(c.availableTask)
}
}

测试

开三个窗口,一个窗口运行 Coordinator,另外两个窗口运行 Worker

1
2
3
4
5
# 改动代码后必须重新加载 wc 插件
cd ~/6.824/src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrcoordinator.go pg-*.txt
1
go run -race mrworker.go wc.so

最后运行提供的测试脚本

1
bash test-mr.sh