分布式系统

MapReduce Hold不住?(zz)

2011年10月17日 阅读(382)

from:http://www.programmer.com.cn/8606/

文/杨栋

本文系统地介绍和分析比较了业界主流的Yahoo! S4、StreamBase和Borealis三种流式计算系统,希望读者能从这些系统的设计中领悟到不同场景下流式计算所要解决的关键问题。

背景

非实时计算几乎都基于MapReduce计算框架,但MapReduce并不是万能的。对于搜索应用环境中的某些现实问题,MapReduce并不能很好地解决问题。

商用搜索引擎,像Google、Bing和Yahoo!等,通常在用户查询响应中提供结构化的Web结果,同时也插入基于流量的点击付费模式的文本广告。为了在页面上最佳位置展现最相关的广告,通过一些算法来动态估算给定上下文中一个广告被点击的可能性。上下文可能包括用户偏好、地理位置、历史查询、历史点击等信息。一个主搜索引擎可能每秒钟处理成千上万次查询,每个页面都可能会包含多个广告。为了及时处理用户反馈,需要一个低延迟、可扩展、高可靠的处理引擎。然而,对于这些实时性要求很高的应用,尽管MapReduce作了实时性改进,但仍很难稳定地满足应用需求。因为Hadoop为批处理作了高度优化,MapReduce系统典型地通过调度批量任务来操作静态数据;而流式计算的典型范式之一是不确定数据速率的事件流流入系统,系统处理能力必须与事件流量匹配,或者通过近似算法等方法优雅降级,通常称为负载分流(load-shedding)。当然,除了负载分流,流式计算的容错处理等机制也和批处理计算不尽相同。

最近Facebook在Sigmod 11上发表了利用HBase/Hadoop进行实时数据处理的论文,通过一些实时性改造,让批处理计算平台也具备实时计算的能力。这类基于MapReduce进行流式处理的方案有三个主要缺点。

将输入数据分隔成固定大小的片段,再由MapReduce平台处理,缺点在于处理延迟与数据片段的长度、初始化处理任务的开销成正比。小的分段会降低延迟,增加附加开销,并且分段之间的依赖管理更加复杂(例如一个分段可能会需要前一个分段的信息);反之,大的分段会增加延迟。最优的分段大小取决于具体应用。
为了支持流式处理,MapReduce需要被改造成Pipeline的模式,而不是Reduce直接输出;考虑到效率,中间结果最好只保存在内存中等。这些改动使得原有的MapReduce框架的复杂度大大增加,不利于系统的维护和扩展。
用户被迫使用MapReduce的接口来定义流式作业,这使得用户程序的可伸缩性降低。

综上所述,流式处理的模式决定了要和批处理使用非常不同的架构,试图搭建一个既适合流式计算又适合批处理计算的通用平台,结果可能会是一个高度复杂的系统,并且最终系统可能对两种计算都不理想。

目前流式计算是业界研究的一个热点,最近Twitter、LinkedIn等公司相继开源了流式计算系统Storm、Kafka等,加上Yahoo!之前开源的S4,流式计算研究在互联网领域持续升温。不过流式计算并非最近几年才开始研究,传统行业像金融领域等很早就已经在使用流式计算系统,比较知名的有StreamBase、Borealis等。

本文简单介绍几种业界使用的流式计算系统,希望流式系统的设计者或开发者们能从中获得启示。

图1 数据分析系统整体组成示意图

图1 数据分析系统整体组成示意图

 

图1从整个分析系统的架构角度,给出了实时计算子系统所处的位置。实时计算系统和批处理计算系统同属于计算这个大的范畴,批处理计算可以是MapReduce、MPI、SCOPE等,实时计算可以是S4、Storm等,批处理和实时都可以或不依赖统一的资源调度系统。另外,计算系统的输入、输出,包括中间过程的输入、输出,都与存储系统交互,可以是块存储系统HDFS,也可以是K-V存储系统Hypertable等。计算层的上层是数据仓库,或者直接和用户交互,交互方式可以是SQL-like或者MR-like等。

系统

S4

