分布式系统

A Comparision of Approaches to Large-Scale Data An

2011年10月7日 阅读(510)

作者:Andrew Pavlo &Erik Paulson etc. 2009-6

原文:http://db.csail.mit.edu/pubs/benchmarks-sigmod09.pdf

译者:phylips@bmy 2011-10-4

译文:http://duanple.blog.163.com/blog/static/7097176720119701941950/

摘要

使用MapReduce模型进行大规模数据分析目前已成为一个炙手可热的领域而备受关注[17]。尽管该框架的基本控制流实际上早在20多年前就出现在数据库管理系统(DBMS)中了,但是有些人还是把它称为一个全新的计算模型[8,17]。在本文中,我们分别对这两种模型进行了描述和对比。此外,我们还对比了这两种系统的性能和开发的复杂度。最后,我们定义了由多个计算任务组成的benchmark,并在一个MR的开源版本和两个并行DBMS系统上进行了测试。针对每个计算任务,我们在一个100节点的集群上进行了多个并行度上的系统性能测试。我们得到了一些有趣的结论。尽管并行DBMS的数据加载过程和执行调优所花费的时间比MR系统要长,但是这些DBMS系统所表现出的执行性能却比MR系统好很多。我们对产生这种显著的性能差异的原因进行了思考,并考虑了那些未来系统所需要从这两种架构学习的地方。

1.    导引

最近的各种期刊上到处充斥着各种关于“云计算”革命的新闻。该名称特指利用大量并行工作的低端处理器来解决计算问题。实际上,这意味着通过罗列大量低端服务器而不是采用少量的高端服务器来构建数据中心。随着人们对该些技术兴趣的上升,已经产生出一系列在它们之上进行编程的工具,而MapReduce就是其中最早也是最著名的一个。MapReduce非常具有吸引力,因为它提供了一种让用户可以编写相对复杂的分布式程序的简单模型,已经引起了教育领域的极大兴趣。比如,IBM和Google已经宣布计划建造一个1000处理器的MapReduce集群来进行MapReduce编程的教学。

 

面对人们这种对于MapReduce的兴趣,很自然的会问“为什么不使用并行DBMS呢?”并行数据库系统投入商业化使用都已经快20年了,同时市场上也存在了一打的产品,包括Teradata,Aster Data,Netezza,DATAllegro,Dataupia,Vertica,ParAccel,Neoview,Greenplum,DB2(通过数据库分区特性),Oracle(通过Exadata)。它们都是很健壮的高性能计算平台。与MapReduce类似,它们也提供了高级的编程环境和方便的并行化。尽管看起来MR和并行数据库面向的是不同的受众,但是事实上几乎所有的并行处理任务都可以通过一组数据库查询(可能会使用用户自定义函数和聚合器进行数据的过滤和合并)或者是一组MapReduce jobs进行描述。受该问题的启发,我们决定研究下用于大规模数据分析的MapReduce和并行数据所采用的策略到底有什么不同。这两类系统在很多关键地方都做出了不同的选择。比如,所有的DBMS都要求数据具有定义良好的schema,然而MapReduce允许数据是任意的格式。还有一些其他的不同,比如系统如何提供索引和压缩优化,编程模型,数据分布方式以及查询执行策略。

 

本文的目的是分析这些方面的选择以及它们所进行的取舍。在第2节,我们简要地看下这两种不同的系统,第3节会讨论下架构上的取舍。然后,在第4节介绍下由多个计算任务组成的benchmark,其中一个任务是从MR论文上摘出来的,剩下的则是一些具有更高要求的计算任务。另外,还给出了在一个100节点集群上的执行每个任务后的该benchmark运行结果。我们测试了MapReduce的开源版本Hadoop,以及另外两个并行SQL DBMS:Vertica和另一个来自某主要关系数据库厂商的系统。也给出了每个系统的数据加载过程所花费的时间,同时对每个任务对应的软件配置和调优过程提出了一个非正式的使用报告。

 

通常情况下,SQL DBMS都要快很多,同时实现每个任务也需要更少的代码,但是调优和数据加载花费的时间要长些。最后,我们对这两种方式的各种差异的原因进行了讨论,同时提出了一些关于大规模数据分析引擎的最佳实践方面的建议。

 

有些读者可能会认为采用100个节点进行实验不具代表性,或者说不能反映真实的数据处理系统。我们并不认同这种观点,对此我们有两点说明:首先,正如第4节所展示的,在100个节点下,两个并行DBMS在很多分析任务上都比MapReduce快了3.1-6.5倍。虽然MP可能实际上可以扩展到1000个节点,但是现代DBMS的高效性使得数据集合在1-2PB量级上时,根本不需要那么多的硬件就可以处理(1000个节点,每个节点具有2TB的话就是2PB)。比如,eBay的Teradata集群只使用72个节点(每个节点:2个双核CPU,32GB内存,104个300GB磁盘)管理了接近2.4PB的关系型数据。再比如,福克斯互动媒体集团的数据仓库是采用40节点的Greenplum DBMS实现的(总共1PB磁盘空间)。每个节点是一个具有双核CPU,48个500GB磁盘和16GB内存的Sun X4500机。因为实际中很少有数据集会达到1PB大小,所以很明显根本没几个MR用户真的需要1000个节点。

2. 两种用于大规模数据分析的方法

这篇文章里我们所讨论的两种系统都是运行在一个“无共享(share nothing)”计算机集合上。也就是说,系统是部署在一系列独立的机器组成的集合上,每个机器都具有本地磁盘和本地主存,通过一个高速本地网络连接在一块。两种系统的并行性实现,都是通过将数据集划分为独立的partitions,然后将这些partitions放到不同的节点以便于并行处理。在本节中,我们会大体描述下MR模型和传统并行DBMS是如何在该环境下进行工作的。

2.1 MapReduce

MapReduce编程模型其中一个最吸引人的特性就是它的简单性:一个MR程序只由两个函数组成,分别是Map和Reduce,它们由用户书写来对key/value对进行处理。输入数据集合存储在部署在集群的每个节点上的分布式文件系统中。该程序,会被插入到一个分布式处理框架中然后按照用户描述的行为执行。

 

Map函数会从一个输入文件读取一系列的记录,进行一些用户期望的过滤和/或转换,然后输出一系列由新的key/value对组成的中间记录集合。当Map函数产生出这些输出记录后,一个“split”函数将会用来将记录划分到R个不相交的桶里,通过对每条输出记录的key应用该函数。通常该split函数都是一个hash函数,实际上只要是个确定性函数即可。每个Map桶将会被写入到处理节点的本地磁盘。Map函数结束后将会产生R个输出文件,一个桶一个。通常,都会有多个Map函数实例(instance)运行在集群的不同节点上。我们采用实例(instance)来代表Map或Reduce函数的一个执行实例。MR调度器会为每个Map instance分配输入文件的一部分让它进行处理。假设有M个Map,R个Reduce,那么总共会有M*R个文件;Fij,1=<i<=M,1=<j<=R。关键的一点是所有的Map instance会使用相同的hash函数;因此,具有相同hash值的所有输出记录会被存储到相同的输出文件中。

 

MR程序的第二阶段会执行Reduce程序的R个实例,R通常就是节点数。每个Reduce instance Rj的输入由文件Fij组成,1=<i<=M。这些文件会通过网络从Map节点的本地磁盘传输过来。需要再次注意的是,来自Map阶段的具体相同hash值的所有输出记录会交给同一个Reduce instance,而不管是哪个Map instance产生的。每个Reduce会以同样的方式对分配给它的记录进行处理或合并,然后将输出结果写到一个输出文件中(在分布式文件系统中),该文件就组成了该计算过程的最终输出。

 

输入数据集通常是分布式文件系统中的一个或者多个partitions集合。决定启动多少个Map instance及如何将它们分配给各个节点是MR调度器的工作。同样的,调度器也会决定Reduce instance的个数和运行节点。MR中央控制器负责协调每个节点上的系统活动。一旦最终结果写入到分布式文件系统中后,MR程序就结束了。

2.2并行DBMS

可以运行在非共享节点组成的集群上的数据库系统早在20世纪80年代末就出现了。这些系统都支持标准的关系型表和SQL,同时对于终端用户来说数据事实上是透明地存储在多个机器上的。很多这类系统都是建立在Gemma和Grace的并行DBMS项目的先驱性研究成果之上的。能够并行执行有两个关键:1.大部分(甚至是所有的)表被划分到集群的所有节点上2.系统使用一个优化器来将SQL命令翻译成在多个节点上执行的查询计划。因为程序员只需要使用高级语言描述他们的目的,因此他们根本不需要关心底层存储细节,比如索引配置和join的策略。

 

考虑一条SQL命令,它首先在表T1中基于断言过滤记录,然后会与另一个表T2进行join,最后在join的结果上进行聚合计算。该命令在并行DBMS中的大体执行过程可以分为三个阶段。因为数据库已经基于某个属性将存储的T2表内容划分到多个节点中,因此负责过滤的这个子查询会首先在这些位置并行执行,类似于Map函数中的过滤。之后,会基于数据表的大小执行一个两路并行join算法。比如,如果T2中的记录数比较少的话,那么DBMS会在首次加载时将它复制到所有节点,这样就可以在所有节点并行执行join了。之后,每个节点会对join的结果部分进行聚合计算。最后的“roll-up”步骤,会根据各个部分的聚合结果计算出最终结果。

 

