引言
在本实验中,您将构建一个 MapReduce 系统。您将实现一个工作进程,该进程调用应用程序的 Map 和 Reduce 函数,并处理文件的读取和写入;同时,您还将实现一个coordinator进程,该进程将任务分配给工作进程,并处理失败的工作进程。您将构建的系统类似于 MapReduce论文中的描述。(注意:本实验中使用“coordinator”一词,而不是论文中的“主节点”)。
开始
您需要设置 Go 环境以进行实验。
使用 git(一个版本控制系统)获取初始实验软件。要了解更多关于 git 的信息,请查看Pro Git Book或 git 用户手册。
$ git clone git://g.csail.mit.edu/6.5840-golabs-2024 6.5840
$ cd 6.5840
$ ls
Makefile src
我们为您提供了一个简单的顺序 MapReduce 实现,位于 src/main/mrsequential.go
。该实现一次运行一个 Map 和一个 Reduce,均在单个进程中执行。我们还为您提供了一些 MapReduce 应用程序:在 mrapps/wc.go
中的词频统计和在 mrapps/indexer.go
中的文本索引器。您可以按如下方式顺序运行词频统计:
$ cd ~/6.5840
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...
mrsequential.go
的输出保存在文件 mr-out-0
中。输入来自名为 pg-xxx.txt
的文本文件。
请随意借用 mrsequential.go
中的代码。您还应该查看 mrapps/wc.go
以了解 MapReduce 应用程序代码的样子。
在本实验和所有其他实验中,我们可能会对提供给您的代码进行更新。为了确保您能够获取这些更新并使用 git pull
轻松合并它们,最好将我们提供的代码保留在原始文件中。您可以按照实验说明对我们提供的代码进行修改;只是不要移动它。您可以在新文件中添加自己的新函数。
您的任务(中等/困难)
您的任务是实现一个分布式 MapReduce 系统,包括两个程序:coordinator和worker。只有一个coordinator进程,和一个或多个并行执行的worker进程。在实际系统中,worker将运行在不同的机器上,但在本实验中,您将在单台机器上运行所有worker。worker将通过 RPC 与coordinator进行通信。每个worker进程将循环执行以下步骤:请求coordinator分配任务,从一个或多个文件中读取任务输入,执行任务,将任务输出写入一个或多个文件,然后再次请求coordinator分配新任务。coordinator应注意到如果某个worker在合理的时间内(在本实验中为十秒)未完成其任务,则将相同的任务分配给其他worker。
我们为您提供了一些初始代码。coordinator和worker的“main”例程位于 main/mrcoordinator.go
和 main/mrworker.go
中;请不要更改这些文件。您应将实现代码放入 mr/coordinator.go
、mr/worker.go
和 mr/rpc.go
文件中。
如何运行代码
以下是如何在词频统计 MapReduce 应用程序上运行您的代码的步骤。首先,确保词频统计插件是最新构建的:
$ go build -buildmode=plugin ../mrapps/wc.go
在主目录中,运行coordinator:
$ rm mr-out*
$ go run mrcoordinator.go pg-*.txt
mrcoordinator.go
的 pg-*.txt
参数是输入文件;每个文件对应一个“拆分”,是一个 Map 任务的输入。
在一个或多个其他窗口中,运行worker:
$ go run mrworker.go wc.so
当worker和coordinator完成后,查看输出文件 mr-out-*
。当您完成实验后,输出文件的排序并合并内容应与顺序输出匹配,如下所示:
$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8
...
我们在 main/test-mr.sh
中提供了一个测试脚本。测试检查词频统计和索引器 MapReduce 应用程序在给定 pg-xxx.txt
文件作为输入时是否产生正确的输出。测试还检查您的实现是否以并行方式运行 Map 和 Reduce 任务,以及您的实现是否能够从运行任务时崩溃的worker中恢复。
如果您现在运行测试脚本,它会挂起,因为coordinator从未完成:
$ cd ~/6.5840/src/main
$ bash test-mr.sh
*** Starting wc test.
您可以将 ret := false
更改为 true
,在 mr/coordinator.go
的 Done
函数中,这样coordinator会立即退出。然后:
$ bash test-mr.sh
*** Starting wc test.
sort: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
测试脚本期望看到名为 mr-out-X
的文件输出,每个文件对应一个 Reduce 任务。mr/coordinator.go
和 mr/worker.go
的空实现不会生成这些文件(或执行其他操作),因此测试失败。
当您完成时,测试脚本的输出应如下所示:
$ bash test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
您可能会看到一些来自 Go RPC 包的错误消息,比如:
忽略这些消息;注册coordinator作为 RPC 服务器会检查其所有方法是否适合 RPC(具有 3 个输入参数);我们知道 Done
方法不是通过 RPC 调用的。
此外,根据您终止工作进程的策略,您可能会看到一些形式的错误:
在测试结束后看到这些消息是正常的;它们是在worker无法在coordinator退出后联系到 RPC 服务器时产生的。
几条规则:
- 映射阶段:映射阶段应将中间键分成多个桶,供 nReduce 减缩任务使用,其中 nReduce 是减缩任务的数量,也就是 main/mrcoordinator.go 传递给 MakeCoordinator() 的参数。 每个映射器应创建 nReduce 中间文件,供还原任务使用。
- worker实现:Worker 实现应将第 X 个 reduce 任务的输出放到 mr-out-X 文件中。mr-out-X 文件应包含每个 Reduce 函数输出的一行。 这一行应该以 Go 的"%v %v "格式生成,并以键和值调用。 请查看 main/mrsequential.go 中注释为 "this is the correct format"的一行。 如果您的实现与此格式偏差过大,测试脚本就会失败。
- 可修改的文件:您可以修改 mr/worker.go、mr/coordinator.go 和 mr/rpc.go。您可以暂时修改其他文件进行测试,但确保您的代码在原始版本中也能正常工作;我们将使用原始版本进行测试。
- 中间输出文件:Worker应将中间 Map 输出放入当前目录的文件中,以便后续将其作为输入供 Reduce 任务使用。
- 完成标志:main/mrcoordinator.go 期望 mr/coordinator.go 实现一个 Done() 方法,当 MapReduce 作业完全完成时返回 true;此时,mrcoordinator.go 将退出。
- worker退出:当作业完全结束时,Worker 进程应退出。实现这一点的简单方法是使用 call() 的返回值:如果 Worker 无法联系到 Coordinator,则可以假设 Coordinator 已退出,因为作业已完成,因此 Worker 也可以终止。根据您的设计,您可能还会发现有一个“请退出”的伪任务是有帮助的,Coordinator可以将其分配给 Worker。
提示:
- 指南页面提供了一些开发和调试的提示。
- 一个启动的方式是修改 mr/worker.go 的 Worker() 函数,向 Coordinator 发送 RPC 请求任务。然后修改 Coordinator 以响应尚未开始的 Map 任务的文件名。接着修改 worker 以读取该文件并调用应用的 Map 函数,方式类似于 mrsequential.go。
- 应用的 Map 和 Reduce 函数在运行时使用 Go 插件包加载文件名以 .so 结尾的文件。
- 如果您在 mr/ 目录中更改任何内容,您可能需要重构所有您使用的 MapReduce 插件,例如:
go build -buildmode=plugin ../mrapps/wc.go
。 - 本实验依赖于 Worker 共享文件系统。当所有 Worker 在同一台机器上运行时,这很简单,但如果 Worker 在不同机器上运行,则需要全局文件系统,如 GFS。
- 中间文件的合理命名约定是 mr-X-Y,其中 X 是 Map 任务编号,Y 是减少任务编号。
Worker的 Map 任务代码需要在文件中储存中间键/值对, 让 Reduce 任务可以正确读取。一种可行的方式是使用 Go 的 encoding/json 包。可以使用以下方式将键/值对以 JSON 格式写入打开的文件:
enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv)
并以此方式读取该文件:
dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
- Worker的映射部分可以使用 ihash(key) 函数(在 worker.go 中)来选择给定键的 Reduce 任务。
- 您可以从 mrsequential.go 中复制一些代码,用于读取 Map 输入文件、在 Map 和 Reduce 之间对中间键/值对进行排序,以及将 Reduce 输出存储到文件中。
- coordinator作为一个 RPC 服务器将是并发的;不要忘记锁定共享数据。
- 使用 Go 的竞争检测器,命令为
go run -race
。test-mr.sh 的开头有一条注释,告诉您如何使用 -race 运行它。当我们对您的实验评分时,不会使用竞争检测器。然而,如果您的代码存在竞争条件,那么在我们不使用竞争检测器的情况下测试时,您的代码很可能会失败。 - Worker 有时需要等待,例如,直到最后一个映射完成前,Reduce 任务是无法开始的。一个可行的方案是 Worker 定期向 Coordinator 请求工作,在每个请求之间使用
time.Sleep()
进行休眠。另一种可行方案是 Coordinator 中相关的 RPC 处理程序有一个循环等待,使用time.Sleep()
或sync.Cond
。Go 为每个 RPC 在自己的线程中运行处理程序,因此某个处理程序在等待并不妨碍 Coordinator 处理其他 RPC。 - Coordinator无法可靠地区分崩溃的 Worker、由于某种原因停滞的存活 Worker,以及执行但速度过慢以至于无用的 Worker。您能做的最好的事情是让 Coordinator等待一段时间,然后放弃并将任务重新分配给其他 Worker。在本实验中,让 Coordinator等待十秒;之后 Coordinator应假设 Worker已死亡(当然,它也可能没有)。
- 如果您选择实现备份任务(第3.6节),请注意我们测试您的代码在 Worker 执行任务而不崩溃时不会调度多余的任务。备份任务应仅在相对较长的时间段后调度(例如,10秒)。
- 要测试崩溃恢复,您可以使用 mrapps/crash.go 应用插件。它会在 Map 和 Reduce 函数中随机退出。
- 为了确保没有人观察到在崩溃情况下部分写入的文件,MapReduce 论文提到了一种使用临时文件的技巧,并在完全写入后原子性地重命名。您可以使用
ioutil.TempFile
(如果您运行的是 Go 1.17 或更高版本,则使用os.CreateTemp
)创建临时文件,并使用os.Rename
原子性地重命名它。 - test-mr.sh 在子目录 mr-tmp 中运行所有进程,因此如果出现问题并且您想查看中间或输出文件,请查看那里。随意暂时修改 test-mr.sh,在失败的测试后退出,以便脚本不继续测试(并覆盖输出文件)。
- test-mr-many.sh 多次运行 test-mr.sh,您可能希望这样做以发现低概率的错误。它接受一个参数,即运行测试的次数。您不应并行运行多个 test-mr.sh 实例,因为协调者将重用相同的套接字,导致冲突。
- Go RPC 仅发送名称以大写字母开头的结构字段。子结构也必须具有大写字段名称。
当调用 RPC 的 call() 函数时,回复结构应包含所有默认值。RPC 调用应如下所示:
reply := SomeType{} call(..., &reply)
在调用之前不应设置 reply 的任何字段。如果您传递的回复结构具有非默认字段,则 RPC 系统可能会默默返回错误值。
评论 (0)