MIT 6.824 Lab1 MapReduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets.

论文《MapReduce: Simplified Data Processing on Large Clusters》开篇第一句话就将明白了 MapReduce 的性质:用于处理或生成大数据的相关实现或编程模型

如果想看论文翻译请移步 MapReduce 中文翻译.

编程模型

整个计算过程,输入是一系列的 key/value 对,输出也是一系列的 key/value 对。这个计算过程叫做 map 与 reduce,其实就是 拆分与合并,其实很像 分而治之 的思想。

例如:把一辆汽车拆成各种零件,再把这些零件组装成一个变形金刚。

用户只要指定 map 函数和 reduce 函数就可以了,把剩下的交给 MapReduce 库去做就可以了。map 和 reduce 函数应该设计成这样:

  • map 函数接收输入的 key/value pairs,把它们变成 intermediate key/value pairs,MapReduce 库把所有 intermediate key = X (相同 key)的 pairs 集中起来传给 reduce 函数。
  • reduce 函数接受 intermeidate key = X (相同 key)的 pairs,然后通过某一种规则把他们 merge 起来,生成较少的输出;一般 reduce 函数针对某一个 key 仅仅产生 0 个或 1 个输出。它可以处理那些很大的文件,例如在内存中放不下的文件,它可以以迭代的方式读取并去做 reduce。

例如,word count 的 map 函数和 reduce 函数,用 go 就是下面这样实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

分析以下这个 map 和 reduce 具体是做什么的:map 函数接受一个 key/value pair,其中 key 是 filename(文件名),value 是 contents(文件内容),输出是 {word, 1} 的数组;reduce 函数接受某一个 key 的所有 value,输出是一个 value。

那么 MapReduce 库是怎么让多个 machine 之间协同合作来一起完成 map 任务和 reduce 任务,最终完成所有任务的呢?这就要看一下 MapReduce 库的工作流程了。

MapReduce0.PNG

可以看到它是一个 master 多个 worker 的模式,master 不对任务进行处理,只对 worker 进行协调,worker 去执行具体的 map task 和 reduce task。worker 会以请求任务的方式向 master 要任务去做。这种模式有以下优劣:

  • 把整个库分成两个部分,master 和 worker,这使得整个工作流程清晰易懂。

  • 一个 master 而非多个 master,使得编程变得很容易,因为不需要考虑 master 之间的通信,一致性等问题;有好处就会有坏处,只有一个 master 会导致系统可用性,可靠性变差,如果 master 节点 down 了,那么整个服务就不可用了,也就是不具备容错能力

  • 多个 worker 使得 MapReduce 库的性能得到大大提高,处理 TB 级别的大量无依赖的数据时,将大大减少处理时间;同时将具有很好的扩展性。

可不可以用一句话来概括 MapReduce 得思想?


用 Go 具体实现 MapReduce 库

  • 一台 machine 上分配一个 Coordinator 出来,用于协调 worker 之间的工作,并回应 worker 的任务请求。

  • 多台 machine 上分配多个 worker 出来,向 Coordinator 索求任务(可以是 map task 也可以是 reduce task,Coordinator 给什么就做什么)。

  • 考虑 worker 处理太慢或者突然 down 掉的情况,Coordinator 需要重新分配任务。

  • 不考虑 Coordinator down 掉的情况。Coordinator 只会在所有任务都完成后退出,此时 Worker 与 Coordinator 的 RPC 通信会超时并返回错误,这时候就可以知道所有任务都结束了(或者 Worker 的网络状况出问题了),此时 Worker 只需要 exit 就行。

Coordinator 和 Worker 分别需要哪些数据结构?

Coordinator:

  • 需要知道最终有多少个 reduce task
  • 需要给 Worker 分配 id
  • 需要知道当前是 map 任务还是 reduce 任务
  • 需要知道是否所有任务都已经做完
  • 因为某些 Worker 可能 down 掉,因此要记录哪些 Worker 正在做哪些 task,以及 Worker 的状态
  • 用超时定时器判断 Worker 是否已经 down 了
  • 需要记录 map task 结束后生成的所有中间结果

