分布式系统

MapReduce:A Flexible Data Processing Tool(译)

2011年10月7日 阅读(517)

       作者:Jeffrey Dean &Sanjay Ghemawat .Google Inc 2010-1

原文:http://perspectives.mvdirona.com/2010/01/02/MapReduceInCACM.aspx

译者:phylips@bmy 2011-10-5

译文:http://duanple.blog.163.com/blog/static/7097176720119711038980/


与并行数据库相比,MapReduce的优势包括存储系统无关以及大规模jobs的细粒度容错性。 

MapReduce是一个用于大规模数据集合生成和处理的编程模型。用户描述一个map函数和reduce函数,map函数会处理一个key/value对来生成一系列的中间key/value对集合,reduce函数会对具有相同中间key值的中间values进行合并。在2003年我们就基于该模型构建了一个系统用来简化Google.com所使用的倒排索引的构建。从那时起,在Google已经有超过10000个的不同程序使用了MapReduce,涵盖了用于大规模图处理,文本处理,机器学习,机器翻译等各方面的算法。MapReduce的Hadoop开源实现也已被Google之外的很多组织广泛使用。

 

为了帮助MapReduce编程模型的解释,考虑一个在大量文档集合中计算单词出现个数的问题。用户编写的代码类似于如下伪代码:

map(String key, String value):

// key: document name

// value: document contents

for each word w in value:

EmitIntermediate(w, “1”);

reduce(String key, Iterator values):

// key: a word

// values: a list of counts

int result = 0;

for each v in values:

result += ParseInt(v);

Emit(AsString(result));

 

Map函数会输出一个单词和对应的出现次数(在这个简单例子中就是1)。Reduce函数将某个特定单词的所有出现次数累加起来。

 

MapReduce会在一个由商品化机器组成的集群上自动地并行执行该程序。由运行时系统负责输入数据的划分,程序在机器集合上的执行调度,处理机器失败,以及管理所需要的机器间通信。MapReduce可以让没有任何并行和分布式系统经验的人很容易地使用上大规模分布式系统的资源。一个典型的MapReduce计算通常会在数百或数千台节点上处理TB级的数据。大家都觉得该系统很容易使用,目前Google的集群上每天都有超过100,000个MapReduce jobs在运行。


与并行数据库的对比

内建于并行数据库系统的查询语言也可以用来表达这种MapReduce所支持的计算。由Andrew Pavlo等人在2009年发表的一篇论文(在这里我们简称为“比较论文(comparison paper)[13]”)比较了并行数据库和MapReduce的性能。它对MapReduce编程模型的开源Hadoop实现,DBMS-X(某未知的商业数据库系统)以及Vertica(某公司的列存式数据库系统,这篇论文的某位作者为其联合创始人)进行了对比。另外还有由该比较论文的某些作者早期发表的题为“a major step backwards[5,6]”两篇更早一些的blog文章。在本文中,我们主要澄清下这三篇文章里关于MapReduce的一些误解:

l  MapReduce不能使用索引,同时意味着对所有输入数据的完全扫描

l  MapReduce的输入输出总是文件系统上的文件,以及

l  MapReduce需要使用低效的文本数据格式

同时我们也会讨论其他的一些重要问题:

n  MapReduce是存储无关的,同时不需要事先将数据加载到数据库就可以对它进行处理。很多情况下,在将数据加载到数据库完成一次分析的时间里,都已经可以进行50次甚至更多次的MapReduce分析了

n  复杂的转换操作使用MapReduce比使用SQL更容易表达;以及

n  比较论文中的很多结论都是由实现和评价方法的缺陷导致的,而不是MapReduce模型本身的问题;我们会在本文后面讨论这些缺陷。

关于更细节的内容,我们建议读者去阅读下原始的MapReduce论文[4]和该比较论文[13]。


异构性系统