如果T2表的数据规模很大,那么T2的内容将会被分布到多个节点中。如果这些表划分时所使用的key不是join时使用的key,那么系统会对T2和T1过滤后的版本在join所用的属性上使用一个相同的hash函数。之后对于过滤版本的T1和T2的重新分布过程就类似于Map和Reduce函数之间的处理过程。一旦某个节点得到了它所需要的数据,那么它就会执行一个hash join,同时会进行聚合函数的计算。最后也有一个“roll-up”步骤,根据各个部分的聚合结果计算出最终结果。

 

乍一看,这两种数据分析和处理方式有很多共同点;但是它们之间也有很多区别,我们下一节会进行讨论。

3.架构元素

本节,我们会讨论下两种系统在分布式环境下处理大规模数据的各种特点。其中的一个观点就是,MR模型本身的特性很适合于具有少量开发人员和有限的应用领域的开发环境。然而约束性的缺乏,使得它可能并不适合长期和大规模的项目。

3.1 Schema支持

并行DBMS要求数据以关系型模式的行列进行组织。与此相比,MR模型并不要求数据文件具有一个使用关系数据模型定义的schema。也就是说,MR程序员可以以任意方式对他们的数据进行结构化甚至是根本没有结构。

 

有人可能认为正是因为没有严格的schema才使得MR如此让人偏爱。比如,SQL经常会因为它需要程序员必须使用数据定义功能来定义数据“形状”而受到批评。另一方面,为了还原输入记录的真实语义MR程序员通常都需要编写一个解析器,这个工作量还是逃不掉的。但是对于大规模数据集不定义一个schema也还会有其他方面的问题。

 

无论MR输入文件的结构如何,它都得内建于Map和Reduce函数中。现有的MR实现提供了一些内建功能来处理简单key/value对格式的解析,但是对于更复杂的数据结构,比如复合key值,程序员必须显式的编写支持程序。如果MR数据集不会被多个应用访问,这可能还是可以接受的。但是如果数据经常被共享,那么第二个程序员就得读懂第一个程序员的那部分代码来确定如何处理输入文件。更好的方式,应该是像DBMS那样,将schema与应用程序分离,将它存储在可以被查询的系统元数据集合中。

 

但是即使schema与应用分离了,而且已经通过一种数据描述机制使得它对于多个MR程序都是可用的了,开发者还必须对这个schema达成一致。这显然需要对于数据模型提出某些保证,同时输入文件必须遵守该保证,因此一旦文件创建就很难去修改它的数据属性。

 

一旦程序员对数据结构达成一致,那么其他人就要保证任何数据的添加或修改都不能改变数据的完整性或者其他的高级限制(比如雇员工资必须非负)。这样的一些条件必须要通知到会修改该数据集的程序员;MR框架和底层存储系统对这些规则一无所知,因此输入数据很有可能会被脏数据损坏。这就需要再一次地将这些限制和应用程序分离,同时通过一个运行时系统来自动地检查,就像DBMS那样,数据完整性的保证不需要程序员做额外的工作。

 

总的看来,如果是没有共享的情况下,MR模式是非常灵活的。但是如果需要共享,正如我们所说,使用一个数据描述语言,将数据定义和完整性限制与应用程序分离是一大优势。这些信息对于相应的用户和应用程序应该都是可以访问的。

3.2索引

所有的现代DBMS都使用hash或者B树索引来加速数据访问。如果某人要查找一个记录子集(比如工资大于$100,000的雇员),那么使用合适的索引可以明显缩小查询的范围。大部分数据库都允许单个表格具有多个索引。因此,查询优化器可以决定为用户查询使用哪个索引或者是简单地采用一个暴力的顺序搜索。

 

因为MR模型是如此简单,MR框架不支持内建索引。如果程序员想加速它们应用程序内的数据查询必须自己实现索引。通常这不太容易完成,因为框架的数据获取机制也必须要修改,使得在将数据推送给Map instance时能使用索引。另外,如果索引不需要在多个程序员间共享这也是一个可以接受的策略,尽管需要每个MR程序员重复地造轮子。

 

但是如果需要共享,那么程序员之间就需要对索引格式以及如何使用它们进行沟通。这时,最好还是把索引信息以标准格式存储到系统目录下,这样程序员通过查询该结构就可以获取到相应的信息。

3.3编程模型

在20世纪70年代,数据库研究社区就发生了在关系模型支持者和网状模型支持者之间的一场大辩论。讨论的焦点在于DBMS中的数据访问程序该通过如下哪种方式编写:

1.      说明你想要什么—而不是描述一个如何得到它的算法(Relational)

2.      描述一个数据访问算法(Codasyl)

最终,第一种观点取得了胜利,同时过去的30年已经确实证明了关系数据库系统的价值。高级语言程序,比如SQL,易于编写,易于修改,易于理解。Codasyl被人批评为“访问DBMS的汇编语言”。我们认为MR编程有些类似于Codasyl编程:人们为了执行记录级的处理必须采用底层的语言编写算法。另一方面,对于很多习惯了过程性语言编程(比如C/C++或者Java)的人来说,可能也会不喜欢像SQL这样的语言。

 

来自MR社区的证据表明很多task之间具有很多重复性的代码,比如数据集的joinning。为了避免不断实现这种重复性的task,MR社区正在现有接口之上集成一些更高级的语言。Pig和Hive正是这个方向上两个非常知名的项目。

3.4数据分布

对于大规模数据库有一个传统观点:总是将计算发给数据,而不要反其道而行之。换句话说,应当把程序通过网络发送给节点,而不是从节点导入大量数据。并行DBMS可以充分利用数据分布和位置信息:并行查询优化器在尽量平衡计算负载的同时,会最小化通过网络传输的数据量。

 

除了将Map instances调度到哪的初始决定外,MR程序员必须手动执行这些任务。例如,假设一个用户编写了一个对一组文档分两部分进行处理的MR程序。首先,Map函数扫描文档然后创建一个单词出现频率的直方图。然后这些文档被传递给Reduce函数,并根据它们所属的原始站点进行分组。现在用户想在第一个用户工作的基础上,通过这些数据,找到那些其中某个文档中“Google”或者“IBM”出现超过5次的站点。在该查询的最基本实现中,Map函数的过滤条件是在所有文档计算完毕并且发送给reduce之后才执行的,实际上只有少数的文档满足这个过滤条件。

 

与之相比,下面的SQL view和select查询完成了类似的计算:

CREATE VIEW Keywords AS

SELECT siteid, docid, word, COUNT(*) AS wordcount

FROM Documents

GROUP BY siteid, docid, word;

SELECT DISTINCT siteid

FROM Keywords

WHERE (word = ‘IBM’ OR word = ‘Google’) AND wordcount > 5;

现代的DBMS会重写第二阶段的查询,因为可以直接用视图定义替换掉FROM子句里的Keywords。之后,查询优化器会将查询中WHERE子句下调,这样在计算COUNT之前将该条件应用到Documents表上,从而大大减少计算量。如果文档是分布在多个节点上的,那么该过滤条件就可以在文档被分组之前过滤掉,因此大大节省了网络IO。

3.5执行策略

MR对于Map和Reduce job间的数据传输处理方式具有潜在的严重性能问题。假设有N个Map instance,每个产生M个输出文件,每个输出文件指向不同的Reduce instance。这些文件会被写到执行Map instance节点的本地磁盘。如果N是1000,M是500,那么Map阶段将会产生500,000个本地文件。当Reduce阶段开始时,500个Reduce instance,每个都需要读取1000个输入文件,同时必须使用一个文件传输协议从每个Map instance所运行的节点处拉取输入文件。假设同时有100个Reduce instance并行执行,不可避免地将会有2个或者更多的Reduce instance同时在同一个节点上试图读取文件,这就会产生大量的磁盘seek操作,从而降低磁盘传输效率。这也是为什么并行数据库系统没有将它们的中间数据保存为文件,并且采用了一种推模式而不是拉模式进行数据传输的原因。

3.6灵活性

尽管被广泛采用,但是SQL经常会因为它缺乏表达力而受到批评。有些人认为,20世纪70年代的数据库研究者们不应该专注于一个可以嵌入到任何编程语言的数据子语言的研究,而是应该为所有的编程语言提供一个高级的数据访问接口。幸运的是,新的应用程序框架,比如Ruby on Rails和LINQ,已经开始扭转这种现状,通过利用新的编程语言功能来实现一个对象-关系映射模式。这种编程环境允许开发者不需要编写复杂的SQL就可以直接利用现有的DBMS技术。

 

MR模型的支持者们认为SQL没法提供MR那样的通用性。但是几乎所有的主要DBMS产品(无论是商业的还是开源的)现在都在SQL中提供了用户自定义函数支持,存储过程以及用户自定义聚合器。尽管无法达到MR那样的通用性,但是它们确实提高了数据库系统的灵活性。

3.7容错性