因此最终的数据结构是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type WorkerStatus int
const (
Free WorkerStatus = iota
Busy
Timeout
)

type Coordinator struct {
// Your definitions here.
mu sync.Mutex

MapTaskFinished bool
MapTaskRemain int // 还剩多少 map task 任务可以分配
ReduceTaskFinished bool
ReduceTaskRemain int // 还剩多少 reduce task 任务可以分配

Workers int
WS map[int] WorkerStatus // WorkerStatus 表示工人目前的状态,0-表示空闲,1-表示正在做任务,2-表示 coordinator 已经联系不到工人了

// map task
WorkerToMapTask map[int] string// worker i 正在做 文件 filename 的 map task
IntermediateFiles []string
RecordFiles map[string] bool // 用于记录哪些中间文件已经出现过了
MapTask map[string] int // map task 需要完成的文件还有哪些, 2 表示已经完成, 1 表示还未完成, 0 表示还未分配
MapTaskBaseFilename string

// reduce task
NReduce int
WorkerToReduceTask map[int] int// worker i 正在做 第 j 个 reduce task
ReduceTask map[int] int // reduce task 需要完成的任务还有哪些, 2 表示已经完成, 1 表示还未完成, 0 表示还未分配
ReduceTaskBaseFilename string

// crash
Timer map[int] int
}

Worker:

其实 Worker 的很多数据都是 Coordinator 给的,因此不需要特意为他设计。

它的处理流程的主要结构是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
for !IsDone() { // 只要 Coordinator 没结束
AskTask(); // 向 Coordinator 请求任务
if is_map_task() { // 如果是 map task
if has_task_to_do() {
do_map_task()
reply := ReportTask() // 做完任务后向 Coordinator 汇报结果
if is_good_job(reply) {
// 如果 Coordinator 认可我的工作
// 因为可能出现,Coordinator 以为我 down 了,把原来我的工作分配给其他人,那么我做的就是无用功了。。。
}
} else {
// 暂时没有任务可做,这种情况会在所有 map task 都被分配出去了,但是还没有都完成的情况下出现
}
} else { // 如果是 reduce task
if has_task_to_do() {
do_reduce_task()
reply := ReportTask() // 做完任务后向 Coordinator 汇报结果
if is_good_job(reply) {
// 如果 Coordinator 认可我的工作
// 因为可能出现,Coordinator 以为我 down 了,把原来我的工作分配给其他人,那么我做的就是无用功了。。。
}
} else {
// 暂时没有任务可做,这种情况会在所有 reduce task 都被分配出去了,但是还没有都完成的情况下出现
}
}
}

根据其主要流程就知道要设计三种 RPC 与 Coordinator 通信:

  • IsDone:coordinator 是否已经完成了所有任务

  • AskTask:请 coordinator 给我一个任务

  • ReportTask:向 coordinator 汇报我完成的任务


最终代码如下:

mr/rpc.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type ExampleArgs struct {
X int
}

type ExampleReply struct {
Y int
}

// Add your RPC definitions here.

// AskTask: 向 coordinator 请求任务
type AskTaskArgs struct {
WorkerId int // 当前 worker 的 id,刚开始没分配时为 nil
}

type AskTaskReply struct {
IsMapTask bool
IsReduceTask bool

// map task
Filename string // 需要做 map 的文件名字
MapTaskBaseFilename string // 把 intermediate key 放到 MapTaskBaseFilename-WokerId-X 文件中去
WorkerId int // coordinator 分配给当前 worker 的 id,只要它还活着,除了他自己以外就没人会占用这个 id

// reduce task
NReduce int // 总共需要多少个 reduce
ReduceTaskBaseFilename string // reduce 任务的 base filename
XReduce int // woker 要处理第 X 个 reduce 任务,并把输出放到 ReduceTaskBaseFilename-X 中去
AllFiles []string // 所有的中间文件名
}

// AskStatus: 询问 coordinator 当前的状态
type AskStatusArgs struct {
}

type AskStatusReply struct {
IsDone bool
}