很多生产环境包含了各种存储系统。客户数据可能存储在关系数据库中,用户请求日志可能保存在文件系统。此外,随着环境的演化,数据可能迁移到新的存储系统中。MapReduce提供了可以在这种异构系统中进行数据分析的简单模型。终端用户通过定义对存储系统进行操纵的简单reader和writer实现来扩展MapReduce,就可以支持新的存储系统。目前可以支持的存储系统有:在分布式文件系统上的文件,数据库查询结果,存储在Bigtable上的数据,和结构化输入文件(比如B树)。一个MapReduce操作可以简单地对来自各种存储系统上的数据进行处理和组合等。

 

现在来看下一个并行DBMS系统是如何执行数据分析的。每个分析的输入必须先拷贝到并行DBMS中。该加载阶段的存在使它很不方便。同时也可能是令人不可接受的慢的,尤其是如果数据在加载之后只需要进行一两次分析的情况。比如,考虑一个面向批处理应用的网页抓取和索引系统,该系统会抓取一组网页,然后生成倒排索引。将这些只需要读取一次来建立索引的网页数据加载到数据库,实在是笨拙而且低效。即使将数据加载到并行数据库中的开销是可以接受的,我们仍然需要一个相应的数据加载工具。这就是MapReduce可以使用的一个领域,用户不需要自己编写一个自行进行并行化和容错的定制加载器,通过编写一个简单的MapReduce程序就可以将数据加载到并行DBMS中。


索引

对比论文中错误地认为MapReduce没法利用预先生成的索引,这也导致了论文中偏颇的benchmark结果。比如,假设现有一个划分到多个非分布式数据库中的大规模数据集合,划分可能使用了一个hash函数。每个数据库中都可以使用索引,这样就可以使用该索引在数据库中进行查询,然后将结果交给MapReduce的输入。如果数据是存储在D个数据库分区内,我们可以运行D个数据库查询,产生的结果作为MapReduce的D个输入。实际上,Pavlo等一些作者已经将这种策略应用到了他们最近的工作中。

 

另一个使用索引的例子是从Bigtable中读取数据的MapReduce。如果所需的数据刚好是Bigtable行空间的某个子区间,我们可以只读取该子区间而不需要扫描整个Bigtable。此外,像Vertica和其他一些列存式数据库一样,我们也可以只读取那些数据分析所必需的列,因为Bigtable本身就是按照列进行数据存储的。

 

还有另外一个例子就是特定日期边界内的日志处理;参考比较论文中的Join task部分,在论文中Hadoop benchmark读取了155 million条记录来处理真正落在边界内的134,000条记录。几乎我们所知的所有日志系统都是周期性地切换到另一个新的日志文件,然后将切换时间作为日志文件的一部分。因此我们可以简单地运行一个MapReduce,只处理那些与给定日期区间重叠的日志文件,而不是读取所有的日志文件。


复杂函数

Map和Reduce通常都比较简单,同时也有很直接的等价SQL语句。然而,在很多情况下,尤其是对于Map函数,它可能很复杂以至于没法简单地使用一个SQL查询语句进行表达,比如下面的这些例子:

n  从一组HTML文档集合中抽取出出链集合,并根据目标文档进行聚合

n  将卫星图像进行缝合消除接缝并为Google Earth选择高清图像

n  使用一种专门为支持Google搜索查询请求而优化的压缩模式生成倒排索引文件

n  处理世界上的所有道路区段,然后进行渲染将这些区段显示在Google Maps

n  在一个输入数据集合上容错地并行执行由高级语言(比如Sawzall和Pig Latin)编写的程序

理论上,这样的一些UDF可以与SQL查询语句配合起来,但是比较论文中已经指出这种UDF支持要么很丑陋(在DBMS-X中)要么根本不支持(在Vertica中)。当然,这些问题以后可能会消失,但是就目前来说,与擅长于进行选择和聚合的SQL相比,MapReduce无疑是一个完成这种复杂任务的更好框架。


结构化数据和Schemas