MR框架提供了一个比DBMS更复杂的容错模型。虽然这两种系统都通过某种形式的replication来处理磁盘失败,MR更擅长于处理MR计算过程中出现的节点失败。在一个MR系统中,如果某个工作单元出错,那么MR调度器可以自动地在另一个节点上重启该任务。这种灵活性部分是因为Map阶段的输出文件是存储在本地而不是流式地传送给Reduce节点的。类似地,多个MR jobs组成的pipeline,比如4.3.4节描述的一个,会将每一步的中间结果保存到磁盘。这与并行DBMS不同,在出现失败时DBMS重启的粒度会很大,DBMS采用这种策略的部分原因是它总是尽可能地避免将中间结果存到磁盘。因此如果在DBMS中某个长时间运行query的执行期间,某个节点挂了的话,整个查询必须从头开始。

4.性能Benchmark

在本节,我们提出了用来比较MR模型和并行DBMS性能的由5个tasks组成的benchmark。第一个task直接来源于原始的MapReduce论文,据作者称它代表了大部分的MR计算类型。因为该task非常简单,我们又开发了其他4个额外的tasks,它们具有更复杂的分析工作,设计用来探索前面这些章节里所讨论的各种取舍。我在一个知名的MR实现和两个并行DBMS上运行了这些benchmarks。

 

4.1Benchmark环境

下面描述下我们的benchmark环境的具体细节,指出我们测试的数据分析系统在操作假设上的不同,并讨论下我们为尽量让实验环境保持一致采用的方法。

4.1.1测试系统

Hadoop:Hadoop系统是由Yahoo!和Apache软件基金会开发的当前最流行的MapReduce框架开源实现。不像Google原始的MR框架实现是用C++编写的,核心系统都是用Java写的。在我们的实验中,我们使用了运行在Java 1.6.0之上的Hadoop 0.19.0版本。为了得到更好的性能,除进行如下不涉及MR核心的修改外,其余都使用默认配置进行该系统的部署:1.数据使用256MB的数据块大小进行存储而不是默认的64MB;2.每个任务执行器的JVM最大堆大小为512MB,同时DataNode/JobTracker的JVM最大堆大小配置为1024MB(每个节点总共3.5GB,三个任务执行器是3*512MB,再+2*1024MB);3.打开了Hadoop针对集群数据locality的“rack awareness”特性;4.在为每个Map/Reduce task启动新的处理器时允许重用task的JVM执行器。此外我们还将系统配置为:每个节点上可以运行2个Map instance和1个Reduce instance。

 

Hadoop框架也提供了一个GFS的实现。对于每个benchmark试验,我们将所有输入和输出数据保存在HDFS上。使用HDFS默认的单block三副本配置同时没有使用压缩;我们也测试过其他的一些配置方案,比如每个block一个副本,进行块级别和记录级别的压缩,我们发现测试结果表明在这些配置情况下,性能没有提高甚至变得更差了(见5.13节)。在每个特定节点规模下的benchmark运行结束后,我们会删除每个节点的数据目录同时重新格式化HDFS,以保证下次运行时的输入数据集合是均匀分布到各个节点上的。

 

Hadoop使用一个中央job tracker和一个“master”HDFS后台进程来协调节点活动。为了保证这些进程不会影响到工作节点的性能,我们用集群中单独的一个节点来运行这些额外的框架组件。

 

DBMS-X:我们使用了最新版的DBMS-X,来自某主要关系数据库厂商的基于行存储的并行SQL DBMS。该系统安装在每个节点上,为缓存池和其他临时空间配置了4GB的共享内存空间。每个表会通过对该表的某个属性进行hash划分到所有的节点中,同时会对不同的属性建立索引(见4.2.1和4.3.1)。与Hadoop环境类似,我们会为每次试验删除DBMS-X里的表然后重新加载数据,以保证各个元组均匀地分布在集群中。

 

默认情况下,DBMS-X的内部存储不会对数据进行压缩,但是它提供了一个基于字典的表压缩支持。我们发现打开压缩几乎可以将所有benchmark tasks的执行时间降低50%,因此我们的实验结果都是指打开压缩的情况下的。但是有一种情况,我们也确实发现使用压缩后性能实际下降了。此外,因为我们的benchmark都是只读的,所以没有打开DBMS-X的relication,因为这不能提高性能同时还会导致安装过程的复杂化。

 

Vertica:Vertica数据库是一个设计用于大规模数据仓库的并行DBMS。与其他DBMS(包括DBMS-X)的主要区别是它的所有数据是按列存储的,而不是按行。同时它使用了一个专门为面向列的存储层定制的执行引擎。不像DBMS-X,Vertica的数据压缩默认就是打开的,因为它的执行器可以直接在压缩的表上进行操作。在典型的Vertica部署中,该feature都是打开的,因此本文中的结果也都是打开压缩的情况下的。Vertica也能基于一个聚簇索引来将表根据一个或多个属性进行排序。

 

我们发现每个节点默认的256MB缓存大小在我们的实验中已经可以工作地很好了。Vertica资源管理器负责用于每个查询的内存大小设置,但是我们可以给系统提供一个hint,可以让系统一次只执行一个查询。因此,这样在运行时刻,某个查询的可用内存量就可以达到每个节点的最大可用内存量。

4.1.2节点配置

三个系统都是部署在一个100节点集群上。每个节点具有一个运行着64位Red Hat Enterprise Linxe 5(内核版本2.6.18)的2.40GHz的Intel双核Duo处理器,具有4GB内存和2个250GB SATA-I硬盘。根据hdparm,硬盘对于cached read传输速率为7GB/s,对于buffered read为74MB/s。节点间通过Cisco Catalyst 3750E-48TD交换机相连。该交换机为每个节点提供gigabit以太网接口以及一个内部的128Gbps交换网络。每个交换机上有50个节点。交换机之间通过一个Cisco StackWise Plus连接,建立了带宽为64Gbps的交互机之间的环状拓扑。同时交换机内部节点间的流量完全是本地的,不会影响到环状拓扑上的流量。

4.1.3 Benchmark执行

对于每个benchmark task,我们会描述整个MR程序的实现步骤以及由数据库系统执行的等价SQL语句。每个task我们会执行三次,然后取平均值。每个系统独立地执行这些benchmark tasks,以保证对集群资源的互斥访问。为了测量除去并行任务协调开销后的基本性能,我们先在单个节点上执行每个task。然后在集群上以不同的节点规模执行该task,来查看伴随着数据规模和集群资源增长的可扩展性。我们只采纳了那些所有节点都可用同时系统软件在benchmark执行期间正确工作时产生的结果。

 

我们也测量了每个系统用于测试数据的加载所花费的时间。测量结果还将实际的数据加载和加载之后系统所执行的其他额外操作进行了分离,比如压缩和索引构建过程。每个节点的初始输入数据是存储在它的两个本地磁盘中的一个上的。

 

除非明确说明,Vertica和DBMS-X的查询执行结果都是直接通过管道将shell命令的输出存入磁盘文件中,而不是通过DBMS。尽管可以在Hadoop中执行一个与之等价的操作,但是将MR程序的执行结果存到HDFS里会更简单也更常见些。但是,这个过程与DBMS处理输出数据的方式完全不同;不是将结果存储到单个文件中,MR程序的每个Reduce instance都会产生一个输出文件并将它们存储到一个目录下。对于开发者来说,实践中通常会把该输出目录作为另一个MR job的输入单元。但是,如果用户想通过一个非MR应用使用这些数据,他们就必须先将结果合并为一个文件然后下载到本地文件系统。

 

为解决这个问题,我们会为每个MR benchmark task执行一个额外的Reduce函数,该函数会将最终的输出合并为HDFS上的单个文件。因为我们的结果实际上包含了执行实际benchmark task的时间以及该额外的合并操作的时间。因此,本论文各图中的Hadoop结果都是使用一个具有2部分的柱状图来表示:下半部分代表实际benchmark task的时间,上面部分代表合并操作的时间。

4.2原始的MR Task

我们的第一个benchmark task是取自原始MapReduce论文的“Grep task”。在该task里,所有系统都必须要扫描一个100字节长的记录集合去查找一个三字符的片段。每个记录的前10个字节是一个唯一的key,后面90字节是随机生成的value。待查找的片段只会在每10,000记录里的最后90个字节里出现。

 

输入数据以普通文本文件格式存储在每个节点上,一条记录一行。对于Hadoop实验,我们会将这些文件直接上传到HDFS。为了将数据加载到Vertica和DBMS-X,我们会在每个节点上并行执行系统自带的load命令然后采用如下的schema存储数据。

CREATE TABLE Data (

key VARCHAR(10) PRIMARY KEY,

field VARCHAR(90) );

 

我们使用两个不同的数据集来执行Grep task。原始MapReduce论文的测量实验是在大概1800个节点上对1TB数据进行处理,算下来每个节点大概有5.6 million条记录或者是535MB数据。对于每个系统来说,我们会在集群大小为1,10,25,50和100节点的情况下运行该task。不同规模的集群所处理的记录总数就是5.6 million乘以节点数。所以每个系统的性能不仅展示了系统随数据增长的扩展性,同时也可以与原始的MR系统结果进行对比。

 

在我们的第一个测试数据集中,我们会固定每个节点的数据大小,以使它与原始MR benchmark保持一致,同时只会改变节点数进行测试。在第二个测试数据集中,我们会固定总数据量的大小(1TB)让它与原始MR benchmark一致,然后将数据均匀划分到不同数目的节点上。该task测量了随着可用节点数的增加,系统的扩展性如何。因为Hadoop实际上需要总共3TB的磁盘空间来存储这些数据,因此对于该情况下的benchmark,我们只针对25,50和100节点的情况进行了测试(在小于25节点的情况下,没有足够的磁盘空间来存储3TB数据)。