// ReportTask: worker 完成了一个任务,向 coordinator 汇报该任务完成情况
type ReportTaskArgs struct {
WorkerId int

// map task
MapTaskFilename string
IntermediateFile []string // map 任务产生的中间文件

// reduce task
XReduce int // worker 做的是第 XReduce 个 reduce task
}

type ReportTaskReply struct {
GoodJob bool
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}

mr/coordinator.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package mr

import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import "sync"

type WorkerStatus int
const (
Free WorkerStatus = iota
Busy
Timeout
)

type Coordinator struct {
// Your definitions here.
mu sync.Mutex

MapTaskFinished bool
MapTaskRemain int // 还剩多少 map task 任务可以分配
ReduceTaskFinished bool
ReduceTaskRemain int // 还剩多少 reduce task 任务可以分配

Workers int
WS map[int] WorkerStatus // WorkerStatus 表示工人目前的状态,0-表示空闲,1-表示正在做任务,2-表示 coordinator 已经联系不到工人了

// map task
WorkerToMapTask map[int] string// worker i 正在做 文件 filename 的 map task
IntermediateFiles []string
RecordFiles map[string] bool // 用于记录哪些中间文件已经出现过了
MapTask map[string] int // map task 需要完成的文件还有哪些, 2 表示已经完成, 1 表示还未完成, 0 表示还未分配
MapTaskBaseFilename string

// reduce task
NReduce int
WorkerToReduceTask map[int] int// worker i 正在做 第 j 个 reduce task
ReduceTask map[int] int // reduce task 需要完成的任务还有哪些, 2 表示已经完成, 1 表示还未完成, 0 表示还未分配
ReduceTaskBaseFilename string

// crash
Timer map[int] int
}

// Your code here -- RPC handlers for the worker to call.

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil
}

func (c *Coordinator) IsDone(args *AskStatusArgs, reply *AskStatusReply) error {
// 由于 Done 是线程安全的,因此 IsDone 也是线程安全的
reply.IsDone = c.Done()
return nil
}
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()

if args.WorkerId == -1 {
args.WorkerId = c.Workers
c.Workers++
}
// TODO
// 分配任务
worker_id := args.WorkerId
reply.WorkerId = worker_id
reply.NReduce = c.NReduce
reply.XReduce = -1

if !c.MapTaskFinished {
reply.IsMapTask = true
for filename, val := range c.MapTask {
if val == 0 {
reply.Filename = filename
reply.MapTaskBaseFilename = c.MapTaskBaseFilename
c.MapTask[filename] = 1
c.WorkerToMapTask[worker_id] = filename
c.WS[worker_id] = Busy
c.Timer[worker_id] = 0
break
}
}
} else if !c.ReduceTaskFinished {
reply.IsReduceTask = true
for xreduce, val := range c.ReduceTask {
if val == 0 {
reply.XReduce = xreduce
reply.ReduceTaskBaseFilename = c.ReduceTaskBaseFilename
reply.AllFiles = c.IntermediateFiles
c.ReduceTask[xreduce] = 1
c.WorkerToReduceTask[worker_id] = xreduce
c.WS[worker_id] = Busy
c.Timer[worker_id] = 0
break
}
}
}
return nil
}

func (c *Coordinator) is_timeout(worker_id int) bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.WS[worker_id] == Timeout
}
func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
worker_id := args.WorkerId
// 如果超时了则不理他
if c.is_timeout(worker_id) {
reply.GoodJob = false
c.WS[worker_id] = Free
return nil
}

c.mu.Lock()
defer c.mu.Unlock()