Pavlo et al.确实也提出了很好的一点,即schema有助于多个应用程序共享相同的数据。比如,考虑来自比较论文中的下面一个schema:

CREATE TABLE Rankings (

pageURL VARCHAR(100)

PRIMARY KEY,

pageRank INT,

avgDuration INT );

比较论文中的相应的Hadoop benchmarks使用了一种低效而且易损坏的文本格式,比如通过在字段间插入一个竖杠来分割不同属性,如下:

137|http://www.somehost.com/index.html|602

与这种低效格式相比,实际上Google所有的读写数据的MapReduce操作都是以Protocol Buffer[8]格式进行的。一个高级的输入输出类型描述语言,同时通过编译器进行代码生成来将编码/解码细节与应用程序代码分离。对Ranking数据的相应的Protocol Buffer描述如下:

message Rankings {

required string pageurl = 1;

required int32 pagerank = 2;

required int32 avgduration = 3;

}

下面的Map函数片段用来处理一条Ranking记录:

Rankings r = new Rankings();

r.parseFrom(value);

if (r.getPagerank() > 10) { … }

Protocol Buffer框架允许类型进行升级而不需要修改(甚至不需要重新编译或build)现有代码。实践证明,该层次的schema支持已足以让数千的Google工程师共享一个不断演化的数据类型。

 

此外,Protocol Buffer的实现采用了一种更紧致以及可以更快编解码的二进制格式,而不是对比论文中的Hadoop benchmarks所采用的文本格式。比如,自动生成的解析Rankings Protocol Buffer记录的代码每条记录花费20纳秒,与之相比使用之前提到的Hadoop benchmark中采用的文本输入格式,每条记录解析要1732纳秒。该测量结果是通过一个运行在2.4GHz的Intel双核Duo的JVM上得到的。用户benchmark运行的Java代码片段如下:

// Fragment 1: protocol buffer

parsing

for (int i = 0; i < numIterations;

i++) {

rankings.parseFrom(value);

pagerank = rankings.get-

Pagerank();

}

// Fragment 2: text format

parsing (extracted from

Benchmark1.java

// from the source code

posted by Pavlo et al.)

for (int i = 0; i < numIterations;

i++) {

String data[] = value.to-

String().split(“\\|”);

pagerank = Integer.

valueOf(data[0]);

}

二者差了80倍,因此我们认为比较论文中关于Hadoop benchmarks的相关数字被夸大了,同时不能被用来得出关于MapReduce和并行DBMS在性能上根本性的差异的结论。


容错性

MapReduce实现采用了一种拉模式进行Mappers和Reducers间的数据移动,而不是让Mappers直接写到Reducers的推的模式。Pavlo等指出这种拉的模式会导致Mappers和Reducers间的数据移动产生很多小文件和很多磁盘seek,这是正确的。但是在Google的MapReduce实现中利用了一些像批量处理,排序和中间数据的组合以及读操作的智能调度技巧来降低这种开销。

 

MapReduce实现没有采用推模式是由Google的开发者所需要的容错需求决定的。在大规模数据集上的大部分MapReduce的执行过程通常都会碰到一些错误,可能是软件的或者是硬件的问题,同时Google的集群调度系统也会杀掉某些MapReduce任务来为更高优先级的任务腾出空间。在一个推模式中,一个reducer的失败将会导致所有Map tasks的重新执行。

 

同时随着数据集的增大,分析过程将需要更多的计算量,同时容错会变得更重要。目前在Google,每天使用MapReduce处理的数据量,规模超过1PB的已经有10几个数据集,超过数百TB的也有几十个。在Google之外,Hadoop用户列表里也已经有很多用户处理的数据集达到了数百TB甚至更多。很明显,随着数据集的增大,越来越多的用户将会需要一个类似于MapReduce那样的容错系统来高效地并且有效地处理这些大规模的数据集。


性能

Pavlo等将Hadoop MapReduce实现的性能与两个数据库实现进行了对比;这里,我们讨论下这些系统间的性能差异:

 