4.2.1数据加载

我们现在描述下将数据从节点的本地文件加载到每个系统的内部存储中的过程。

 

Hadoop:有两种方式将数据加载到HDFS:1.使用Hadoop命令行工具将存储在本地的文件上传到HDFS或者2.使用Hadoop内部IO API创建自己的数据加载程序。因为我们不需要为我们的MR程序修改输入数据,因此直接在每个节点上并行地采用Hadoop的命令行工具上传文件即可。采用这种方式存储的数据可以通过Hadoop的TextInputFormat数据格式访问数据,在该格式中,key就是文件中的行编号,value就是行的内容。我们发现这种方法,与使用Hadoop序列化格式或者是压缩的情况相比,无论是数据加载还是任务执行都得到了最好的性能。

 

DBMS-X:DBMS-X的加载过程分两个阶段。首先我们会在集群中的每个节点并行执行LOAD SQL命令来从本地文件系统中读取数据,然后将它们的内容插入到数据库中的特定表中。在该命令中,允许我们指明本地数据是通过特殊字符进行分割的,因此不需要在load之前对其进行转换。但是因为我们的数据是随机生成的,因此当系统读到每条记录时,必须根据目标表进行划分的属性将它重新分布到其他节点。当然我们也可以使用一个“hash-aware”的数据生成器使得DBMS-X只需要加载它即可而不需要重分布这个过程,但是我们觉得这也不会带来太大的性能提升。

 

一旦初始的加载阶段完成,之后我们还要执行一个管理命令,来让每个节点重新组织数据。该过程并行地在节点上进行数据压缩,所有的表索引的构建,执行其他的内部处理。

 

Vertica:Vertica也提供了一个COPY SQL命令,该命令是由单个主机发起的,然后会将加载过程在集群中的多个节点上进行协调使它们并行地执行。用户会传递一个执行加载操作的节点列表给COPY命令作为输入。该过程类似于DBMS-X:在每个节点上,Vertica加载器将输入数据根据分隔符分割,为输入文件的每行创建一个元组,然后根据它的主键的hash值将该元组重新分布。一旦数据加载完成,对应的列会根据数据库的物理设计自动地进行排序压缩。

结果与讨论:加载535MB/node和1TB/cluster数据集的结果分别如图1和图2所示。对于DBMS-X,我们将加载过程分为两阶段,如图所示,底部代表并行LOAD命令的执行时间,上部代表数据重新组织的过程。

 

A Comparision of Approaches to Large-Scale Data Analysis(译) - 星星 - 银河里的星星

 

 对于图1来说最显著的特点就是DBMS-X与Hadoop和Vertica之间的性能差异。尽管DBMS-X第一阶段的LOAD命令是在每个节点上并行执行的,但是数据实际上是被顺序加载的{!因为它们实际上在写同一张表,可能因为锁等机制的原因,实际上是顺序的写入该表的}。因此,随着数据总量的增加,加载时间也是成比例上升的。同时这也解释了,对于1TB/cluster的情况,虽然单节点存储的数据少了为什么加载时间依然没有下降。然而,DBMS-X的压缩和内部处理阶段是并行进行的,因此图2中第二阶段的执行时间随着节点数的增加执行时间在下降。

 

在未采用任何压缩的情况下,Hadoop很明显要快于DBMS-X和Vertica,因为每个节点只是简单地将数据文件从本地磁盘拷贝到本地的HDFS实例及将另外两份分布到集群的其他节点上。如果在加载数据到Hadoop时只设置了一个副本,那么加载时间应该会降低3倍。但是正如我们在第5节讨论的,没有多副本的支持通常会增加jobs的执行时间。

4.2.2 Task执行

SQL命令:由于SQL系统中没有在该filed上的索引,因此该查询需要一次全表扫描。SELECT * FROM Data WHERE field LIKE ‘%XYZ%’;

 

MapReduce程序:MR程序仅由一个Map函数组成,该函数接受一个已分离成对应的key/value对的单个记录,然后在value上执行一个子串匹配。如果找到了待搜索的子串,Map函数简单地将输入key/value对输出到HDFS。因为没有定义Reduce函数,因此每个Map instance的输出就是该程序的最终输出。

 

结果与讨论:针对三个系统的性能结果见图4和图5。令人吃惊的是,系统间的相对差异在这两个图中是不一致的。

A Comparision of Approaches to Large-Scale Data Analysis(译) - 星星 - 银河里的星星

在图4中,两个并行数据库系统性能基本一致,都比Hadoop块两倍多。但是在图5,DBMS-X和Hadoop又都比Vertica慢了两倍多。原因在于两次实验的数据处理量有很大的不同。对于图4的结果,只有很少数据被处理(535MB/node)。这使得Hadoop不可忽略的启动开销成为限制它性能的主要因素。正如我们在5.1.2节里提到的,对于运行时间很短的查询(比如可能少于1分钟)来说,Hadoop的启动开销会在执行时间中占主导地位。根据我们的观察,在所有的Map task启动并在集群中的所有节点上全速运行之前,大概会花费10-25秒的时间。此外,随着启动的instance数的增加,会增加中央job tracker协调节点活动上的额外开销。因此随着更多节点加入集群以及对于长时间运行的数据处理任务来说,该固定开销会有轻微上升,在图5中,该固定开销被处理所需的时间掩盖了。

 

图中每个Hadoop柱状图的上面部分代表了合并输出到单个文件的额外的MR job的执行时间。因为我们是将它作为一个独立的MapReduce job运行的,因此这一段时间在图4中也占了很大比例,主要也是固定的启动开销导致整个任务执行时间比较长。尽管Grep task是一个选择性的处理过程,但是图5的结果中,这个合并阶段仍然占了几百秒,主要是由于它还需要打开及合并大量小文件。每个Map instance会将它的输出作为一个独立的HDFS文件,因此尽管每个文件都很小,但是由于有很多Map tasks,因此每个节点上都有很多文件。

 

对于1TB/cluster数据集实验结果,图5 表明所有的系统在节点数加倍的情况下,执行时间都差不多变为原来的一半,因为实验的总数据是个常量。Hadoop和DBMS-X基本上具有类似的性能,因为随着数据处理量的上升Hadoop的启动开销被平摊了。然而,结果很明显是Vertica打败了DBMS-X和Hadoop。我们将这归因于Vertica高效的数据压缩(见5.1.3节),每个节点存储的数据越多越高效。

4.3分析型任务

为了探索这两种系统的更复杂的使用方式,我们开发了另外4个与HTML文档处理有关的tasks。我们首先生成了一组随机的HTML文档集合,类似于web爬虫的网页发现模式。每个节点分配了600,000个全局唯一的HTML文档,及一个唯一对应的URL。在每个文档中,我们采用Zipfian分布随机生成了一些指向其他网页的链接。

 

我们还生成另外两个额外的数据集用来模拟HTTP服务器流量日志文件。这些数据集除了包含那些从HTML文档生成的内容外还有几个随机生成的属性。这三个表的schema定义如下:

CREATE TABLE Documents (

url VARCHAR(100)

PRIMARY KEY,

contents TEXT );

CREATE TABLE Rankings (

pageURL VARCHAR(100)

PRIMARY KEY,

pageRank INT,

avgDuration INT );

CREATE TABLE UserVisits (

sourceIP VARCHAR(16),

destURL VARCHAR(100),

visitDate DATE,

adRevenue FLOAT,

userAgent VARCHAR(64),

countryCode VARCHAR(3),

languageCode VARCHAR(6),

searchWord VARCHAR(32),

duration INT );

 

我们的数据生成器会在每个节点上创建155 million的UserVistis记录(20GB/node)及18 million的Rankings记录(1GB/node)。VisitData,adRevenue和sourceIP字段是从特定边界中均匀随机生成的。其他的字段则是从现实世界中的数据集中随机抽样产生的。每个数据文件以按列分割的文本文件格式存放在每个节点上。

4.3.1数据加载

我们现在描述下UserVistis和Rankings数据集的加载过程。由于我们在4.3.5节讨论的某些原因,只有Hadoop需要直接将文档数据加载到它的内部存储系统中。DBMS-X和Vertica都是通过在运行时在每个节点上执行一个处理文档的UDF,然后将数据加载到临时表里。我们将该阶段的时间算在benchmark时间里,而不是加载时间里。因此,我们不再提供文档数据加载结果。

 

Hadoop:不像Grep task那样,可以不做修改地将数据加载到HDFS,UserVistis和Rankings数据集需要进行修改,以使得它每一行的第一列和第二列是通过tab分割,其他字段则通过一个唯一的字段分隔符进行分割。因为MR模型中没有schema,为了在运行时访问不同的属性,Map和Reduce函数就必须手动地根据分割符将这些值分隔后保存到一个字符串数组中。

 

我们编写了一个并行执行在每个节点上来读取数据集的每一行的数据加载器,根据需要进行数据准备,然后将元组写入到HDFS的普通文本文件中。通过这种方式进行数据集加载比直接使用命令行工具慢了大概三倍,但是不需要用户使用Hadoop编写输入handler;MR程序可以通过在数据文件上使用KeyValueTextInputFormat接口来自动地根据tab分隔符将文本文件的一行转换为key/value对。但是,我们也发现其他的数据格式,比如SequenceFileInputFormat或者是用户定义的Writable元组,会导致更慢的执行时间。

 