if !c.MapTaskFinished {
if c.WorkerToMapTask[worker_id] == args.MapTaskFilename && c.WS[worker_id] == Busy {
reply.GoodJob = true
c.WS[worker_id] = Free

for _, intermediate_file := range args.IntermediateFile {
// 如果中间文件没有出现过,那么就把他加入 IntermediateFiles 中,并把他记录下了,用于去重
_, ok := c.RecordFiles[intermediate_file]
if !ok {
c.IntermediateFiles = append(c.IntermediateFiles, intermediate_file)
c.RecordFiles[intermediate_file] = true
}
}
c.MapTask[args.MapTaskFilename] = 2

c.MapTaskRemain--
if c.MapTaskRemain == 0 {
c.MapTaskFinished = true
}
return nil
}
} else if !c.ReduceTaskFinished{
if c.WorkerToReduceTask[worker_id] == args.XReduce && c.WS[worker_id] == Busy {
reply.GoodJob = true
c.WS[worker_id] = Free

c.ReduceTask[args.XReduce] = 2

c.ReduceTaskRemain--
if c.ReduceTaskRemain == 0 {
c.ReduceTaskFinished = true
}
return nil
}
} else {
// 所有任务都已经完成了
reply.GoodJob = false
}
// worker 向我汇报了,但他汇报的任务和我发布的不同或者他在 free 或 timeout 状态
// 但他既然向我汇报了,那么他一定是 Free 的
c.WS[worker_id] = Free

return nil
}
//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}

//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
ret := false

// Your code here.
c.mu.Lock()
defer c.mu.Unlock()
if c.MapTaskFinished && c.ReduceTaskFinished {
ret = true
}
for worker_id, _ := range c.Timer {
c.Timer[worker_id]++
if c.Timer[worker_id] >= 10 && c.WS[worker_id] == Busy {
c.WS[worker_id] = Timeout
if !c.MapTaskFinished {
map_task := c.WorkerToMapTask[worker_id]
c.WorkerToMapTask[worker_id] = ""
c.MapTask[map_task] = 0
} else if !c.ReduceTaskFinished {
reduce_task := c.WorkerToReduceTask[worker_id]
c.WorkerToReduceTask[worker_id] = -1
c.ReduceTask[reduce_task] = 0
}
}
}

return ret
}
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}

// Your code here.

c.MapTaskRemain = len(files)
c.ReduceTaskRemain = nReduce
c.NReduce = nReduce

c.MapTask = make(map[string]int)
c.ReduceTask = make(map[int]int)
c.WS = make(map[int] WorkerStatus)
c.WorkerToMapTask = make(map[int] string)
c.IntermediateFiles = []string{}
c.RecordFiles = make(map[string] bool)
c.WorkerToReduceTask = make(map[int] int)
c.Timer = make(map[int] int)

c.MapTaskBaseFilename = "mr"
c.ReduceTaskBaseFilename = "mr-out"

for _, file := range files {
c.MapTask[file] = 0
}

for idx := 0; idx < nReduce; idx++ {
c.ReduceTask[idx] = 0
}

c.server()
return &c
}

mr/worker.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
package mr

import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "os"
import "io/ioutil"
import "strconv"
import "strings"
import "sort"
import "encoding/json"
import "time"


//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}

func is_map_task(task AskTaskReply) bool {
return task.IsMapTask
}
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.

