MIT 分布式课程学习笔记 Map Reduce paper 地址
Paper 的标题为 – MapReduce: Simplified Data Processing on Large Clusters。顾名思义,大集群下的简单数据处理方式。
Map Reduce 是一种编程模型,也是一种与之关联的、用于处理和产生大数据集的实现。用户明确一个map函数去处理 key/value 对,并产生 key/value 对的集合,以及一个 reduce 函数去合并有着相同 key 的所有中间状态 key/value 对。本文指出,许多实际的任务都可以用这种模型来表示。
通过这种模式来构建的程序自动地被赋予了在大集群下并行执行的能力。运行时程序将会很好地切分输入数据,在一组机器上调度执行程序,处理机器错误,管理机器间的通信。这允许即便程序员没有并行和分布式经验,也可以很简单地在大型分布式系统中利用资源。
我们将 Map Reduce 的实现运行在大型的集群上,而且其具有高度的可伸缩性:一个典型的 Map Reduce 将要在数千台机器上处理 TB 级别的数据。程序员可以很轻易地使用这一系统:目前数以百计的 Map Reduce 被实现,每天有数以千计的 Map Reduce 在 Google 的集群上执行任务。
在过去的五年中,作者和其他谷歌员工实现了许多的特殊用途的计算程序,用以处理大量原始数据(如爬虫抓取的文档、网络请求日志等),计算多种类目的派生数据(如倒排索引、多种以图结构表示的网络文档、单台主机抓取页面数量的概要、指定日期频次最高的请求集合等)。大多数计算过程都是直截了当的。尽管,大多数时候输入数据是很巨大的,而且计算过程是分布式的,需要跨越成千上百台机器,才能保证在一个合理的时间内结束过程。而为了处理计算并行化、数据分发和错误处理等问题而引入大量复杂的代码则令原本简单的计算过程变的晦涩难懂。
作为对这种复杂性的回应,我们设计了一种新的抽象方式,此方式允许我们表达出原本简单的计算过程,且将涉及并行、容错性、数据分发和负载均衡的杂乱细节隐藏在一个函数库中。我们的抽象收到了 Lisp 等函数式语言中 map 和 reduce 等启发。我们意识到我们大多数的计算都包含了在每个输入的逻辑“记录”上应用 map 操作,从而计算出一组中间 key/value 对的集合,然后再向共享同一个 key 的所有中间 value 应用 reduce 操作,从而适当地合并衍生数据。我们对用户定义的 map 和 reduce 操作的使用允许我们轻松地将大型计算并行化,以及将再执行作为容错性的主要机制。
本项工作的核心贡献是提供了一个简单且强大的接口,允许自动实现并行和大规模的分布式计算,以及一个与此接口结合的实现,能在普调的 PC 机集群上达到很高的性能。
第 2 部分描述了基本的编程模型并给出了几个例子。第 3 部分描述了针对我们基于集群的计算环境而裁剪的 Map Reduce 接口的一个实现。第 4 部分描述了我们觉得非常有用的一些编程模型的技巧。第 5 部分针对多种不同的任务,对我们的实现进行了性能测量。第 6 部分探索了在 Google 中 Map Reduce 的应用,包括了我们在将其作为生产索引系统的重写基础的经验。第7部分讨论了相关的和未来要做的工作。
计算过程就是输入一组 key/value 对,再生成输出一组 key/value 对。Map Reduce 库的使用者用两个函数来表示这个过程:map 和 reduce。
map 由使用者编写,使用一个输入 key/value 对,生成一组中间 key/value 对。Map Reduce 库将有着相同中间 key I 的中间 value 都组合在一起,再传给 reduce 函数。
reduce 也由使用者编写,它接受一个中间 key I 和一组与I对应的 value。它将这些 value 合并为一个可能更小的 value 集合。通常每个reduce 调用只产生 0 或 1 个输出 value。中间 value 是通过一个迭代器提供给 reduce 函数的。这允许我们操作那些因为大到找不到连续存放的内存而使用链表的 value 集合。
考虑一个问题:统计一个很大的文档集合中每个单词出现的次数。使用者能写出与下面的伪代码相似的代码:
map(String key, String value):
// key: document name 文档名称
// value: document contents 文档内容
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, String value):
// key: a word 一个单词
// value: a list of counts 计数列表
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map 函数将每个单词与出现次数一同输出(本例中简单的输出“1”)。reduce 函数将针对某个特定词输出的次数都合并相加。
另外,使用者要写代码填充一个符合 Map Reduce 规格的对象,内容包括输入和输出文件的名字,以及可选的调节参数。之后使用者调用 Map Reduce 函数,将指定的对象传进去。用户代码会与 Map Reduce 库链接到一起。附录A包括了这个例子的全部程序文本。
尽管前面的伪代码写成了用字符串进行输入输出,但从概念上讲用户提供的 map 和 reduce 函数是关联着类型的。
func | in | out |
---|---|---|
Map | (K1, k2) | list(k2, v2) |
Reduce | (k2, list(v2)) | list(v2) |
也就是说,输入的 key 和 value 与输出的 key 和 value 的域不同。进一步说,中间的 key 和 value 与输出的 key 和 value 的域相同。
我们的 C++ 实现使用字符串与用户定义的函数交互,而将字符串与相应类型的转换留给用户代码完成。
这里例举了一些有趣的程序,它们都可以很轻松的用MapReduce模型表达。
**分布式 Grep:**map 函数在匹配到给定的 pattern 时输出一行。reduce 函数只是将给定的中间数据复制到输出上。
**URL 访问频次统计:**map函数处理网页请求的日志,对每个URL输出〈URL, 1〉。reduce 函数将相同URL的所有值相加并输出〈URL, 总次数〉对。
**倒转 Web 链接图:**map 函数在 source 页面中针对每个指向 target 的链接都输出一个〈target, source〉对。reduce 函数将与某个给定的targe t相关联的所有 source 链接合并为一个列表,并输出〈target, list(source)〉对。
**每个主机的关键词向量:**关键词向量是对出现在一个文档或一组文档中的最重要的单词的概要,其形式为〈单词, 频率〉对。map 函数针对每个输入文档(其主机名可从文档URL中提取到)输出一个〈主机名, 关键词向量〉对。给定主机的所有文档的关键词向量都被传递给 reduce 函数。reduce 函数将这些关键词向量相加,去掉其中频率最低的关键词,然后输出最终的〈主机名, 关键词向量〉对。
**倒排索引:**map 函数解析每个文档,并输出一系列〈单词, 文档 ID〉对。reduce 函数接受给定单词的所有中间对,将它们按文档 ID 排序,再输出〈单词, list(文档 ID)〉对。所有输出对的集合组成了一个简单的倒排索引。用户可以很轻松的扩展这个过程来跟踪单词的位置。
**分布式排序:**map 函数从每条记录中提取出 key,并输出〈key, 记录〉对。reduce 函数不改变这些中间对,直接输出。这个过程依赖于 4.1 节介绍的划分机制和 4.2 节介绍的排序性质。
许多不同的 Map Reduce 的实现都是可行的。选择哪一个要取决于环境。例如,一种实现可能适合于小型的共享内存机器,一种实现可能适合于大型的 NUMA 多处理器机器,而另一种则适合于更大型的联网机器集。
本部分描述的实现主要面向 Google 内部广泛使用的计算环境:大型的商用PC机集群,互相之间用交换式以太网连接。我们的环境是:
- 主要使用的机器为双核 X86 处理器,运行 Linux 系统,每台机器的内存从 2GB 到 4GB 不等。
- 使用的都是商用网络硬件设备——在机器层面上通常从 100Mbps 到 1Gbps 不等,但平均起来要比总带宽的一半少很多。
- 集群中拥有数百或数千台机器,因此机器错误经常出现。
- 每台机器都使用廉价的 IDE 硬盘来提供存储功能。我们使用一种内部开发的分布式文件系统来管理这些磁盘上的数据。这个文件系统通过复制的方法在不可靠的硬件之上提供了实用性与可靠性。
- 用户向一个调度系统提交作业。每个作业包括了一个任务集,会由调度器调度到集群内可用的一组机器上。
通过自动将输入数据切分为 M 块,map 调用分布在多台机器上进行。输入划分可以在不同的机器上并行执行。reduce 调用是通过一个划分函数(例如hash(key) mod R)将中间 key 空间划分为 R 块来分布运行。划分的块数R和划分函数都由用户指定。
图 1 展示了我们的实现中 Map Reduce 操作的整体流程。当用户程序调用 Map Reduce 函数时,会发生下面一系列动作:
用户程序中的MapReduce库首先将输入文件切分为M块,每块的大小从16MB到64MB(用户可通过一个可选参数控制此大小)。然后MapReduce库会在一个集群的若干台机器上启动程序的多个副本。
- 程序的各个副本中有一个是特殊的——主节点,其它的则是工作节点。主节点将 M 个 map 任务和 R 个 reduce 任务分配给空闲的工作节点,每个节点一项任务。
- 被分配 map 任务的工作节点读取对应的输入区块内容。它从输入数据中解析出 key/value 对,然后将每个对传递给用户定义的 map 函数。由 map 函数产生的中间 key/value 对都缓存在内存中。
- 缓存的数据对会被周期性的由划分函数分成 R 块,并写入本地磁盘中。这些缓存对在本地磁盘中的位置会被传回给主节点,主节点负责将这些位置再传给 reduce 工作节点。
- 当一个 reduce 工作节点得到了主节点的这些位置通知后,它使用 RPC 调用去读 map 工作节点的本地磁盘中的缓存数据。当 reduce 工作节点读取完了所有的中间数据,它会将这些数据按中间 key 排序,这样相同 key 的数据就被排列在一起了。同一个 reduce 任务经常会分到有着不同 key 的数据,因此这个排序很有必要。如果中间数据数量过多,不能全部载入内存,则会使用外部排序。
- reduce 工作节点遍历排序好的中间数据,并将遇到的每个中间 key 和与它关联的一组中间 value 传递给用户的 reduce 函数。reduce 函数的输出会写到由 reduce 划分过程划分出来的最终输出文件的末尾。
- 当所有的 map 和 reduce 任务都完成后,主节点唤醒用户程序。此时,用户程序中的 Map Reduce 调用返回到用户代码中。
成功完成后,Map Reduce 执行的输出都在 R 个输出文件中(每个 reduce 任务产生一个,文件名由用户指定)。通常用户不需要合并这R 个输出文件——他们经常会把这些文件当作另一个 Map Reduce 调用的输入,或是用于另一个可以处理分成多个文件输入的分布式应用。
主节点维持多种数据结构。它会存储每个 map 和 reduce 任务的状态(空闲、处理中、完成),和每台工作机器的ID(对应非空闲的任务)。
主节点是将 map 任务产生的中间文件的位置传递给 reduce 任务的通道。因此,主节点要存储每个已完成的 map 任务产生的 R 个中间文件的位置和大小。位置和大小信息的更新情况会在 map 任务完成时接收到。这些信息会被逐步发送到正在处理中的 reduce 任务节点处。
既然 Map Reduce 库是为了帮助使用成百上千台机器处理数量非常大的数据的,它就必须能够优雅地承受机器错误。
工作节点错误
主节点周期性的 ping 每个工作节点。如果工作节点在一定时间内没有回应,主节点就将它标记为已失败。这个工作节点完成的任何 map 任务都被重置为空闲状态,并可被调度到其它工作节点上。同样地,失败的工作节点上正在处理的任何 map 或 reduce 任务也被重置为空闲状态,允许被调度。
失败节点上已完成的 map 任务需要重执行的原因是它们的输出存储在失败机器的本地磁盘上,因此无法访问到了。已完成的 reduce 任务不需要重执行,因为它们的输出存储在了一个全球文件系统上。
当一个 map 任务先被A节点执行过,随后又被 B 节点重执行(A节点已失败),所有执行 reduce 任务的工作节点都能收到重执行的通知。任何没有读取完 A 节点数据的 reduce 任务都会从 B 节点读取数据。
Map Reduce 可以弹性应对大范围的工作节点失败。例如,在一次 Map Reduce 操作期间,运行系统上的网络维护导致了一组约 80 台机器在同一时间无法访问,持续了数分钟。Map Reduce 主节点只是简单的重执行了已由无法访问的机器完成的任务,并继续向前执行,最终完成了这次 Map Reduce 操作。
主节点错误
一种简单的方法是令主节点定期将上面描述的数据结构保存为恢复点。如果主节点任务失败,就可以从上一个恢复点状态启动一个新的程序副本。但是给定的条件是只有一个主节点,它也不太可能失败;因此我们当前的实现会在主节点失败时中止 Map Reduce 计算 。客户可以检查到这一情况,并在他们需要时重启 Map Reduce 操作。
出现故障时的语义
当用户提供的 map 和 reduce 操作对于它们输入的值都是确定性的,我们的分布式实现产生的输出值就如同将整个程序分成一个不间断的串行执行过程一样。
为了实现这个性质,我们依赖于 map 和 reduce 任务输出结果的提交是原子的。每个处理中的任务都会将它的输出写入私有的临时文件中。一个 reduce 任务产生一个这样的文件,而一个 map 任务则产生 R 个这样的文件(每个 reduce 任务一个)。当 map 任务完成时,工作节点发送给主节点的消息中带有 R 个临时文件的名字。如果主节点收到了一个来自已完成节点的完成消息,它就会忽略这个消息。否则,主节点会将 R 个文件的名字记录在相应的数据结构中。
当 reduce 任务完成时,工作节点会执行原子性的更名操作,将临时输出文件更名为最终输出文件。如果相同的 reduce 任务在多个机器上执行,就会有多个更名调用应用在相同的最终输出文件上。我们依赖于由底层文件系统提供的原子更名操作,才能保证最终的文件系统中只包含由其中一个 reduce 执行产生的数据。
我们的绝大多数 map 和 reduce 操作都是确定性的,这种情况下我们的语义和一个串行执行过程是等同的,这也使程序员很容易推出他们程序的行为。当 map 和 reduce 操作有不确定性时,我们提供较弱但仍然合理的语义。当存在不确定的操作时,某个 reduce 任务 R1 的输出等价于一个不确定程序的串行执行输出。但某个reduce任务R2的输出可能符合这个不确定程序的另一个串行执行输出。
考虑 map 任务 M 和 reduce 任务 R1、R2。令e(Ri)为Ri的已提交的执行结果(只执行一次)。此时弱语义生效,因为 e(R1) 可能读取了 M 的一次输出,而 e(R2) 则可能读取了 M 的另一次输出。
在我们的计算环境中,网络带宽是一种比较稀缺的资源。我们利用下面的事实来节省带宽:输入数据(由 GFS 管理)就存储在组成集群的机器的本地磁盘上。GFS 将每个文件分成 64MB 大小的区块,每块复制若干份(通常为 3 份)存储到不同的机器上。Map Reduce 主节点会把输入文件的位置信息考虑进去,并尝试将 map 任务分配到保存有相应输入数据的机器上。如果失败的话,它会试图将 map 任务调度到临近这些数据的机器上(例如与保存输入数据的机器处于同一网关的工作节点)。当在一个集群的相当一部分节点上运行 Map Reduce 操作时,大多数输入数据都是本地读取,并不消耗网络带宽。
如上所述,我们将 map 阶段分成 M 份,将 reduce 阶段分成 R 份。理想情况下,M 和 R 应该比工作节点机器的数量大很多。每个工作节点处理很多不同的任务,可以增强动态负责均衡能力,也能加速有工作节点失败时的恢复情况:失败节点已经完成的 map 任务有很多的时候也能传递给其它所有工作节点来完成。
在我们的实现中 M 和 R 的数量有一个实际的上限:如上所述,主节点必须做 O(M+R) 的调度决定以及在内存中保持 O(MR) 个状态。(但是内存使用的常数项很小:O(MR) 个状态中每个 map/reduce 任务对只需要差不多1字节数据。)
进一步分析,R通常由用户指定,因为每个reduce任务都会产生一个独立的输出文件。在实践中我们倾向于这样选择M,即可以将每个单独的任务分成 16-64MB 大的输入数据(此时上面所说的局部性优化效果最好),同时我们令R为待使用的工作节点数量较小的整数倍。我们经常使用 M=200000,R=5000,使用 2000 台机器来运行 Map Reduce 计算。
导致MapReduce操作用时延长的一个常见原因是出现了“落后者”:某台机器在完成最后的一个map或reduce任务时花费了反常的漫长时间。很多原因都会导致落后者的产生。例如,一台磁盘损坏的机器可能会遭遇频繁的可校正错误,导致它的读取性能从30MB/s降至1MB/s。集群调度系统可能已经调度了其它任务到这台机器,导致它在执行MapReduce代码时因为CPU、内存、本地磁盘或网络带宽的竞争而更加缓慢。最近我们遇到的一个问题是机器的初始化代码有一个bug,导致处理器缓存被禁用:受影响的机器上的计算速度下降了超过100倍。
我们有一个通用的机制来减轻落后者问题。当MapReduce操作接近完成时,主节点会将仍在处理中的剩余任务调度给其它机器作备用执行。原本的执行和备用执行中的任一个完成时都会将对应任务标记为已完成。我们已经调整过这个机制,使它因这个机制而增加的计算资源消耗通常只有一点点。我们已经观察到这一机制有效地减少了大型MapReduce操作花费的时间。例如,5.3节中的排序程序在禁用这一机制时要多花费44%的时间。
尽管仅仅用map和reduce函数提供的基本功能就足够解决大多数需求了,我们还是发现了一些很有用的扩展。这些扩展将在本节进行描述。
MapReduce的用户指定他们想要的reduce任务/输出文件的数量。通过划分函数可以将数据按中间key划分给各个reduce任务。我们默认提供了散列函数当作默认的划分函数(例如,hash(key) mod R)。通常这就能得出很平衡的划分结果了。但在有些情况下,用key的其它信息来划分数据也很有帮助。例如有时输出的key都是URL,而我们想让所有来自同一主机的项最后都在同一个输出文件中。为了支持类似这样的情况,MapReduce的用户可以提供一个特殊的划分函数。例如,使用“hash(主机名(urlkey)) mod R”来解决上面的问题。
我们保证在给定的划分中,中间key/value对是按增序排列的。这个顺序保证使每个划分产生一个有序的输出文件变得很容易,当输出文件的格式需要支持高效的按key随机访问,或用户需要输出数据有序时,这一性质会非常有用。
某些情况中,不同的map任务产生的中间key重复率非常高,而且用户指定的reduce函数可进行交换组合。一个典型的例子就是2.1节中的单词统计。单词频率符合齐夫分布(百度百科),因此每个map任务都会产生非常多的<the, 1>这样的记录。所有这些记录都会通过网络被发送给一个reduce任务,然后再通过reduce函数将它们相加,产生结果。我们允许用户指定一个可选的合并函数,在数据被发送之前进行局部合并。
合并函数由每个执行map任务的机器调用。通常合并函数与reduce函数的实现代码是相同的。它们唯一的区别就是MapReduce库处理函数输出的方式。reduce函数的输出会写到最终的输出文件中,而合并函数的输出会写到中间文件中,并在随后发送给一个reduce任务。
MapReduce库支持多种不同格式输入数据的读取。例如,“text”模式将每行输入当作一个key/value对:key是文件中的偏移,而value则是该行的内容。另一种支持的常见格式将key/value对按key排序后连续存储在一起。每种输入类型的实现都知道如何将它本身分成有意义的区间从而能分成独立的map任务进行处理(例如text模式的区间划分保证了划分点只出现在行边界处)。用户也可以通过实现一个简单的reader接口来提供对新的输入类型的支持,尽管大多数用户只是使用为数不多的几种预定义输入类型中的一种。
reader的实现不一定要提供从文件中读数据的功能。例如,可以很容易的定义一个从数据库读记录的reader,或是从映射在内存中的某种数据结构中。
类似的,我们也支持一组输出类型来产生不同格式的数据,而用户提供代码去支持新的输出类型也不难。
有些情况下,MapReduce的用户发现从他们的map或reduce操作中产生一些额外的辅助文件很有帮助。我们依赖于应用作者来确保这样的边界效应是原子且幂等的。通常应用会写一个临时文件,并在文件生成完毕时将其原子的更名。
我们不提供在单一任务产生的多个输出文件中原子的两段提交。因此,如果一个任务产生多个输出文件,且要求有跨文件的一致性,它必须是确定性的。这个限制在实践中还没有引起过问题。
有时用户代码中的bug会导致map或reduce函数一遇到特定的记录就崩溃。这些bug会导致MapReduce操作无法完成。通常的应用措施是修复这些bug,但有时难以实现:也许bug存在于得不到源代码的第三方库中。同样地,有时忽略一些记录是可以接受的,例如在一个大数据集上做统计分析时。我们提供了一个可选的执行模式,在MapReduce库检测到确定会导致崩溃的记录时路过它们从而继续进度。
每个工作进程都要安装一个信号处理程序来捕捉违规操作和总线错误。在调用用户的map或reduce操作前,MapReduce库将用于验证的序列号保存在全局变量中。如果用户代码产生了信号,信号处理程序就将一个包含这个序列号的“最后一步”UDP包发送给MapReduce主节点。当主节点在某个记录上发现了超过一个错误时,就表明下一次重执行相应的map或reduce任务时要跳过这个记录。
map或reduce函数的调试问题常常令人难以捉摸,因为实际的计算过程都发生在分布式系统上,经常包含数千台机器,而工作分配决策都由主节点动态产生。为了帮助调试、性能分析、小范围测试,我们开发了MapReduce库的一个替代实现,可以将MapReduce操作的全部工作在一台本地机器上顺序执行。用户拥有控制权,因此计算可以被限制在特定的map任务中。用户调用他们的程序时加上一个特殊标志,就可以方便的使用任何有用的调试或测试工具。
主节点内置了一个HTTP服务器,可以将当前状态输出为一组网页供用户使用。状态网页能显示计算的进度,例如有多少任务被完成,多少正在处理,输入数据大小,中间数据大小,输出数据大小,处理速度,等等。这些网页还包含指向每个任务的stdout和stderr输出文件的链接。用户可以用这些数据来预测计算要花费多长时间,以及是否应该增加计算使用的资源。这些网页也能用于在计算速率比预期慢很多时发现这一情况。
另外,顶层的状态网页还能显示哪些工作节点失败了,它们失败时正在处理哪些map和reduce任务。当要在用户代码中确定bug时这些时间非常有用。
MapReduce库提供了计数器机制,可以统计多种事件的发生次数。例如,用户代码可能想统计已处理的单词总数,或被索引的德语文献的数量等。
为了使用这一机制,用户代码需要创建一个有名的计数器对象,并在map和reduce函数的适当位置增加它的值。例如:
Counter *uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
工作节点上的计数器值会定期发送给主节点(附在ping的回应里)。当MapReduce操作完成时,主节点会将运行成功的map和reduce任务发来的计数器值合并后返回给用户代码。当前的计数器值也会显示在主节点的状态网页上,其它人可以看到实时的计算进度。在合并计数器值时,主节点会忽略同一个map或reduce任务的重复的结果,从而避免多次叠加。(重复执行可能发生在我们的备用任务和失败节点重执行中。)
有些计数值是由MapReduce库自动维护的,例如输入key/value对已处理的数量和输出key/value对产生的数量等。
用户们观察到这一机制在对MapReduce操作的智能检查上很有帮助。例如,在一些MapReduce操作中,用户代码可能想要确认产生的输出对的数量恰好与输入对的数量相等,或是已处理的德语文献占处理文献总数的比例是在接受范围内的。
本节中我们会在一个大型集群上测量MapReduce在两个计算上的性能。一个计算是在差不多1TB的数据中查找指定的模式。另一个计算则是排序1TB的数据。
这两个程序能代表大量真实的MapReduce程序——一类程序是将数据从一种表示形式转换为另一种,另一类程序则是从很大数量的数据集中提取出一小部分感兴趣的数据。
所有的程序都运行在由1800台机器组成的集群上。每台机器配有:2GHz的Intel Xeon处理器且支持超线程,4GB内存,2块160GB的IDE硬盘,和1GB以太网连接。所有机器都安排在2层树结构的网关内,根节点的可用带宽加起来有100-200Gbps。所有的机器都处于相同的托管设施下,因此任意两台机器间的往返通信时间都少于1ms。
除了4GB的内存,还有1-1.5GB的内存保留给了集群上运行的其它任务。程序运行的时间是一个周末的下午,此时大多数CPU、磁盘和网络资源都空闲中。
grep程序要扫描1010个长度为100字节的记录来寻找一个相当罕见的三字符模式(这个模式出现于92337个记录中)。输入被切分成64MB大小的部分(M = 15000),整个输出为一个文件(R = 1)。
图2显示了随时间变化的计算进度。Y轴是输入数据扫描的速率。这个速率逐渐提升代表更多的机器被分配给MapReduce计算,并在分配的机器数达到1764台时超过了30GB/s。随着map任务的结束,这个速率开始下降,并在计算开始后80s时降至0。整个计算过程共花费约150s。这包括了大约1分钟的启动开销。启动开销是由于要把程序传播到所有工作机器上,还包括GFS要打开1000个输入文件和获取局部优化所需信息而导致的延迟。
排序程序要对10100个100字节的记录进行排序(差不多1TB数据)。这个程序模仿了TeraSort测试程序。
排序程序只包括不到50行的用户代码。map函数一共3行,它从一个文本行中提取出10字节的排序key,再将key和原始文本行输出为中间key/value对。我们使用了一个内置的Identity函数作为reduce函数。这个函数会将未更改的中间key/value输出为结果。最终的排序后输出被出到一组2路复制的GFS文件(例如,这个程序的输出要写2TB的数据)。
如前所述,输入数据被分成若干个64MB大小的部分(M = 15000)。我们将已排序的输出分成4000个文件(R = 4000)。划分函数使用key的首字节来将它分到R个文件中的一个。
此次测试中我们的划分函数使用了key分布的内建知识。在一个一般的排序程序中,我们会预先加一轮MapReduce操作,收集key的样本,并使用key抽样的分布来计算最终输出文件的划分点。
图3(a)是排序程序的正常执行过程。上边的图显示了输入读取的速度。输入速度的峰值为13GB/s,并在200秒后所有的map任务都完成时迅速下降至0。可以注意到这个速度要比grep的速度小。这是因为排序的map任务要在向本地磁盘写中间文件上花费大约一半的时间和I/O带宽。而grep中相应的中间输出则可以忽略不计。
中间的图显示了数据从map节点通过网络向reduce节点发送的速度。这个移动开始于第一个map任务完成。图中的第一个驼峰出现在reduce任务第一次达到1700个时(整个MapReduce共分配到1700台机器,每台机器同时最多只运行一个reduce任务)。计算开始差不多300秒时,第一批reduce任务已经有部分完成,我们开始向剩余的reduce任务传送数据。所有传送过程在计算开始600秒后结束。
下面的图是reduce任务向最终的输出文件写入已排序数据的速度。在第一个传输周期的结束与写入周期的开始之间有一个延迟,因为此时机器正在排序中间数据。写入速度在2-4GB/s下持续了一段时间。所有的写操作在计算开始后850秒左右结束。包括启动开销的话,整个计算花费了891秒。这与TeraSort上目前公布的最佳记录1057秒很接近。
一些要注意的事:因为我们的局部性优化,输入速度要比传播速度和输出速度都快——大多数数据都读自本地磁盘,绕开了我们带宽相当有限的网络。传播速度要比输出速度高,因为输出阶段要写两份已排序的数据(因为可靠性和实用性的考虑)。我们写两份输出是因为这是我们的底层文件系统针对可靠性和实用性提供的机制。如果底层文件系统使用擦除代码而不是复制,那么写数据需要的网络带宽就会减少。
在图3(b)中显示的是禁用了备用任务情况下排序程序的执行过程。这种情况下的执行过程与图3(a)中的很相似,但在末尾处有很长一段时间几乎没有任何的写操作发生。在960秒后,只有5个reduce任务还没有完成。但这最后的几个任务直到300秒后才结束。整个计算花费了1283秒,比正常情况多花费44%的时间。
在图3(c)中显示了有机器失败情况下的排序程序执行过程。在计算开始几分钟后,我们有意的杀掉了1746个工作节点中的200个。底层集群调度器立即在这些机器上重启了新的工作进程(只有进程被杀掉了,机器还在正常运行中)。
工作节点的死亡显示为一个负的输入速度,因为一些已经完成的map任务消失了(因为对应的map节点被杀掉了)需要重新完成。这些map任务的重执行很快就会发生。整个计算过程在933秒后结果,包括了启动开销(相比正常执行时间只增加了5%)。
我们在2003年2月写出了MapReduce库的第一个版本,并在2003年8月进行了非常大的改进,包括了局部性优化、各工作机器上任务执行的动态负载平衡等。从那时开始,我们愉快地惊讶于MapReduce被如此广泛地应用在我们工作中遇到问题上。它已被用于Google的很多领域中,包括:
- 大规模机器学习问题;
- 针对Google News和Froogle产品的集群问题;
- 用于产生流行需求报告的数据提取(例如Google Zeitgeist);
- 针对新的尝试和产品的网页属性提取(例如,从大量的定位搜索中提取出地理位置信息);
- 大规模的图计算。
图4显示了在我们的源代码管理系统中进行了登记的MapReduce程序的数量随时间的显著增长,从2003年初的0到2004年9月底的接近900个不同版本。MapReduce取得如此成功的原因是它令花半小时时间写出一个简单的程序并运行在一千台机器上成为了可能,这极大地加速了开发和原型实现循环。进一步说,它允许没有分布式与并行系统经验的程序员也能轻松地利用大量的资源。
在每项作业的结尾,MapReduce库会将作业花费的计算资源统计写入日志。在表1中,我们能看到Google在2004年8月运行的MapReduce作业的一部分统计情况。
迄今为止我们的一个最重要的MapReduce应用是完全重写了生产索引系统,它负责产生用于Google网络搜索服务的数据结构。索引系统把由我们的爬取系统检索的一个大的文档集合作为输入,并存储为一组GFS文件。这些文档的原始文本数据超过了20TB。索引进程分成了5-10个MapReduce操作运行。使用MapReduce(而不是之前版本使用的ad-hoc分布系统)提供了许多好处:
- 索引代码更简单,更小,也更易懂,因为处理容错性和分布与并行的代码都被隐藏在MapReduce库中了。例如,其中一个阶段的计算在使用了MapReduce后从差不多3800行C++代码降到了700行代码。
- MapReduce库的性能足够好,因此我们可以令概念上无关的计算过程相互分离,而不需要为了避免额外的数据处理而将它们混合在一起。这减小了修改索引进程的难度。例如,在我们的旧索引系统中进行一个修改要花费几个月时间,而在新系统中只需要几天时间。
- 索引进程更容易去操作,因为大多数由机器失败、部分机器缓慢和网络暂时中断引起的问题都由MapReduce库自动处理了,不需要操作者干预。此外,通过向集群中增加机器,可以很容易地增强索引进程的性能。
许多系统都提供了约束好的程序模型,并利用这些约束自动的进行并行计算。例如,一个组合函数可以在N个处理器上使用并行前缀计算用log N的时间计算出包含N个元素的数组的所有前缀。MapReduce可以被认为是基于来自我们在真实世界中的大型计算的经验,对这些模型的简单化和提炼。更重要的是,我们提供了可以扩展到上千个处理器上使用的容错机制。作为对比,大多数并行处理系统只能在小规模下实现,并将处理机器错误的细节留给程序员去完成。
Bulk Synchronous Programming和一些MPI原语提供了更高层次的抽象,更容易写出并行程序。这些系统和MapReduce的一个关键区别是MapReduce利用了一个受约束的程序模型去自动并行化用户程序,并提供透明的容错机制。
我们的局部性优化受到了诸如活动磁盘等技术的启发,即将计算推给靠近本地磁盘的处理单元,来减少要跨过I/O子系统或网络发送的数据总量。我们用运行在直接连接了少量磁盘的商用处理器上替代了直接运行在磁盘控制处理器上,但通用的方法是相似的。
我们的备用任务机制与Charlotte系统上的eager调度机制类似。简单的eager调度系统有一个缺点,就是如果某个任务导致了多次失败,整个计算都会失败。我们通过自己的略过坏记录的机制解决了一些这类问题。
MapReduce的实现依赖于一个内部的集群管理系统,它负责在一组数量很多的共享机器上分布运行用户任务。尽管不是本文的重点,这个集群管理系统与Condor等其它系统在原理上是类似的。
排序机制作为MapReduce库的一部分,与NOW-Sort操作是类似的。源机器(map节点)切分待排序的数据并将其发送给R个reduce节点。每个reduce节点在本地排序它的数据(尽量存放在内存中)。当然NOW-Sort没有用户定义的map和reduce函数,这些使我们的库被广泛应用。
River提供了一个编程模型,通过在分布式队列上发送数据来处理通信。类似于MapReduce,River系统试图提供提供好的平均情况性能,即使存在因不统一的硬件或系统扰动而导致的不一致性。River通过小心的调度磁盘和网络传输来实现平衡的完成时间,从而达到这一目标。MapReduce使用了不同的方法。通过约束编程模型,MapReduce框架能够将问题划分成大量细粒度的任务。这些任务被动态调度给可用的工作节点,因此更快的节点会处理更多的任务。这个约束的编程模型同样允许我们在临近作业结束时将剩余的任务重复调度执行,这显著的减少了存在不一致时(例如有缓冲或停顿的机器)的完成时间。
BAD-FS有着与MapReduce非常不同的编程模型,而且不像MapReduce,它面向跨越广域网的作业执行。但是,它与MapReduce有着两个本质上的相同点:
- 两个系统都使用了备用执行来恢复因错误而导致的数据丢失。
- 两个系统都使用了局部感知调度来减少在拥堵的网络连接上发送的数据量。
TACC被设计为一个高度可用的网络服务的简化结构系统。类似于MapReduce,它依赖于重执行机制来实现容错性。
MapReduce编程模型已经成功的用于Google的许多不同的用途中。我们将它的成功归功于几个原因。首先,该模型即使是对于没有并行和分布式系统经验的程序员也是很易于使用的,因为它隐藏了并行化、容错机制、局部性优化、以及负载平衡的细节。其次,很多种类的问题都很容易表示成MapReduce计算。例如,MapReduce被用于为Google的网络搜索服务的数据产生、排序、数据挖掘、机器学习、以及许多其它系统。第三,我们已经开发了一个MapReduce的实现,可以扩展到包含上千台机器的大型集群。该实现可以高效使用这些机器的资源,因此适合于Google遇到的很多大型计算问题。
我们从这项工作中学到了很多东西。首先,约束这个编程模型令并行和分布式计算,以及令这些计算可容错,变得简单了。其次,网络带宽是一种稀缺资源。我们系统中的很多优化都因此针对减少通过网络发送的数据总量:局部性优化允许我们从本地磁盘读,同时将中间文件写入本地磁盘也节省了网络带宽。第三,备用执行可以用于减小缓慢的机器的影响,及应对机器失败和数据丢失。
#include "mapreduce/mapreduce.h"
// User's map fuction
class WordCounter: public Mapper {
public:
virtual void Map(const MapInput &input) {
const string &text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
++i;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
++i;
if (start < i)
Emit(text.substr(start, i-start), "1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User's reduce function
class Adder: public Reducer {
virtual void Reduce(ReduceInput *input) {
// Iterate over all entries with the
// same key and add the values
int64_t value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char **argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; ++i) {
MapReduceInput *input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput *out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combine_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result))
abort();
// Done: 'result' structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}