DBMS-X:我们为DBMS-X使用与4.2节相同的加载过程。Rankings表会根据pageUrl进行hash分区,同时将存储在每个节点上的记录按照pageRank排序。同样地,UserVistis表会根据destinationUrl进行hash分区,同时在节点上根据visitDate进行排序。

 

Vertica:类似于DBMS-X,Vertica也使用与4.2节相同的加载命令,然后分别根据pageRank和visitDate对Ranking表和UserVistis表进行排序。

 

结果和讨论:因为Rankings和UserVistis数据集的加载过程是类似的,所以我们只在图3中提供了UserVistis数据集的加载结果。与图1的Grep 535MB/node数据加载结果类似,加载时间随节点数的增加成比例上升。

4.3.2 Selection Task

Selection task是一个用来查找Ranking表中(1GB/node)pageRank值大于用户给定的某阈值的pageURL集合的轻量级过滤器。对于我们的实验来说,我们将该阈值设为10,这样每个节点的每个数据文件中大概会有36,000条满足条件的记录。

 

SQL命令:DBMS使用如下简单的SQL语句执行该selection task:

SELECT pageURL, pageRank

FROM Rankings WHERE pageRank > X;

 

MapReduce程序:MR程序使用一个简单的Map函数,该函数将输入值根据字段分割符进行提取,然后将那些pageRank值在该阈值之上的记录的pageURL和pageRank做为新的key/value对输出。该任务不需要Reduce函数,因为Ranking数据集中的每个pageURL都是全局唯一的。

 

A Comparision of Approaches to Large-Scale Data Analysis(译) - 星星 - 银河里的星星

 结果与讨论:该实验结果如图6所示,结果再一次表明并行DBMS的性能在所有的集群规模上都要比Hadoop好。尽管所有系统的相对性能伴随着节点数和总数据规模的上升都在降低,但是Hadoop是受影响最大的。比如在1节点和10节点的情况下,执行时间几乎差了50%。这也得归因于伴随着集群节点数的增加,Hadoop的启动开销也在增大,而对于执行时间很短的查询来说这个影响也几乎是线性增加的。

并行DBMS能够打败Hadoop的另一个重要原因是Vertica和DBMS-X都使用了在pageRank上的索引,同时所存储的Rankings表已经是按照pageRank排序的。因此,查询执行时间很短。需要注意的是,尽管Vertica的绝对执行时间都很短,但是它的相对性能随着节点数目的增加在不断降低。事实上,尽管总的执行时间在上升,但是实际每个节点的查询执行时间一直都是相同的(大概170ms)。但是由于节点很快就执行完毕,系统瞬间接受到了来自太多节点的消息,这使得系统需要花费很长时间去处理。Vertica使用一个可靠的消息层来进行查询的分发和协议提交处理,我们认为随着查询所涉及的节点数的增加这个开销会有很大的增加。

4.3.3 Aggregation Task

我们的下一个task需要系统计算UserVisits表根据sourceIP分组后,每个sourceIP对应的adRevenue的总和。我们也对该查询任务进行了一些变动(根据sourceIP的前7个字符进行分组),以测量分组数目减少后对查询性能的影响。我们设计该task是为了测量在单个只读表上进行并行分析的性能,这种情况下为了计算出最终结果,节点之间不需要交换中间数据。无论集群中有多少节点,该tasks总是会产生2.5millon条记录(53MB);变动后的那个查询任务会产生2000条记录(24KB)。

 

SQL命令:SQL命令如下:

SELECT sourceIP, SUM(adRevenue)

FROM UserVisits GROUP BY sourceIP;

修改后的查询的SQL命令如下:

SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue)

FROM UserVisits GROUP BY SUBSTR(sourceIP, 1, 7);

 

MapReduce程序:与之前的tasks不同,该task的MR程序将由Map和Reduce函数组成。Map函数首先会将输入记录根据字段分隔符进行分割,然后将sourceIP字段作为key,adRevenue字段作为value进行输出。对于修改后的那个查询,只使用sourceIP的前7个字符。这两个Map函数共享相同的Reduce函数,该函数只是将每个sourceIP下的adRevenue全部加起来,然后输出key和该和。我们使用了MR的Combine特性在数据传输给Reduce instance之前进行一个预先的聚合过程,通过这样做我们将第一个阶段的执行时间降低了2倍。

 

结果与讨论:实验结果如图7和图8所示。再次表明DBMS战胜了Hadoop。DBMS通过让每个节点扫描它本地的table部分,抽取出sourceIP和adRevenue字段,然后执行一个本地的group by。查询协调器会将这些本地的group结果进行merge,最后输出给用户。

A Comparision of Approaches to Large-Scale Data Analysis(译) - 星星 - 银河里的星星

 图7的结果表明两个DBMS平台在group很多情况下执行性能基本上一样,因为此时它们的性能主要受限于将local的groups传输给协调器和进行merge的过程。对于使用的节点数目比较少的情况下,Vertica性能要好些,因为它可以读取更少的数据(因为它可以直接读取sourceIP和adRevenue列),但是随着节点数增加它的性能开始变地要慢一些。

 

根据图8的结果,说明在处理的groups比较少的情况下列存式系统更有优势。因为UserVisits表的每个元组实际上超过200个字节,但是需要访问的那两个列(sourceIP和adRevenue)实际只有20字节,同时由于此时需要进行merge的groups远少于图7的情况,因此所需的通信开销很低,这样瓶颈就不在merge过程了。因此Vertica由于不需要读取那些UserVisits表中未使用的部分,性能要远好于其他两个系统。

 

可以看到所有系统的执行时间在各个节点数规模上几乎是一样的(当然伴随着节点数的增加,Vertica有些轻微的下降)。因为该benchmark task需要系统扫描整个数据集合,因此运行时间主要受控于每个节点的顺序扫描性能和网络repartitioning开销。

4.3.4 Join Task

Join task是由在两个数据集上执行复杂计算的两个子tasks组成。该task的第一部分中,系统需要找到在给定日期期限内产生最多收入的sourceIP。一旦这些中间结果产生之后,系统必须计算出属于该区段的所有被访问页面的平均pageRank。在试验中,我们使用2000年1月15-1月22这个日期区间,在UserVisits表中在该区间的记录大概有134,000条。

 

该task最重要的地方在于它必须访问两个不同的数据集合,同时为了根据pageURL和destURL找到对应的pageRank和UserVisits记录,必须将它们join到一块。该task要求每个系统在大量数据上进行一个相对复杂的处理。同时性能结果也能很好的说明DBMS的查询优化器能否为join产生高效的执行计划。

 

SQL命令:与下面复杂的MR程序相比,DBMS只需要两条相对简单的查询语句完成该任务。第一条语句创建一个临时表来存储执行了UserVisits和Rankings的join以及计算聚合操作的SELECT语句的输出。该表产生之后,就会使用第二个查询来输出具有最大totalRevenue值的那条记录。

SELECT INTO Temp sourceIP,

AVG(pageRank) as avgPageRank,

SUM(adRevenue) as totalRevenue

FROM Rankings AS R, UserVisits AS UV

WHERE R.pageURL = UV.destURL

AND UV.visitDate BETWEEN Date(‘2000-01-15’)

AND Date(‘2000-01-22’)

GROUP BY UV.sourceIP;

SELECT sourceIP, totalRevenue, avgPageRank

FROM Temp

ORDER BY totalRevenue DESC LIMIT 1;

 

MapReduce程序:因为MR模型本身没有对2个或更多数据集合进行join操作的内在支持。因此我们用于实现join task的MR程序必须分成三个独立的阶段。每个阶段都是一个单独的MR程序,只有当上一个阶段结束下一个阶段才会开始执行。

 

阶段1—第一个阶段首先会过滤掉那些在给定日期边界外的UserVisits记录,然后将满足条件的记录与Rankings文件中的记录进行join。MR程序一开始会将所有的UserVisits和Rankings数据文件作为输入。

 

Map函数:对于每个输入key/value对,通过在根据分隔符进行value分割时,计算字段个数来判断记录类型。如果是UserVisits记录,就对它根据日期区间进行过滤。满足条件的记录以如下复合形式的key值进行输出(destURL,K1),在这里K1用来标识该它是一个UserVisits记录。所有的Rankings记录以如下复合形式的key值进行输出(pageURL,K2),在这里K2用来标识该它是一个Rankings记录。这些输出记录使用一个用户提供的只对URL部分进行hash的划分函数进行重新划分。

Reduce函数:Reduce函数的输入是单个URL下的所有记录。对于每个URL,我们将它对应的valuse根据复合key值里的第二部分分为两个集合。该函数之后会计算这两个集合的叉积以完成join操作,然后输出一个以sourceIP为key,以(pageURL,pageRank,adRevenue)为value的新的key/value对。

 

阶段2—根据第一阶段生成的sourceIP上计算adRevenue的和及pageRank平均值。该阶段使用一个Reduce函数来收集单个节点上的特定sourceIP。使用Hadoop API里的identity Map函数来直接将记录提供给split过程。

 