S4是一个通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统。基于S4框架,开发者可以轻松开发面向持续流数据处理的应用。

S4的设计特点有以下几个方面。

Actor Model

为了能在普通机型构成的集群上进行分布式处理,并且集群内部不使用共享内存,S4架构采用了Actor模式,这种模式提供了封装和地址透明语义,因此在允许应用大规模并发的同时,也提供了简单的编程接口。S4系统通过处理单元(Processing Elements,PEs)进行计算,消息在处理单元间以数据事件的形式传送,PE消费事件,发出一个或多个可能被其他PE处理的事件,或者直接发布结果。每个PE的状态对于其他PE不可见,PE之间唯一的交互模式就是发出事件和消费事件。框架提供了路由事件到合适的PE和创建新PE实例的功能。S4的设计模式符合封装和地址透明的特性。

Decentralized and Symmetric Architecture

除了遵循Actor模式,S4也参照了MapReduce模式。为了简化部署和运维,从而达到更好地稳定性和扩展性,S4采用了对等架构,集群中的所有处理节点都是等同的,没有中心控制。这种架构将使得集群的扩展性很好,处理节点的总数理论上无上限;同时,S4将没有单点容错的问题。
Pluggable Architecture
S4系统使用Java开发,采用了极富层次的模块化编程,每个通用功能点都尽量抽象出来作为通用模块,而且尽可能让各模块实现可定制化。

Partial Fault-Tolerance

基于Zookeeper服务的集群管理层将会自动路由事件从失效节点到其他节点。除非显式保存到持久性存储,否则节点故障时,节点上处理事件的状态会丢失。

Object Oriented

节点间通信采用“Plain Old Java Objects”(POJOs)模式,应用开发者不需要写Schemas 或用哈希表来在节点间发送Tuples。

S4的功能组件分3大类,Clients、Adapters和PNode Cluster,图2显示了S4系统框架。

图2 Yahoo! S4流式系统框架结构图

图2 Yahoo! S4流式系统框架结构图

 

S4提供Client Adapter,允许第三方客户端向S4集群发送事件和接收事件。Adapter实现了基于JSON的API,支持多语言实现的客户端驱动。

Client通过Driver组件与Adapter进行交互,Adapter也是一个Cluster,其中有多个Adapter结点,Client可以通过多个Driver与多个Adapter进行通信,这样可以保证单个Client在分发大数据量时Adapter不会成为瓶颈,也可以确保系统支持多个Client应用并发执行的快速、高效和可靠性。

在Adapter中,真正与Client交互的是其Stub组件,该组件实现了管理Client与Adapter之间通过TCP/IP协议进行通信的功能。GenericJsonClientStub这个类支持将事件在Client与Adapter之间以JSON的形式转换,从而支持更多种类型的Client应用。不同的Client可以配置不同的Stub来与Adapter进行通信,用户可以定义自己的Stub来实现自己想要的业务逻辑,这样也使得Client的行为更加多样性、个性化。

StreamBase

StreamBase是IBM开发的一款商业流式计算系统,在金融行业和政府部门使用,其本身是商业应用软件,但提供了Develop Edition。相对于付费使用的Enterprise Edition,前者的功能更少,但这并不妨碍我们从外部使用和API接口来对StreamBase本身进行分析。

StreamBase使用Java开发,IDE是基于Eclipse进行二次开发,功能非常强大。StreamBase也提供了相当多的Operator、Functor以及其他组件来帮助构建应用程序。用户只需要通过IDE拖拉控件,然后关联一下,设置好传输的Schema并且设置一下控件计算过程,就可以编译出一个高效处理的流式应用程序了。同时,StreamBase还提供了类SQL语言来描述计算过程。

StreamBase的组件交互情况如图3所示。

图3 StreamBase组件交互图

图3 StreamBase组件交互图

 

StreamBase Server是节点上启动的管理进程,它负责管理节点上Container的实例,每个Container通过Adapter获得输入,交给应用逻辑进行计算,然后通过Adapter进行输出。各个Container相互连接,形成一个计算流图。