工程化因素 启动开销和顺序扫描速度实际上只是体现了实现和工程化取舍的成熟度,而不能代表编程模型的根本性区别。当然这些差别肯定是很重要的,但是可以通过各种方式解决。比如,启动开销可以通过让worker进程一直保持活动状态,等待下一个MapReduce调用来解决。类似的优化一年多以前就已经添加到了Google的MapReduce实现中。

 

Google也通过各种性能优化手段来解决顺序扫描的性能问题。比如,通过为结构化数据采用高效的二进制编码格式(Protocol Buffer)取代低效的文本格式。

 

非必需数据的读取 对比论文中提到,“MR总是要扫描整个输入文件来完成一个query”。MapReduce并不要求在数据进行完整的扫描;它只要求输入接口的实现可以产生一组满足某些输入要求的记录集合。比如:

l  在一个文件集合中的所有记录;

l  在给定访问区间日期区间[2000-01-15…2000-01-22]内的所有记录;以及

l  Bigtable表T中语言类型为”Turkish”的所有记录

 

当然如Pavlo等所说,输入可能的确需要在一组文件集合上进行完全的扫描。但是通常可以采用一些改进的实现。比如输入可能是提供了高效过滤功能的带索引数据库,或者是一个带索引的文件结构(比如可以高效地基于日期进行数据过滤的日志文件)。

 

这种关于MapReduce的错误假设,影响到了对比论文中的5个benchmarks中的3个(selection,aggregation,join tasks),同时使得论文关于MapReduce和并行数据库系统的性能差距的结论缺乏说服力。

 

Merging results 对比论文中的所有5个benchmarks的Hadoop测量结果包括了将初始的MapReduce结果合并为一个文件的最终阶段的开销。实际中,这个merging是没有必要的。因为MapReduce的输出通常会由另一个MapReduce来使用,它可以简单地直接操作第一个MapReduce的输出,而不需要一个单个的输出文件。即使使用者不是一个MapReduce,初始的MapReduce可以通过它的reducer过程直接将数据写到一个merge好的目标(比如Bigtable或者是并行数据库的表)。

 

数据加载 比较论文中的DBMS测量结果中,在进行数据分析之前的加载过程产生了相当大的开销。对于对比论文中的很多benchmarks来说,并行数据库的输入数据加载开销是Hadoop进行数据分析的开销的5到50倍。换句话说,对于某些benchmarks来说,数据库进行数据加载和分析的时间,都够在数据上进行50次单独的MapReduce分析了。如果加载之后查询需要经常执行,这么长的加载时间也没什么关系。但是情况通常不是这样的,数据通常产生之后,只需要处理一两次。比如,MapReduce论文中描述的网页搜索索引构建系统,它的绝大多数MapReduce阶段的输出只会被一个或两个后续的MapReduce阶段所使用。


结论

在比较论文中的关于性能的结论是建立在很多关于MapReduce的错误假设之上的,同时也夸大了并行数据库系统的性能。我们的使用经验表明,MapReduce是一个进行大规模容错的数据分析的非常有效和高效的工具。通过这个讨论,我们也得到了一些有价值的观点:

 

启动延迟 MapReduce实现应当通过使用像在不同调用中重用worker进程这样的一些策略来尽量地降低启动延迟。

数据-shuffling 必须仔细关注数据-shuffling阶段的实现,避免在一个具有M个map tasks和R个reduce tasks的MapReduce中产生O(M*R)的seek操作。

文本化格式 MapReduce用户必须避免使用低效的文本格式。

天然索引 MapReduce用户应当尽可能地利用起天然的索引(比如log文件名里的时间戳)

Unmerged output 绝大多数的MapReduce输出都不需要进行合并,因为如果下一个数据使用者是另一个MapReduce,合并没有任何好处。

 