Reduce函数:对于每个sourceIP,该函数会将adRevenue加起来同时计算pageRank的平均值,最后保留具有最大adRevenue和的那个sourceIP。每个Reduce instance会以sourceIP作为key,以一个(avgPageRank,totalRevenue)元组为value作为一条记录输出。

 

阶段3—作为最后一个阶段,我只需要定义一个Reduce函数,该函数使用前一个阶段的输出来产生具有最大totalRevenue值的记录作为输出。我们只需要在一个节点上执行Reduce的一个instance,来扫描来自阶段2的所有记录以找到目标记录。

 

Reduce函数:该函数处理每个输入的key/value对,同时记住具有最大totalRevenue值的记录。因为Hadoop的API无法知道一个Reduce instance要处理的记录数,因此对于Reduce函数来说就无法知道当前的记录是否是最后一条。因此,我们在Reduce实现中重新实现了closing回调函数,通过它在程序退出前输出最大的那条记录。

 

A Comparision of Approaches to Large-Scale Data Analysis(译) - 星星 - 银河里的星星

 结果和讨论:性能结果如图9所示。由于Vertica查询优化器的一个bug,我们稍微修改了下100节点实验中的SQL语言,这也是为什么当节点从50变为100时,Vertica的执行时间增加的原因。但是即使是这样,很明显该task展示了Hadoop和并行数据库系统间最大的性能差异。产生这种情况的原因是两方面的。

首先,尽管查询复杂度提高了,但是Hadoop的性能仍然受限于它能从磁盘中以多大速率将庞大的UserVisits表(20GB/node)读出来。MR程序必须进行一个全表扫描,而并行数据库则可以利用在UserVisits.visitDate上的索引来大大降低所需要读取的数据量。在尝试减少Hadoop查询的各个阶段的开销时,我们发现无论集群节点规模多大,第2阶段和第3阶段平均消耗了24.3秒和12.7秒。与之相比,第一阶段平均化了1434.7秒。有趣的是,它花了接近600秒用于将UserVisits和Ranking表从磁盘读取出来的IO,然后又用300秒来分割,解析和反序列化各种属性。因此,用来进行表的动态解析的CPU开销成为Hadoop的限制因素。

 

其次,并行DBMS可以充分利用UserVisits和Rankings表都是根据join key进行划分的这一事实。这意味着,这两种DBMS系统都可以直接在每个节点上进行本地化的join,而不需要在join之前进行任何网络传输。因此只需要在每个节点上对Rankings表和UserVisits表的一部分进行一个简单的本地hash join,然后再在多个节点上使用一个ORDER BY即可。

4.3.5 UDF Aggregation Task

最后一个task是要计算数据集合中每个文档的inlink数目,该task通常是作为PageRank计算的一部分。特别是,对于该task,系统必须读取每个文档文件,然后找到出现在文档内容里的URLs。然后对于每个URL,系统必须计算出整个文档集合中引用了该URL的网页数。这也是经常会使用MR来进行计算的一种task类型。

 

为了能够更简单地采用Hadoop进行处理,我们对该task进行了两个修改:首先,我们允许将自引用也计算在内,因为对于Map函数来说,得到它正处理的输入文件的名称并不是那么容易。其次,在每个节点上,我们会将HTML文档连接起来组成更大的文件存储到HDFS上。我们发现这样大概可以将Hadoop的性能提高2倍,同时有助于避免大量文件存储在HDFS上后产生的HDFS master的内存问题。

 

SQL命令:为了在并行DBMS中执行该task,需要一个用户自定义函数F来解析每条记录的文档内容同时将文档里的URLs输出到数据库。该函数可以使用通用语言编写,实际上与下面描述的Map程序基本上是一致的。通过使用函数F,我们产生出具有一系列URLs的临时表,然后执行一个简单的查询来计算inlink个数:

SELECT INTO Temp F(contents) FROM Documents;

SELECT url, SUM(value) FROM Temp GROUP BY url;

 

尽管前面提到的UDF并不复杂,但是实际中我们发现很难在DBMS中实现它。

 

对于DBMS-X,我们将Hadoop的MR程序翻译成一个等价地使用POSIX正则表达式来搜索文档中的链接的C程序。对于在文档内容中发现的每个URL,该UDF会返回一个新的元组(URL,1)给数据库引擎。最初我们打算将每个HTML文档作为一个字符BLOB类型字段存储到DBMS-X中,然后完全在数据库内部对每个文档执行该UDF,但是我们发现由于我们当前使用的系统版本的一个bug使得我们无法这样去做。最后,我们修改该UDF使得它直接打开本地磁盘的每个HTML文档,然后对它们进行处理,就好像是存在数据库中一样。尽管这有些类似于下面我们在Vertica中采用的方式,但是DBMS-X UDF不是作为数据库的一个外部进程,同时也不需要任何的批量加载工具来导入解析出来的URLs。

 

Vertica目前并不支持UDF,因此我们不得不分两个阶段实现该benchmark。在第一个阶段,我们使用一个DBMS-X UDF的修改版本来从文件中抽取出URLs,然后将结果写入到每个节点的本地文件系统中。与DBMS-X不同,该程序是作为数据库系统的一个外部独立进程执行的。每个节点之后会使用Vertica的批量加载工具将这些文件内容加载到表中。一旦完成,我们就使用上面描述的SQL语句来为每个URL计算inlink数目。

 

MapReduce程序:为了能够使用MR模型,所有的数据都必须是以key/value对形式定义的,每个HTML文档是按行进行分割的,它会以行内容为value,以它出现在文件中的行号为key传递给Map函数。Map函数之后使用一个正则表达式找到每行里的URLs。对于找到的每个URL,函数直接输出URL和1作为key/value对。Reduce函数只是简单的计算某个给定的key的value数目,然后将URL和计算出的inlink数目作为程序的最终结果输出。

 

结果和讨论:图10的结果表明,DBMS-X和Hadoop(不包括那个额外的数据合并过程)对于该task具有基本类似的性能。因为每个节点都具有相同的需要处理的文档数据,同时随着实验节点的增加该数据量保持不变(7GB)。正如所预料的,伴随着节点数目的增多,Hadoop额外的将数据合并为单个文件的操作在变慢,因为需要处理的数据量在增加。DBMS-X和Vertica在图10中都是用一个两层的柱状图进行表示,底层部分代表了执行UDF/解析及将数据加载到表所花的时间,上层代表执行实际的查询所花的时间。DBMS-X的性能要比Hadoop差,原因是UDF与存储在数据库外部的输入文件之间的交互产生的额外开销。Vertica性能差的原因是,它必须要在DBMS外部进行数据解析,同时在将解析结果加载到系统中之前,还要将它保存到本地磁盘。

5.讨论

我们现在对上面的benchmark结果和每个系统未涉及的一些方面进行一个更深入的讨论。在上面的benchmark中,DBMS-X和Vertica大部分tasks的执行都要比Hadoop快很多。下面的这些小节里,我们对产生这些性能差异的原因进行一个更深入的讨论。

5.1 System-level Aspects

在本节中,我们会描述下系统级的一些架构决定是如何影响到两种系统的性能的。因为安装和配置参数可能会对系统最终性能有很大的影响。因此我们首先讨论下这些系统参数设置之间的相对的简便程度。之后,我们再讨论一些底层实现细节。

5.1.1系统安装,配置和调优

我们没花多少力气就将Hadoop安装好并能运行job了。系统安装时只需要设置每个节点上的数据目录,部署好系统库文件和配置文件。通过试验和一些错误信息就可以进行性能优化方面的配置调整。我们发现某些参数,比如sort buffer的大小或者是副本数目,对执行性能没有影响,然而其他的一些参数,比如使用更大的block size,则显著地提高了性能。

 

DBMS-X的安装过程相对更直接一些。通过一个GUI界面引导用户在集群的某个节点上先进行一个初始性的步骤,之后只需要通过一个安装工具并行地将所需的文件安装到集群的其他节点上就完成了整个安装。尽管这个过程很简单,但是我们发现DBMS-X在第一次运行查询时,需要进行很复杂的配置。起初我们发现那些最基本的操作都不能成功执行,实在是太受打击了。最终,找到了原因,我们发现是因为每个节点的内核配置限制了所能分配的虚拟地址空间总量。当达到该限制时,新进程就无法被创建出来,DBMS-X操作就失败了。尽管这属于我们操作上的失误,但是DBMS-X的系统探查能力以及自动化配置机制没有检测出这种限制,实在是让我们有些吃惊。与我们在Hadoop上的成功部署相比,有些令人失望。

 

当这些问题解决之后,我们的DBMS-X就运行起来了,但是我们的工作又再一次被其他很多内存方面的限制而阻挠。我们发现,它的某些默认参数,比如缓存池和sort heap的大小,对于现代系统来说太保守了。更糟糕的是,DBMS-X本身又不能对这些参数进行有效的自动化的调整。比如系统只是自动地将我们的缓冲池大小从4MB调整到了5MB(我们后来强制修改到了512MB)。在我们将sort heap大小调整到128MB时,它甚至还提出性能可能降低的警告(事实上,性能提高了12倍)。对于某些参数的手动调整又导致系统自动地修改了其他的一些。有时,这种针对DBMS-X的手动和自动化调整导致它下次系统启动时无法启动起来。但是大多数的配置为了进行调整都需要DBMS-X的运行,悲剧的是系统很容易就无法再登入了,但是又没有failsafe mode来恢复到之前的一个状态。

 