Adapter负责与异构输入或输出交互,源或目的地可能包括CSV文件、JDBC、JMS、Simulation(StreamBase提供的流产生模拟器)或用户定制。
每个StreamBase Server上面都会存在一个Sytsem Container,主要是产生系统监控信息的流式数据。

HA Container用于容错恢复,可以看出它实际包含两个部分:Heartbeat和HA Events,其中HeartBeat也是Tuple在Container之间传输。在HA方案下,HA Container监控Primary Server的活动情况,然后将这些信息转换成为HA Events交给StreamBase Monitor来处理。

Monitor就是从System Container和HA Container中获取数据并且进行处理。StreamBase认为HA 问题应该通过CEP方式处理,也就是说如果哪个部件出现问题,就肯定会反映在System Container和HA Container的输出流上面,然后 Monitor通过复杂事件处理这些Tuples的话就能够检测到机器故障等问题,并作出相应处理。

StreamBase提出了以下4种模板策略来解决容错问题。

Hot-Hot Server Pair Template

Primary Server和Secondary Server都在同时计算,并且将计算结果交给下游。优点是Primary Server如果故障的话那么Secondary Server依然工作,几乎没有任何切换时间;并且下游只需要选取先到来的Tuple就可以处理了,保证处理速度最快;缺点是浪费计算和网络资源。

Hot-Warm Server Pair Template

Primary Server和Secondary Server都在同时计算,但只有Primary Server将计算结果交给下游。优点是如果Primary Server故障,Secondary Server可以很快切换,而不需要任何恢复状态的工作。相对于Hot-Hot方式时间稍微长一些,但没有Hot-Hot那么耗费网络资源,同时也浪费了计算资源。

Shared Disk Template

Primary Server在计算之后,将计算的一些中间关键状态存储到磁盘、SAN(Storage Area Network)或是可靠的存储介质。如果Srimary Server故障,Secondary Server会从介质中读取出关键状态,然后接着继续计算。优点是没有浪费任何计算和网路资源,但恢复时间依赖状态的量级而定,相对于前两种,恢复时间可能会稍长。

Fast Restart Template

这种方案限定了应用场景,只针对无状态的应用。对于无状态的情况,方案可以非常简单,只要发现Primary Server故障,Secondary Server立即启动,并接着上游的数据流继续计算即可。

Borealis

Borealis是Brandeis University、Brown University和MIT合作开发的一个分布式流式系统,由之前的流式系统Aurora、Medusa演化而来。目前Borealis系统已经停止维护,最新的Release版本停止在2008年。

Borealis具有丰富的论文、完整的用户/开发者文档,系统是C++实现的,运行于x86-based Linux平台。系统是开源的,同时使用了较多的第三方开源组件,包括用于查询语言翻译的ANTLR、C++的网络编程框架库NMSTL等。

Borealis系统的流式模型和其他流式系统基本一致:接受多元的数据流和输出,为了容错,采用确定性计算,对于容错性要求高的系统,会对输入流使用算子进行定序。

Borealis的系统架构如图4所示。

图4 Borealis框架示意图

Query Processor(QP)是计算执行的地方,是系统的核心部件,其大部分功能继承自Aurora。
I/O Queues将数据流导入QP,路由Tuples到其他节点或客户端程序。
Admin模块用来控制本地的QP,例如建立查询、迁移数据流图片段,该模块也会同Local Optimizer协作优化现有数据流图。
Local Optimizer职责包括本地调度策略、调整Operator行为、超载后丢弃低价值元组等。
Storage Manager模块用于存储本地计算的状态数据。
Local Catalog存储本地数据流图和元数据,可以被本地所有组件访问。
Borealis Node还有彼此通信的模块用于执行协作任务。
Neighborhood Optimizer使用本地和邻居节点来优化节点间的负载均衡或shed load。
High Availability (HA)模块相互监测,发现对方故障时及时代替对方。
Local Monitor收集本地性能相关统计数字报告给本地和Neighborhood Optimizer。
Global Catalog为整个数据流计算提供了一个逻辑上的完整视图。