与并行数据库相比,MapReduce具有很多显著的优势。首先也是最重要的,它为大规模jobs提供了细粒度的容错性;在一个需要运行几个小时的任务中间出现错误时,不需要从头开始。其次,MapReduce对于一个具有多个存储系统的异构系统的数据处理和加载非常有帮助。第三,MapReduce提供了一个可以执行那些比SQL所能直接支持的更复杂函数的好框架。


参考文献

1. A bouzeid, A., Bajda-Pawlikowski, K., Abadi, D.J.,Silberschatz, A., and Rasin, A. HadoopDB: An architectural hybrid of MapReduce and DBMS technologies for analytical workloads. In Proceedings of the Conference on Very Large Databases (Lyon,France, 2009); http://db.cs.yale.edu/hadoopdb/

2. Aster Data Systems, Inc. In-Database MapReduce for Rich Analytics;

http://www.asterdata.com/product/mapreduce.php.

3. Chang, F., Dean, J., Ghemawat, S., Hsieh, W.C.,Wallach, D.A., Burrows, M., Chandra, T., Fikes, A.,and Gruber, R.E. Bigtable: A distributed storage system for structured data. In Proceedings of the Seventh Symposium on Operating System Design and Implementation (Seattle, WA, Nov. 6–8).

Usenix Association, 2006; http://labs.google.com/papers/bigtable.html

4. D ean, J. and Ghemawat, S. MapReduce: Simplified data processing on large clusters. In Proceedings of the Sixth Symposium on Operating System Design and

Implementation (San Francisco, CA, Dec. 6–8).

Usenix Association, 2004; http://labs.google.com/papers/mapreduce.html

5. D ewitt, D. and Stonebraker, M. MapReduce: A Major Step Backwards blogpost; http://databasecolumnvertica.com/database-innovation/mapreduce-a-majorstep-backwards/

6. D ewitt, D. and Stonebraker, M. MapReduce II blogpost;

http://databasecolumn.vertica.com/database-innovation/mapreduce-ii/

7. Ghemawat, S., Gobioff, H., and Leung, S.-T. The Google file system. In Proceedings of the 19th ACM Symposium on Operating Systems Principles (Lake George, NY, Oct. 19–22). ACM Press, New York, 2003; http://labs.google.com/papers/gfs.html

8. Google. Protocol Buffers: Google’s Data Interchange Format. Documentation and open source release;http://code.google.com/p/protobuf/

9. Greenplum MapReduce: Bringing Next-Generation Analytics Technology to the Enterprise;

http://www.greenplum.com/resources/mapreduce/

10. Hadoop. Documentation and open source release;

http://hadoop.apache.org/core/

11. Hadoop. Users list; http://wiki.apache.org/hadoop/

12. O lston, C., Reed, B., Srivastava, U., Kumar, R., and Tomkins, A. Pig Latin: A not-so-foreign language for data processing. In Proceedings of the ACM SIGMOD

2008 International Conference on Management of Data (Auckland, New Zealand, June 2008); http://hadoop.apache.org/pig/

13. Pavlo, A., Paulson, E., Rasin, A., Abadi, D.J., DeWitt,D.J., Madden, S., and Stonebraker, M. A comparison of approaches to large-scale data analysis. In Proceedings of the 2009 ACM SIGMOD International Conference (Providence, RI, June 29–July 2). ACM Press, New York, 2009;

http://database.cs.brown.edu/projects/mapreduce-vs-dbms/

14. Pike, R., Dorward, S., Griesemer, R., and Quinlan, S.Interpreting the data: Parallel analysis with Sawzall. Scientific Programming Journal, Special Issue on Grids and Worldwide Computing Programming Models and Infrastructure 13, 4, 227–298.

http://labs.google.com/papers/sawzall.html

 

Jeffrey Dean (jeff@google.com) is a Google Fellow in the Systems Infrastructure Group of Google, Mountain View, CA.

Sanjay Ghemawat (sanjay@google.com) is a Google Fellow in the Systems Infrastructure Group of Google, Mountain View, CA.

You Might Also Like