Vertica是以RPM的形式安装部署到每个节点上的,相对简单一些。同时该RPM还附带了一个额外的配置脚本,用来构建元数据目录以及修改内核参数。数据库调优工作也很少,通过向资源管理器提供一些hints就完成了。同时默认配置已经可以工作地很好了。这种简化的调优策略也有缺点,比如没有一种显式地机制来控制每个query所占用的资源,也不支持手动进行配置。

 

通过我们的经历,可以看出Hadoop比并行DBMS要更容易安装配置些。当然了,不同的并行数据库产品之间,安装和配置的方便性本身就有很大的差异。对于数据库系统来说,有一个优势,这些调优只需要在查询执行之前进行,同时某些调好的参数对于所有的tasks都是适用的。与此相比,Hadoop不仅需要进行系统参数调优(比如block size),为了在系统上运行地更好还需要对每个独立的task进行调优。最后,并行数据库产品通常都提供了一些工具来辅助调优过程,但是对于Hadoop我们不得不通过试验和错误信息进行调优;当然一个更成熟的MR实现也应该会包含这样的调优工具。

5.1.2 Task启动

我们发现MR程序在所有节点全力运行之前需要经过一段时间。在一个100节点集群上,从job提交给JobTracker到第一个MapTask开始执行,需要花费10秒,到集群的所有节点都运行起来需要25秒。这与[8]的结果(在一个1800节点集群上,大概60秒后才能达到数据处理的峰值速度)是一致的。这种“冷启动(cold start)”是Hadoop实现(似乎Google的也是)的属性,而不是MR模型本身固有的。比如,我们发现Hadoop之前的版本中会为每个节点上的Map和Reduce instance创建新的JVM进程,这增加了大规模数据集上的jobs运行开销;通过打开Hadoop最新版本中的JVM重用特性,MR的执行结果提高了10-15%。

 

与此相比,并行DBMS是在OS boot时就启动的,因此可以认为它总是“warm”的,一直在等待着quey去执行。此外,所有的现代DBMS都被设计成执行时采用多线程和多进程,这就允许当前正在运行的代码可以接受额外的tasks,以及进行进一步地执行期调度的优化。最小化启动时间曾经是DBMS早期进行的优化之一,因此肯定有某些东西不用对底层机构进行大的修改就可以直接被MR系统吸收利用。

5.1.3压缩

几乎所有的DBMS(包括DBMS-X和Vertica)都支持对存储数据进行可选的压缩。而且通常都能节省6-10倍的空间。Vertica的内部数据格式为数据压缩进行了高度优化,同时它的执行引擎可以直接操作压缩的数据(比如,在处理时它会尽量避免解压数据)。另外,在大规模数据集上的分析通常都是IO密集型的,用CPU来换IO带宽(压缩数据意味着需要读取的数据会变少)通常都是一种好的策略,同时可以加速执行。尤其是在执行器可以直接操纵压缩数据的情况下,毫无疑问地应该采用压缩。

 

Hadoop和它的底层分布式文件系统支持对输入数据进行块级别和记录级别的压缩。但是我们发现,这两种技术都无法提高Hadoop的性能,某些情况下甚至降低了执行速度。同时也需要用户花费更多的经历在修改代码或者准备数据上。需要指出的是,原始的MR benchmark[8]中也没有采用压缩。

 

为了在Hadoop中使用块级别的压缩,首先需要在每个节点上将数据文件分割成多个小文件,然后使用gzip工具压缩每个文件。采用这种方式压缩数据大概能够降低数据原始大小的20-25%。这些压缩的文件之后会被拷贝到HDFS,就好像是普通文本文件一样。Hadoop会自动检测文件是否被压缩,然后在它们喂给Map instances时会对他们进行实时的解压,因此为使用压缩数据,我们不需要修改我们的MR程序。除了更长的加载时间外(包含了splitting和压缩之后),使用块级别压缩的Hadoop的大多数tasks也慢了几秒钟,而CPU密集型的tasks甚至慢了50%。

 

我们也尝试使用记录级别的压缩来执行这些benchmarks。这需要我们:1.使用Hadoop API编写一个自己定制的tuple对象;2.修改数据加载程序,以将记录转换为压缩的序列化的自定制的tuple对象;3.重构每个benchmark。起初,我们认为这样做可以改进那些CPU密集型的tasks,因为Map和Reduce不需要再根据分隔符进行字段的分割。但是我们发现,这种方式实际上比块级别压缩还要糟糕,同时数据大小只降低了10%。

5.1.4加载和数据Layout

并行DBMS可以在加载输入数据时,对数据进行重新组织。这就允许进行某些优化,比如单独地存储表的每个属性(像Vertica那样的列式存储方式)。对于那些只涉及到了表中个别字段的只读查询来说,该优化可以通过不去读取那些不需要的属性而提高它们的性能。类似于上面的压缩,这样做也节省了关键的IO带宽。MR系统在将数据加载到HDFS中时,默认不转换数据,因此也就不能改变数据layout,就没法进行上面的这种优化。因此,在运行等价的tasks时,Hadoop通常比并行DBMS吃掉更多的CPU,因为它必须在运行时进行数据解析和反序列化,然而并行数据库是在加载时进行的解析,因此它能够以极低的开销很快地将数据从tuple中抽取出来。

 

但是MR这种简单的加载过程确实也使得它能够比DBMS进行更快更简单地加载。4.2.1和4.3.1的结果表明Hadoop加载时的吞吐量比Vertica快了3倍,比DBMS-X快了几乎20倍。对于那些只需要加载一次的特定分析任务来说,实际上是没有必要花那么多的时间进行索引和数据重组的。同时这也表明,对于DBMS来说,应该允许一种“insitu”模式,使得用户可以直接访问和查询存储在本地文件系统的文件。

5.1.5执行策略

正如前面所说,并行DBMS的查询规划器会尽量避免节点间的数据传输。这就使得系统可以基于数据特点和无中间数据写入的采用推模式的数据流传输方式来优化join算法。MR支持者们也应该去研究下并行DBMS中采用的技术,然后将其中的一些好的概念运用到MR模型中。如果这样做的话,我们相信MR框架的性能会有显著地提升。

 

此外,并行DBMS会在查询开始时就构建一个完整的查询计划给所有的处理节点。由于数据是在必要时才在节点间进行推送的,因此在处理过程中没有控制消息。与之相比,MR系统会使用大量的控制消息来同步处理过程,随着这种开销的增长性能会降低;Vertica也有类似的问题(4.2节)。

5.1.6容错模型

正如前面所讨论的,虽然没有提供事务,但是MR可以以大多数并行数据库系统都不具备的方式,从查询执行中的失败中恢复。随着并行DBMS系统部署在集群上的时间的增长,查询中的硬件失败概率也会上升。因此,对于运行时间长的query来说,实现这样的一个容错模型是非常重要的。尽管提高DBMS的容错性的确是一个好主意,我们也需要对那种原本只需要更少硬件和消耗更少能量或者更少时间就可以解决的却采用了更大规模集群和暴力策略进行计算的情况提高警惕,从而回避对这种复杂容错模型的需要。Google,Microsoft和Yahoo!的数千节点集群消耗着大量的能源,而根据我们的结果,对于很多数据处理任务来说,并行DBMS使用更少的节点通常就能达到相同的性能。同样的,更好的方式应该是使用一个具有适度并行性的高性能算法,而不是在一个更大规模集群上采用一个基于暴力策略的算法。

5.2用户级Aspects

本节我们在用户的角度上讨论这些系统中那些促进或阻碍了应用程序开发的属性。

5.2.1易用性

一旦系统上线并且数据加载完毕,之后程序员就可以开始编写查询语句或者代码来执行他们的task了。与其他的编程工作类似,这通常是一个迭代的过程:程序员写一点代码,测试下,然后再写更多的。在这两种系统中,程序员都可以很容易地判断代码是否在语法上是正确的:MR框架中可以检查用户代码能否通过编译,SQL引擎可以判断查询语句能否正确解析。同时这两个系统都提供了运行时支持来帮助用户debug程序。

 

还需要考虑的是用户编写查询的方式。Hadoop的MR程序主要是用Java编写。与其他语言技术相比,比如SQL,大多数的程序员可能都更熟悉面向对象,命令式编程。但是,很多大学课程里都教授SQL,它也是非常具有移植性的—比如在DBMS-X和Vertica之间,只需要一些细微的修改就可以共享同一份SQL命令。

 

通常看来,我们在Hadoop中让MR程序启动和运行起来要比其他系统更简单。为了进行数据处理,我们不需要构造一个schema或者是注册一个用户自定义函数。然而,当我们得到一些初步结果后,扩展benchmark tasks的数量时,可能需要为现有数据集合添加新的列。为了处理这些新的列,我们不得不修改现有的MR代码,然后重新测试每个MR程序以保证它在新的数据模式下可以正确工作。此外,Hadoop中的某些API在新版本中被废弃之后,也需要我们去修改程序。与之相比,一旦我们写好了最初的基于SQL的应用程序,尽管我们的benchmark schema发生了变化,但是我们根本不需要修改代码。

 

我们认为,对于开发者来说,MR可能很容易上手,但是MR程序的维护却是一个很痛苦的地方。正如我们在3.1节所指出的,只要MR模型中所使用的数据没有一种显式的schema表示,那么在两套部署环境或者两个不同数据集上重用MR代码是很困难的。

