本文基本上是对现有的图计算框架论文或文档的一个综述。由于附图较多,未上传至相册,此处有pdf版本:http://pan.baidu.com/s/1uvjHW。
转载请注明作者:phylips@bmy
出处:http://duanple.blog.163.com/blog/static/7097176720123215230365/
Pregel 是Google继MapReduce之后提出的又一个计算模型,与Mapreduce的离线批处理模式不同,它主要用于图的计算。该模型的核心思想源于Leslie Valiant于上世纪80年代提出的BSP计算模型。图计算会涉及到在相同数据上的不断更新以及大量的消息传递,如果采用MapReduce去实现,会产生大量不必要的序列化和反序列化开销。在传统的高性能计算领域通常会借助MPI来完成,但是MPI本身只是提供了一系列的通信接口,开发难度较高,同时对于Google由普通PC构建的集群来说,MPI的容错性不够。而现有的一些图计算系统也并不适应Google的场景。
Pregel计算有一系列的超级步组成,在每个超级步内,框架会调用每个顶点的用户自定义函数,该函数定义了在该超级步内需要进行的计算,该函数可以读入前一个超级步发送给它的消息,并产生下一轮迭代的消息给其他顶点,同时会修改顶点及其边的状态。
关键词:节点分配,消息的异步和批量传输,checkpoint,confined recovery,combiner,Aggregators,拓扑可变,Reader&Writer。
Master负责:划分partiton,并分给worker,通知用户加载输入,执行超级步
Worker负责:
一个或多个partition,为每个vertex调用compute(),传送上一个超级步的消息,
一个partition一个线程,当前超级步的消息异步传输,但会在该超级步结束前完毕,
每步结束后,worker要告诉master,并告诉master还有多少vertex是active的,只要有active的节点或有消息在传输就会继续。
容错有两种方式:checkpoint,在超级步开始时由master通知各节点将相关信息序列化。Master也会阶段性地保存aggregator的值。
BSP计算模型:
1 )本地计算阶段,每个处理器只对存储本地内存中的数据进行本地计算。
2 )全局通信阶段,对任何非本地数据进行操作。
3 )栅栏同步阶段,等待所有通信行为的结束。
Compute函数是主计算过程,在每个超级步中,都会被调用。在该函数中可以通过GetValue和MutableValue()对Value进行访问和更新。同时也可以Get或Set以该顶点为起点的边的值。所以一个计算状态可以通过顶点,顶点值,边,边值,接收的消息来定义,因此checkpoint时需要保存这些值。在所有节点均投票halt并且没有消息在传递时,迭代结束。
Pregel论文
Apache Hama支持本地,伪分布式,分布式模式。Hama本身与Hadoop十分类似,只是具有自己的通信和同步机制,利用Hadoop RPC进行节点间通信,借助于ZooKeeper进行同步,通过对消息进行收集和捆绑发送来降低网络开销和竞争。
Hama由BSPMaster,GroomServer组成,依赖于ZooKeeper,HDFS/HBase。可用于矩阵运算,图计算。
BSPMaster负责如下工作:
? 维护着Groom服务器的状态,维护counters信息。
? 控制在集群环境中的superstep。
? 维护在groom中job的工作状态信息。
? 分配任务、调度任务到所有的groom服务器节点。
? 广播所有的groom服务器执行。
? 管理系统节点中的失效转发。
? 提供用户对集群环境的管理界面。
首先通过脚本启动BSPMaster和GroomServer。然后BSPMaster会作为GroomServer的RPC server,GroomServer则与一个BSPPeer实例启动。之后,BSPPeer会与GroomServer集成。启动之后,每个GroomServer会周期性地发送一个包含了GroomServer本身状态的心跳信息,比如当前task容量,剩余内存等。BSPMaster会利用这些状态信息来调度task到空闲GroomServer上。
GroomServer负责启动由BSPMaster安排的bsp tasks。它会周期性的向BSPMaster报告task的状态。每个GroomServer会与HDFS或者其他分布式存储系统协同工作。通常GroomServer与DataNode都是直接运行在一个物理节点上。
ZooKeeper则用来管理BSPPeers的同步,未来它也会被用于容错机制。
@Override
public void bsp(
BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
throws IOException, SyncException, InterruptedException {
int in = 0, out = 0;
for (int i = 0; i < iterations; i++) {
double x = 2.0 * Math.random() – 1.0, y = 2.0 * Math.random() – 1.0;
if ((Math.sqrt(x * x + y * y) < 1.0)) {
in++;
} else {
out++;
}
}
double data = 4.0 * (double) in / (double) iterations;
DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
peer.send(masterTask, estimate);
peer.sync();
}
@Override
public void setup(
BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
throws IOException {
// Choose one as a master
this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
}
public void cleanup(
BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
throws IOException {
if (peer.getPeerName().equals(masterTask)) {
double pi = 0.0;
int numPeers = peer.getNumCurrentMessages();
DoubleMessage received;
while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
pi += received.getData();
}
pi = pi / numPeers;
peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
}
}
}
只需传递所必需数据,如何利用locality?
Introduction-of-Apache-Hama-2011
HAMA: An Efficient Matrix Computation with the MapReduce Framework
GoldenOrb的关键构建块由Vertex,VertexBuilder, VertexWriter和OrbRunner构成。其中Vertex由ID,value及边组成,代表图中的一个节点。VertexBuilder负责Vertex的构建,通过buildVertex函数创建Vertex对象。VertexWriter则用来为Vertex对象创建相关的OrbContext对象。OrbRunner则用于启动job,主要负责解析命令行参数,初始化,连接ZooKeeper以及设置好OrbConfiguration 。
对于Vertex来说,需要实现compute函数。对于VertexBuilder来说,需要实现buildVertex函数。对于VertexWriter来说,需要实现vertexWrite函数。可以看到这些接口与Pregel的接口是十分类似的。
Vertex
Trinity是微软开发的一套图计算平台,包含一个建立在分布式内存云平台上的图数据库及一个计算框架。通过一个纯内存的key-value存储数据库实现快速访问。
Trinity的一个基本存储单元称为一个cell,每个cell通过一个全局唯一的id标识,该id是一个64-bit整数,支持用户通过这个id进行随机访问。从底层key-value的存储角度来看,key就是cell-id,value是一个任意长度的字符串。
Slave:存储图数据,处理来自于其他Slaves,Aggregators和clients的消息。
Aggregator:处理来自于Aggregator和Client的消息。一方面它会负责聚合来自slave或client的消息,另一方面,在聚合完来自slave的结果后,它也会返回查询应答。
Client:终端用户接口。
可以通过Global.CloudStorage来访问全局图数据,通过Global.LocalStorage访问本地图数据。
通过如下接口进行同步:
通常用户需要实现一个Slave,Aggregator或Client的实例。主要任务在于定义好消息处理过程,包括消息的发送,接受,及消息本身的定义。
Spark主要用来解决Mapreduce所不擅长的两类计算:迭代计算和交互式分析。核心在于将数据存在内存,避免重复的load。采用scala语言实现,提供了类似于DryadLINQ的函数式编程接口。Spark为并行编程主要提供了两个抽象:RDD和并行操作,此外它还提供了两种类型的共享变量支持:广播变量和累加器。
在RDD上的并行操作有:reduce,collect和foreach。通过RDD实现了自动容错,基于locality的调度。容错通常有两种方式checkpoint和logging update,RDD采用的是logging update。RDD之间存在血缘关系,包含了该RDD是由哪些RDD经过何种操作得到的信息,这样当某一个RDD出现问题时,可以由其他RDD恢复出来。
Spark可以用于实现机器学习算法,类MapReduce,Pregel,Haloop的计算。RDD适用于那些针对大量数据进行相同操作的场景,这样RDD不需要保存太多信息就能记录所有的状态。但是并不适合那些对于数据进行异步更新的场景,比如网络爬虫,索引构建等。
Spark的核心在于RDD(Resilient Distributed DataSet)。
RDD之间的依赖可以划分为窄依赖和宽依赖,对于窄依赖来说,一个RDD可能只依赖于另一个RDD,但是对于一个宽依赖来说,一个RDD可能依赖所有的RDD。RDD之间的血缘关系实际形成了一个DAG,调度会根据这个DAG结合locality进行。RDD虽然可以通过血缘关系进行重构,但是如果关系链太长,重构的代价会很高,因此也会通过checkpoint来做容错,不过目前Spark的实现只是提供了一个api,何时调用由用户决定。
Spark的实现大概有10000行scala代码,支持以HDFS,Hbase作为输入源。从性能上看,Spark比Hadoop提高了数10倍,可以在一个1TB的数据集上提供5-7秒的查询延迟。Spark的RDD提供了更通用的抽象,使得它可以支持迭代式MapReduce,类Pregel计算,以及其他的一些应用场景比如交互式数据挖掘。此外还支持自动容错,长尾消除,基于locality的调度。
Dpark是豆瓣刚开源的集群计算框架,是Spark的python clone版本,类似于MapReduce,但是比其更灵活,可以用Python非常方便地进行分布式计算,并且提供了更多的功能以便更好的进行迭代式计算。DPark的计算模型是基于两个中心思想的:对分布式数据集的并行计算以及一些有限的可以在计算过程中、从不同机器访问的共享变量类型。DPark具有的一个很重要的特性:分布式的数据集可以在多个不同的并行循环当中被重复利用。这个特性将其与其他数据流形式的框架例如Hadoop和Dryad区分开来。
与Spark的区别是:Spark中使用一个线程运行一个任务,但是DPark受python中GIL的影响,选择使用一个进程来运行一个任务。Spark支持Hadoop的文件系统接口,Dpark只支持posix文件接口。
关于DPark的如下相关介绍均来自DPark文档。
RDD的设计理念是在保留例如MapReduce等数据流模型的框架的优点的同时(自动容错、本地优化分配(locality-aware scheduling)和可拓展性),使得用户可以明确地将一部分数据集缓存在内存中,以大大加速对这部分数据之后的查询和计算过程。
RDD可以被认为是提供了一种高度限制(只读、只能由别的RDD变换而来)的共享内存,但是这些限制可以使得自动容错的开支变得很低。RDD使用了一种称之为“血统”的容错机制,即每一个RDD都包含关于它是如何从其他RDD变换过来的以及如何重建某一块数据的信息,这个在后面会进行一下详细的介绍。
在RDD之前,也有一些模型被创造出来解决数据流模型中存在的缺点,例如Google的Pregel(迭代图计算框架)、Twister和HaLoop(迭代MapReduce框架)等,但是这些系统只能被应用于一些非常有限和特殊的场景当中。而RDD提供了一种更为通用的迭代并行计算框架,使得用户能够显式的控制计算的中间结果,然后将其自由的运用于之后的计算中。
RDD的发明者已经在Spark的基础上实现了Pregel(100行Scala代码)以及迭代MapReduce框架(200行Scala代码),并在那些其他框架都无法很好适用的一些场景之中取得了一些很好的成果,例如交互式的大数据查询等。一些实际的实践和研究表明,Spark在那些迭代式计算中比Hadoop快20倍,能够在5-7s的时间内交互式的查询1TB的数据(比Hadoop快40倍)。
6.2.1.1 RDD的容错机制
通常而言,有两种实现分布式数据集容错机制的方法:数据检查点或者记录更新。就RDD设计的使用场景(大数据计算分析)而言,设置数据的检查点的花费会很高,因为它意味着在各个机器之间复制大的数据集,然而考虑到网络带宽相比机器内的带宽要小很多,这种拷贝操作会变得相当缓慢,而且这种拷贝会消耗大量的存储资源。
因此我们选择使用后一种机制,即记录更新。然而,记录所有更新的成本也会很高,所以RDD仅支持粗颗粒度变换(coarse-grained transformation),即仅记录在单个块上执行的单个操作,然后创建某个RDD的变换序列存储下来,当数据丢失时,我们可以用变换序列(血统)来重新计算,恢复丢失的数据,以达到容错的目的。
当然,当变换序列变得很长的时候,重新计算以恢复丢失的数据所需的时间会变得很长,所以我们建议用户在RDD的变换序列变得很长的时候,建立一些数据检查点以加快容错的速度。DPark目前并没有提供自动判断是否需要设置数据检查点的功能,但是我们考虑在未来的升级版本中加入这一功能以实现更快更好的容错机制,目前,用户可以通过saveAsTextFile方法来手动设置数据检查点。
6.2.1.2 RDD内部的设计
在DPark中,为了使得RDD能够在未来支持更多的变换而且不需要每次都去改变任务调度机制,同时为了“血统”容错机制的实现,我们为RDD设计了几个共通的接口。简单来说就是,每个RDD都需要包含
- 源数据分割之后的数据块,源代码中的de style="line-height: 22px;" >splitsde>变量
- 关于“血统”的信息,即这个RDD所依赖的父亲RDD以及两者之间的关系,源代码中的de style="line-height: 22px;" >dependenciesde>变量
- 一个计算函数,即这个RDD如何通过父亲RDD计算得到,源代码中的de style="line-height: 22px;" >iterator(split)de>和de style="line-height: 22px;" >computede>函数
- 一些关于如何分块和数据所放位置的元信息,例如源代码中的de style="line-height: 22px;" >partitionerde>和de style="line-height: 22px;" >preferredLocationsde>
举例来说,一个从分布式文件系统中的文件得到的RDD具有的数据块是通过分割各个文件之后得到的,而它没有父RDD,它的计算函数只是读取文件中的每一行并作为一个元素返回给子RDD,而对于一个通过map函数得到的RDD,它会具有和父RDD相同的数据块,它的计算函数是对每个父RDD中的元素调用了一个函数。
在上述的四个接口中,如何表达父亲和儿子之间的依赖关系是Spark以及DPark在实现的时候考虑最多的问题,最终我们发现可以将依赖关系分为两类:窄依赖和宽依赖。所谓的窄依赖是说子RDD中的每一个数据块只依赖于父RDD中的对应的有限个固定的数据块,当然这个依赖的数量要和父RDD总的数据块规模相差比较大;而宽依赖就是指子RDD中的一个数据块可以依赖于父RDD中的所有数据块。举例来说,对于map变换来说,子RDD中的数据块就只依赖于父RDD中对应的一个数据块,也就是说它是个窄依赖;而对于groupByKey变换来说,子RDD中的数据块会依赖于所有父RDD中的数据块,因为一个key可能存在于父RDD中的任何一个数据块中。
这样的分类方式有两个很重要的特性,也是这两个特性要求了我们对于这两种不同的依赖需要采用不同的任务调度机制和容错恢复机制。第一,窄依赖意味着可以在某一个计算节点上直接通过父RDD的某几块数据(通常是一块)计算得到子RDD某一块的数据;而相对的,宽依赖意味着子RDD某一块数据的计算必须等到它的父RDD所有数据都计算完成之后才可以进行,而且需要对父RDD的计算结果进行hash并传递到对应的节点之上。第二,当某一个计算节点出错的时候,窄依赖的错误恢复会比宽依赖的错误恢复要快很多,因为对于窄依赖来说,只有丢失的那一块数据需要被重新计算,而宽依赖意味着所有的祖先RDD中所有的数据块都需要被重新计算一遍,这也是我们建议在长“血统”链条特别是有宽依赖的时候,需要在适当的时机设置一个数据检查点以避免过长的容错恢复。
任务调度机制尝试利用RDD的特性来为所有的操作找到一种最有效的执行策略,任务调度器有一个runJob的接口提供给RDD使用,它接受的参数包括RDD本身,感兴趣的数据块部分以及应用于数据块之上的函数。当RDD需要执行一个操作(count, collect, saveAsTextFile, etc)的时候,就会调用runJob函数来在集群之上进行计算。
总体上来说,DPark的任务调度器和Dryad的比较相似,但是DPark的调度器在调度任务时会将RDD的哪个部分被缓存在哪些机器上这个因素考虑在内。首先,调度器会根据最终RDD的血统序列来创建出一些阶段(stage),每个阶段会包含尽可能多的可以被连续运行的变换,即基于窄依赖的变换,一个stage的边界是那些需要在节点之间移动数据的宽依赖变换,或者是那些已经被缓存了的RDD。下图表明了一个整体计算的Stage分割,只有当父阶段完成之后,我们才会启动子阶段的计算,并为每一个数据块的计算分配一个任务,当然没有父子关系的阶段可以被同时运行,就像例子中的阶段一和阶段二。
其次,调度器会根据数据本地化原则来分配任务到对应的节点,以尽可能减少通讯成本。也就是说,如果一个任务需要处理一个已经被缓存的数据块,那调度器就会将这个任务分配到具有这个缓存数据块的节点上进行计算;否则,一个任务会被分配到它所需要处理的数据块的RDD希望它被分配到的节点上去(通过RDD的preferredLocations函数)。
最后,如果某一个任务运行失败了,如果它所在的阶段的父阶段数据没有被丢失,那么调度器会直接将其在另一个节点上重新运行,如果父阶段已经不可用了,那么我们会重新提交父阶段中需要被重新计算的任务。具体请参见schedule.runJob中submitStage和submitMissingStage函数。
DPark中除RDD之外还有一个很重要的概念就是共享变量,目前实现的共享变量有两种类型:广播变量以及累加器。在此简单介绍一下这两种共享变量的实现,以使得开发者能够更好地使用共享变量并根据自己的需要支持一些别的共享变量类型。
6.2.3.1 广播变量
在DPark中,当用户编写的程序调用了RDD的map, filter等变换的时候,会传递给这些变换一个函数,DPark会在实际运行的时候将这个函数所需要的闭包给序列化后也传递到相应的计算节点中,那么就可以在函数中使用函数所在域的变量、函数等。当时如果有一个大的只读数据集需要在闭包中被使用,每一次序列化然后传递到相应节点后反序列化的成本会变得很高,所以DPark提供了广播变量特性来使得用户可以将其一次性传递到所有的节点上,在之后的闭包传递中传递对应的广播后的变量。目前实现的广播算法有分布式文件系统广播算法以及树型广播算法。
对于分布式文件系统广播算法(broadcast.FileBroadcast),当用户为变量v创建一个广播变量bv的时候,DPark会为其创建一个唯一的广播ID,并将变量v序列化后存储到分布式文件系统的一个文件中,当需要传递闭包的时候,DPark会将bv而不是v本身序列化后传递过去,而广播变量bv的序列化结果就是这个唯一的广播ID,在用户通过bv.value使用v本身的时候,DPark首先会检查v是否在本地的缓存中,如果不在,会将对应的文件反序列化并提供给计算节点使用。
对于树型广播算法(broadcast.TreeBroadcast),当创建广播变量bv的时候,DPark也会为其创建一个唯一的广播ID,但是不同于分布式文件系统广播算法,DPark并不会将v序列化后存储到文件中,会利用zeromq在master上绑定一个端口,在worker中需要读取真实值的时候,如果本地缓存没有的话,那么将会向master去请求数据,而当一个worker已经获得数据之后,它就可以参与传播,把数据传给其他的worker,每个进程可以传播给N个其他进程,所以是传播的过程是一颗N叉树。
关于两种算法之间的比较:通常而言,树型广播算法会比分布式文件系统广播算法要快一些,原因主要有:树型算法中只有内存访问和网络IO,而文件系统算法中还需要进行磁盘操作,并且其网络IO也会需要经过更多层次;同时树型算法中,整体数据会被分割为一些块,所以即使一个节点还没有获得所有的数据,它还是可以参与到广播之中,将已获得的数据传递给下一个节点,广播的时间会相对缩短一些。
在Spark中目前还支持了P2P的广播算法,DPark在未来的升级版本中也会继续加入更好的广播算法。
6.2.3.2 累加器
累加器在创建的时候,需要提供一个accumulator.AccumulatorParam的对象作为参数,这个类当中有两个值代表累加器的0值以及加法的定义,例如数值型累加器的0值就是0,然后加法就是普通的数值加法;而对于列表型累加器来说,0值就是[],加法则是extend函数。当累加器被创建的时候,会为其分配一个唯一的累加器ID,而当其随着任务被传递到计算节点的时候,会在节点中创建出一个值为0的副本,在该任务中对其进行的任何操作实际上都只是在对这个副本进行操作,对主程序中的累加器并没有影响,只有当任务结束返回的时候,DPark的调度器会将该计算节点中的这个副本和主程序中的累加器进行合并,在所有任务完成之后,才能得到正确的值。
由此实现可以得知,在计算过程中获得的累加器的值实际上只是当前任务的值而不是所有任务的值的和,而且只有当RDD的操作被调用的时候,之前在变换中希望执行的关于累加器的操作才会被真正执行,例如:
acc = ctx.accumulator(0)
def sum(x):
x = int(x)
acc.add(x)
return x
rdd.map(sum)
print acc.value
rdd.count()
print acc.value
在上面这个程序中,第一个输出将会是0,而第二个输出才会是正确的值。
有一堆文件,每一行的格式为word,uid,统计每个uid对应出现次数最多的10个word