除作为基本功能节点外,Borealis Server也可以被设计成一个协作节点来执行全局的系统监控和其他优化任务,比如全局的负载分布和Global Load Shedding,因此Borealis实际上提供了完整的3级监控和优化(Local、Neighborhood、Global)。

负载均衡方面,Borealis提供了动态和静态两种部署机制。

Correlation-based Operator Distribution

通过分析不同Operators和Nodes间的负载变化的关系,决定和动态调整Operatpr的部署,使之达到负载均衡。

Resilient Operator Distribution Algorithm

该算法的目标是提供一种静态的Operator部署方案,该方案能够在不需要重新调整的情况下处理最大可能的输入速度变化范围。

由于动态调整需要时间和消耗,前者适用于负载变化持续时间较长的系统;而后者则能处理较快较短的负载峰值。在实现上前者使用相关系数作为节点关联度指标,并通过贪婪算法将NP问题转化为多项式求解;而后者在部署前计算完毕,保证系统能够容忍负载峰值。该算法在线性代数上建模,包括Operator Ordering、Operator Assignment两个阶段。

Borealis通过四种容错机制来满足用户需求。

Amnesia Backup

备机发现主机故障,立即从一个空的状态开始重做。

Passive Standby

主机处理,备机待命,主机按周期做Checkpoint,主机故障后切换到备机,重放Checkpoint和数据流,对于不确定性计算可以很好地支持,缺点是恢复时间较长。

Active Standby

主备机同时计算,主机故障时直接切换到备机,不支持不确定性计算,浪费计算资源,不过恢复时间几乎没有。

Upstream Backup

通过上游备份来容错,故障时从上游重放数据即可,恢复时间最长,不过最节省资源。

除此之外,Borealis还提供了更高级的容错机制Rollback Recovery,它是一种基于副本在节点失效、网络失效或网络分区时的故障恢复机制,在尽量减少系统不一致的情况下,尽可能地保证系统的可用性。该机制允许用户定义一个阈值来在一致性和可用性之间做一个平衡。当系统数据恢复后,系统支持重新计算输出正确的结果,保证最终一致性。该机制使用了Data-serializing Operator(SUnion)来确保所有的副本处理同样顺序的数据。当失效恢复后,通过Checkpoint/Redo、Undo/Redo来实现恢复重放。

对比

表1就上述3个流式系统做个分类比较,比较项基于DEBS2011会议上IFPSurvey中涉及的各种Models。Processing Model描述流元组进行计算时的选择策略、消费策略及负载降级处理。Interaction Model描述输入组件和计算系统、计算系统内部及计算系统和输出组件的交互方式。Time Model描述事件流是否按照时间约束。Rules Model描述流式计算规则是显示还是隐式。Data Model描述流中的数据组成、格式等。Function Model描述流式计算系统的功能模型。Language Model描述语言层面的各种算子。

表1 3种流式系统的模型对比

表1 3种流式系统的模型对比

 

小结
本文介绍了业界主流的3个流式计算系统,希望从这些系统的设计中领悟到不同场景下流式计算所要解决的关键问题。
Yahoo! S4的最新版本是Alpha version v0.3.0,动态负载均衡和在线服务迁移等重要功能都尚未实现,不过其代表性的3个特点值得学习,Actor模式、非中心化的对称结构及可插入式的架构。

StreamBase是有着功能强大的IDE并且支持控件式的方法来搭建应用程序,同时还提供了高级语言来搭建应用程序的方法。由于是商业产品,其用户接口的精彩设计值得借鉴,同时其可组合的HA方案也是亮点之一。
Borealis是学术界研究的重要产出,它对新一代的流式系统涉及的诸多方面,如系数据模型、负载管理、高可用性、可扩展性都作了全面和翔实的研究,一方面系统变得强大、先进,另一方面使得系统也变得臃肿、复杂。这套系统的许多策略都值得我们学习,可以应用于不同的流式计算场景。

作者杨栋,百度分布式高级研发工程师,从事Hypertable、Hadoop及流式计算的研究和开发。

本文选自《程序员》杂志2011年10期,更多精彩内容敬请关注10期杂志

《程序员》杂志订阅火热进行中

You Might Also Like