// uncomment to send the Example RPC to the coordinator.
// CallExample()
// 如果 mr 任务还没结束
var nreduce int
worker_id := -1
total_map := 0
total_reduce := 0
for !IsDone() {
// 向 coordinator 要任务
task := AskTask(worker_id)
worker_id = task.WorkerId
nreduce = task.NReduce
buckets := make([][]KeyValue, nreduce) // nreduce 个 kva
if is_map_task(task) {
filename := task.Filename
if filename != "" {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Can not open file %v ai", filename)
}
content, err := ioutil.ReadAll(file)
file.Close()
if err != nil {
log.Fatalf("Can not read file %v", filename)
}
kva := mapf(filename, string(content))
for _, item := range kva {
bucket_number := ihash(item.Key) % nreduce
buckets[bucket_number] = append(buckets[bucket_number], item);
}
// 对 buckets 中的 item 排序
for _, bucket := range buckets {
sort.Sort(ByKey(bucket))
}

intermediate_files := []string{}
basename := task.MapTaskBaseFilename + "-" + strconv.Itoa(worker_id)
for index, bucket := range buckets {
// TODO
// 创建一个临时文件,把 bucket 中的内容写入临时文件中,并在完成任务后通过 ReportTask 向 coordinator 汇报该任务,
// 当收到 coordinator 的确认后再把临时文件转正
oname := basename + "-" + strconv.Itoa(total_map) + "-" + strconv.Itoa(index)
intermediate_files = append(intermediate_files, oname)
ofile, _ := os.OpenFile(oname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
enc := json.NewEncoder(ofile)
for _, item := range bucket {
err := enc.Encode(&item)
if err != nil {
log.Fatalf("json encoding error!\n")
}
}
ofile.Close()
}
reply := ReportTask(worker_id, task.Filename, intermediate_files, task.XReduce)
if reply.GoodJob {
total_map++
}
} else {
// 暂时没任务,其他 worker 正在做 map task
time.Sleep(time.Second)
}
} else {
// reduce task
if task.XReduce != -1 {
intermediate := []KeyValue{}
for _, file := range task.AllFiles {
ss := strings.Split(file, "-")
sxreduce := ss[len(ss) - 1]
xreduce, _ := strconv.Atoi(sxreduce)
if xreduce == task.XReduce {
f, _ := os.Open(file)
dec := json.NewDecoder(f)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
f.Close()
}
}
sort.Sort(ByKey(intermediate))

// 输出到 ReduceTaskBaseFilename-X 去
oname := task.ReduceTaskBaseFilename + "-" + strconv.Itoa(task.XReduce)
ofile, err := os.OpenFile("tmp" + oname + ".tmp", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
log.Fatalf("Can not open %v\n", oname)
}
i := 0
for i < len(intermediate) {
j := i + 1
values := []string{intermediate[i].Value}
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
values = append(values, intermediate[j].Value)
j++
}
output := reducef(intermediate[i].Key, values)
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
ofile.Close()
reply := ReportTask(worker_id, task.Filename, nil, task.XReduce)
if reply.GoodJob {
total_reduce++
tmpfile, _ := os.OpenFile("tmp" + oname + ".tmp", os.O_RDONLY|os.O_CREATE|os.O_APPEND, 0666)
realfile, _ := os.OpenFile(oname, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
content, _ := ioutil.ReadAll(tmpfile)
realfile.Write(content)
realfile.Close()
tmpfile.Close()
}
} else {
// 暂时没有 reduce task 可做,其他 worker 正在做
time.Sleep(time.Second)
}
}
}
}
//
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

// declare an argument structure.
args := ExampleArgs{}

// fill in the argument(s).
args.X = 99

// declare a reply structure.
reply := ExampleReply{}

// send the RPC request, wait for the reply.
call("Coordinator.Example", &args, &reply)

// reply.Y should be 100.
fmt.Printf("reply.Y %v\n", reply.Y)
}

// coordinator 是否已经完成了所有任务
func IsDone() bool {
args := AskStatusArgs{}
reply := AskStatusReply{}
connect := call("Coordinator.IsDone", &args, &reply)
if !connect {
// coordinator 已经退出了,因为所有任务都已经完成了
// fmt.Printf("Coordinator is down!\n")
os.Exit(0)
}
return reply.IsDone
}

// 请 coordinator 给我一个任务
func AskTask(worker_id int) AskTaskReply {
args := AskTaskArgs{}
args.WorkerId = worker_id
reply := AskTaskReply{}
// why? 解除下面这条注释,就会出现问题。。。。迷惑
// reply.XReduce = -1 // 表示没有 reduce task 可做
connect := call("Coordinator.AskTask", &args, &reply)
if !connect {
// coordinator 已经退出了,因为所有任务都已经完成了
os.Exit(0)
}
return reply
}
// 向 coordinator 汇报我完成的任务
func ReportTask(worker_id int, filename string, intermediate_files []string, xreduce int) ReportTaskReply {
args := ReportTaskArgs{}
args.WorkerId = worker_id
args.MapTaskFilename = filename
args.IntermediateFile = intermediate_files
args.XReduce = xreduce

reply := ReportTaskReply{}

connect := call("Coordinator.ReportTask", &args, &reply)
if !connect {
// coordinator 已经退出了,因为所有任务都已经完成了
os.Exit(0)
}
return reply
}

//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()

err = c.Call(rpcname, args, reply)
if err == nil {
return true
}

fmt.Println(err)
return false
}