5.2.2辅助工具

另一方面,SQL数据库有大量的现成工具来进行数据分析和报告。整个软件工业界已经为DBMS用户开发了大量的第三方扩展。包括各种类型的软件,比如:1.数据可视化;2.商业智能;3.数据挖掘;4.数据备份;5.自动化数据库设计。因为MR技术才刚刚兴起,针对MR的这类软件比较有限;然而,伴随着用户群的增长,很多现有的基于SQL的工具很可能将会支持MR系统。

6.总结

从该论文中的实验结果中,我们可以得出很多有趣的结论。首先,在我们的实验规模上,这两个数据库系统都展示了比Hadoop更好的性能。从100节点规模上的这5个tasks结果上看,DBMS-X平均比Hadoop快了3.2倍,Vertica平均比DBMS-X快了2.3倍。虽然没有验证,但我们觉得在1000节点规模上这些系统应该具有类似的相对性能(最大的Teradata集群少于100节点,但是管理了超过4PB数据)。这些数字说明了两个方面:并行数据库系统使用更少的处理器就能够提供相同的响应时间,同时肯定消耗了更少的能量;在数千节点上的MapReduce模型是一种浪费大量能源的暴力解决方案。尽管,传闻Google的MR版本要比Hadoop版的更快,由于没法访问代码,也没法进行测试。但是,我们认为这两个版本的MR性能,不可能会有本质上的差异,因为MR的query总是要扫描整个输入文件。

 

两个数据库系统的性能优势源自于过去25年所发展的大量技术,包括(1)使用B树索引来加速选择性查询操作(2)新型的存储机制(比如面向列的)(3)高超的压缩技术以及可以直接操作压缩数据的能力(4)进行大规模关系型数据查询的精细并行算法。对于像Vertica这样的列存式数据库来说,只有那些查询必需的列才会从磁盘读出。另外,对于这种列式存储方式的数据可以进行更有效地压缩(Vertica的压缩因子大概是2.0,DBMS-X大概是1.8,Hadoop是1.25),这也降低了执行一个查询所需的磁盘IO量。

 

尽管对于这两个并行数据库系统相对的高性能我们并不感到吃惊,但是与数据库相比Hadoop所具有的易用性却给我们留下了深刻印象。Vertica的安装过程简单但是很难去调整某些系统参数。另一方面,DBMS-X很难配置好,为了提高性能我们需要反复求助于生产厂商。对于一个成熟的产品比如DBMS-X来说,整个的用户体验实在是太令人失望了。鉴于Hadoop前期成本优势,我们终于理解它为何吸引了如此大的用户群。

 

我们认为扩展性(extensibility)是数据库系统所缺乏的另一个方面。使用用户自定义类型和函数对DBMS进行扩展已经是距今25年的一个想法了[16]。但是我们所测试的这两个并行数据库系统,没有一个能很好地支持UDF aggregation task,使得我们不得不采取了一些绕过这些限制(Vertica)和bug(DBMS-X)的workarounds。

 

虽然所有的DB系统都能够容忍大量的软件错误,但是毫无疑问MR可以更好地在硬件失败发生时最小化丢失的工作量。当然,由于需要将Map和Reduce之间的中间结果存储为文件,这种容错能力也带来了性能上的损失。还有一个未解答的问题是,这种性能损失有多严重。不幸地是,要研究这个问题需要在一个更通用的框架中实现materialization和non-materialization两种策略,这超出了这篇论文的内容。尽管很明显这是个优势,但是真正实际中Hadoop的这种容错能力到底有多重要,目前还不太清楚。此外,如果一个需要1000节点的MR系统才能抵得上一个100节点的并行数据库系统的性能,那么查询执行时节点的失败概率可能也会增加10倍。当然了,容错性能越好,对于数据库用户来说当然会越高兴。

 

一开始很多人会认为SQL很难用。部分上是因为人们需要换一种思维方式,同时与Don Chamberlin在1970s的最初设计相比,SQL已经演化成了一个复杂的语言。随着时间的推移,大部分的语言都会复杂化,但是另外由于数据库厂商间的竞争很多产品中都引入了他们自己的专属扩展,这使得SQL的情况要更糟糕。

 

尽管存在这些问题,SQL仍不失为一个强有力的工具。考虑下面一个用来生成按照薪水排序及列出薪水相应的排名(比如最高薪水的雇员的排名是1)的雇员列表的查询语句:

SELECT Emp.name, Emp.salary,

RANK() OVER (ORDER BY Emp.salary)

FROM Employees AS Emp

尽管MR程序也可以并行地执行该排序过程,但是很难将该查询转化为一个具有group by聚合操作的MR程序。而RANK只是由现代并行数据库系统所提供了众多强有力的分析函数的其中一个。比如Teradata和Oracle都支持一组丰富的函数,比如可以在一组排好序的记录窗口上进行各种操作。

 

这两种系统架构上的不同将会长期存在。MR可以看做是一种“schema later”甚至是“schema never”的模式。但是schema的缺乏有很多影响。最重要的是,与在加载时进行解析的DBMS相比,这意味着运行时的记录解析是不可避免的。这种差异,使得压缩在MR中意义甚微,同时导致了两种系统间的性能差异。如果没有schema,每个用户可能都需要写一个解析器,这使得在多个应用程序间的数据共享变得复杂化。其次,schema对于优化所需要的信息是很关键的,比如表是如何划分的,表的基数,每一列值的分布情况。

 

我们认为,这两种系统都有很多值得学习的地方。目前已经出现了一些更高层次的接口,比如建立在MR基础之上的Pig[15],Hive[2],同时,大量的本质上类似但是比MR更具表达力的工具也在开发中,比如Dryad和Scope。这将使得MR风格的系统更加容易编码,意味着我们的benchmark tasks需要更少的代码就可以完成。对于并行数据库来说,我们认为无论商业还是开源的系统都应该改进以更好地支持用户自定义函数。因此,这两种系统的API将会逐步靠拢。目前已经可以看到这样的一些迹象,比如Greenplum和Asterdata已经开始在SQL中集成MR。

7.致谢

The authors would like to thank Lakshmikant Shrinivas in helping to get DBMS-X running, as well as Chris Olston and the reviewers for their insightful comments and feedback. This work was supported in part by NSF Grant CluE – 0844013/0844480.

参考文献

[1] Hadoop. http://hadoop.apache.org/.

[2] Hive. http://hadoop.apache.org/hive/.

[3] Vertica. http://www.vertica.com/.

[4] Y. Amir and J. Stanton. The Spread Wide Area Group Communication System. Technical report, 1998.

[5] R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib,S. Weaver, and J. Zhou. Scope: easy and efficient parallel processing of massive data sets. Proc. VLDB Endow.,1(2):1265–1276, 2008.

[6] Cisco Systems. Cisco Catalyst 3750-E Series Switches Data Sheet, June 2008.

[7] J. Cohen, B. Dolan, M. Dunlap, J. M. Hellerstein, and C. Welton. MAD Skills: New Analysis Practices for Big Data. Under Submission, March 2009.

[8] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI ’04, pages 10–10, 2004.

[9] D. J. DeWitt and R. H. Gerber. Multiprocessor Hash-based Join Algorithms. In VLDB ’85, pages 151–164, 1985.

[10] D. J. DeWitt, R. H. Gerber, G. Graefe, M. L. Heytens, K. B. Kumar, and M. Muralikrishna. GAMMA – A High Performance Dataflow Database Machine. In VLDB ’86, pages 228–237, 1986.

[11] S. Fushimi, M. Kitsuregawa, and H. Tanaka. An Overview of The System Software of A Parallel Relational Database Machine. In VLDB ’86, pages 209–219, 1986.

[12] S. Ghemawat, H. Gobioff, and S.-T. Leung. The Google File System. SIGOPS Oper. Syst. Rev., 37(5):29–43, 2003.

[13] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad:Distributed Data-parallel Programs from Sequential Building Blocks. In EuroSys ’07, pages 59–72, 2007.

[14] E. Meijer, B. Beckman, and G. Bierman. LINQ: reconciling object, relations and XML in the .NET framework. In SIGMOD ’06, pages 706–706, 2006.

[15] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig Latin: A Not-So-Foreign Language for Data Processing. In SIGMOD ’08, pages 1099–1110, 2008.

[16] J. Ong, D. Fogg, and M. Stonebraker. Implementation of data abstraction in the relational database system ingres. SIGMOD Rec., 14(1):1–14, 1983.

[17] D. A. Patterson. Technical Perspective: The Data Center is the Computer. Commun. ACM, 51(1):105–105, 2008.

[18] R. Rustin, editor. ACM-SIGMOD Workshop on Data Description, Access and Control, May 1974.

[19] M. Stonebraker. The Case for Shared Nothing. Database Engineering, 9:4–9, 1986.

[20] M. Stonebraker and J. Hellerstein. What Goes Around Comes Around. In Readings in Database Systems, pages 2–41. The MIT Press, 4th edition, 2005.

[21] D. Thomas, D. Hansson, L. Breedt, M. Clark, J. D. Davidson, J. Gehtland, and A. Schwarz. Agile Web Development with Rails. Pragmatic Bookshelf, 2006.

You Might Also Like