分布式系统

Storm原理与实现

2013年9月25日 阅读(521)

作者:phylips@bmy 2013-02

1         Storm简介
1.1      简介

本文主要是从内部实现的角度来认识下Storm(0.7.1版本),因此需要用户对Storm的基本原理和使用具有一定的了解。如果缺乏这方面的知识,建议首先阅读下Storm的官方wiki:https://github.com/nathanmarz/storm/wiki

 

目前也有一些中文文章,大部分都未超出官方wiki所包含的内容。推荐几个还不错的链接:

http://xumingming.sinaapp.com/ 这里有一些官方wiki的中文翻译以及一些实现分析

http://blog.linezing.com/category/storm-quick-start 关于storm的一个入门教程

1.2      核心API
1.2.1     普通Topology

如果建立自己的Topology(非Transactional的),用户通常需要利用如下接口和对象:

IRichBolt

IRichSpout

TopologyBuilder

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

void close();

void activate();

void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

}

public interface IBolt extends Serializable {

       void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

       void execute(Tuple input);

       void cleanup();

}

IRichBolt和IRichSpout与IBolt和ISpout的不同在于多了两个接口:

declareOutputFields(OutputFieldsDeclarer declarer):声明输出字段

getComponentConfiguration() :该接口是在0.7.0引入的,用于支持组件级的配置,即允许用户针对单个Spout或Bolt进行参数配置。

 

实现了这两个接口后,通过调用TopologyBuilder建立起Topology。TopologyBuilder实际上是封装了StormTopology的thrift接口,也就是说Topology实际上是通过thrift定义的一个struct,TopologyBuilder将这个对象建立起来,然后nimbus实际上会运行一个thrift服务器,用于接收用户提交的结构。由于是采用thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。

 

对于用户来说,通常需要做的就是提供自己的ISpout和IBlot实现,然后利用TopologyBuilder建立起自己需要的拓扑结构。

 

Storm框架会拿到用户提供这个拓扑结构及Spout和Blot对象,驱动整个处理过程。简单介绍下ISpout的那些接口的调用时机,在创建Spout对象时,会调用open函数。对象销毁时调用close(),但是框架并不保证close函数一定会被调用,因为进程可能是通过kill -9被杀死的。activate和deactivate是在spout被activate或deactivate时被调用,这两个动作是由用户从外部触发的,Strom的命令行提供两个命令activate和deactivate,允许用户activate和deactivate一个Topology,当用户执行deactivate时,对应Topology的spout会被deactivate,产生影响就是spout的nextTuple此后将不会被调用,直到用户再调用activate。Spout的核心功能是通过nextTuple实现的,用户通过该函数完成Tuple的发射。该函数会被框架周期性的调用。会有类似如下的一个循环:

While(true)

{

    if(…)

spout.activate();

if(…)

sput.deactivate();

if(…)

    spout.nextTupe();

}

 

首先这三个函数都是在一个线程中,因此不需要同步。其次,nextTuple()不能阻塞,如果没有Tuple可以发射需要立即返回,用户不能提供一个阻塞式的实现,否则可能阻塞整个后台循环。另外,后台可能会调节nextTuple()的调用频率,比如系统有一个配置参数可以控制当前被pending的Tuple最大数目,如果达到这个限制,可能就会做一些流控。

 

ack和fail则是两个回调函数。Spout在发射出一个tuple后,该tuple会通过acking机制被acker追踪,除了显式的fail和ack外,每个tuple有一个超时时间,如果超过这个时间还未确定该tuple的状态,那么acker会通知spout,这个tuple处理失败了,然后框架得到这个消息后,就会调用spout的fail函数,如果acker发现这个tuple处理成功了,也会通知spout,然后会调用spout的ack函数。所以通常来说用户在发射tuple时,要确保数据不丢失,都会将已经发射的tuple缓存起来,然后在ack函数中删除对应tuple,在fail函数中重发对应的tuple。

 

另外需要注意的一点是,Spout使用的collector是SpoutOutputCollector,Bolt使用的collector是OutputCollector。这两个虽然提供的功能类似,都是负责发送tuple的,但是由于一个是面向Spout,一个是面向Bolt的,它们的接口也略有不同。具体如下:

public interface ISpoutOutputCollector {

       List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

       void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

void reportError(Throwable error);

}

Spout通过调用ISpoutOutputCollector的emit函数进行tuple的发射,当然实际上emit函数并未完成实际的发送,它主要是根据用户提供的streamId,计算出该tuple需要发送到的目标taskID。emitDirect函数,更裸一些,直接指定目标taskID。它们都只是将<tasked,tuple>组成的序列对放到一个队列中,然后会有另一个线程负责将tuple从队列中取出并发送到目标task。

 

public interface IOutputCollector extends IErrorReporter {

List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

void ack(Tuple input);

void fail(Tuple input);

}

IOutputCollector是会被Bolt调用的,与ISpoutOutputCollector功能类似。但是区别也很明显,首先我们可以看到它的emit系列函数,多了一个参数Collection<Tuple> anchors,增加这样一个anchors原因在于,对于spout来说,它产生的tuple就是root tuple,但是对于bolt来说,它是通过一个或多个输入tuple,进而产生输出tuple的,这样tuple之间是有一个父子关系的,anchors就是用于指定当前要emit的这个tuple的所有父亲,正是通过它,才建立起tuple树,如果用户给了一个空的anchors,那么这个要emit的tuple将不会被加入tuple树,也就不会被追踪,即使后面它丢失了,也不会被spout感知。

 

除了anchors参数外,IOutputCollector还多了ack和fail两个接口。这两个接口,与Spout的ack和fail完全不同,对于Spout来说ack和fail是提供给Spout在tuple发送成功或失败时进行处理的一个机会。而IOutputCollector的ack和fail则是向acker汇报当前tuple的处理状态的,是需要Bolt在处理完tuple后主动调用的。

1.2.2     Transactional Topology

对于普通Topology来说,它通过acking机制保证了每个Tuple会至少被处理一次,保证了Tuple不会丢失,但是一个Tuple可能会因为重发而被处理多次。引入Transactional Topology就是为了解决重复处理的问题。同时它暴露给用户的API,ITransactionalSpout与普通的Spout相比有很大的差异,而Bolt则基本保持了一致。

 

对于Transactional Topology,用户需要提供一个ITransactionalSpout(4.5.1 ITransactionalSpout)实现,对于batch类型的Bolt需要继承自IBatchBolt(4.4.1 IBatchBolt),那些会改变外部状态的关键Bolt需要实现ICommiter接口。用户需要通过专用的TransactionalTopologyBuilder而不是TopologyBuilder来建立Topology。

 

另外需要注意的一点是Storm已经将TransactionalTopology相关的功能移植到了trident中,而src/jvm/backtype/storm/transactional下的实现实际上会被废弃掉,尽管如此我们下面的分析还是针对src/jvm/backtype/storm/transactional下的实现。二个地方的实现基本上是完全一致的,当然trident可能做了一些改进,比如它暴露出了更丰富的API,允许用户对事务进行更多的控制。

2         Clojure基础

Storm是由两种语言实现的,基本上50%的java,50%的Clojure。框架性的东西基本上都是采用Clojure实现的,因此要真正理解Storm,Clojure是绕不过去的。

2.1      简介

Clojure是一种可以运行在JVM上的函数式编程语言,在 CLR 和 JavaScript 平台上也有各自的实现。属于一种Lisp方言(LISP,全名List Processor,即列表处理语言,由约翰·麦卡锡在1960年左右创造的一种基于λ演算的函数式编程语言)。作为一种函数式编程语言,Clojure基于JVM,可以直接使用现有的java类库,通过SMT(software transactional memory )和异步agent提供了内建的并发支持。

官方网站:http://clojure.org/

 

根据getting_started 的步骤,我们可以在本地建立起一个clojure的执行环境。另外这个网站也提供了在线的脚本执行支持:http://himera.herokuapp.com/index.html 。

 

由于此处主要是为了分析Storm的Clojure源码,因此这里主要关注下Storm里所用到的Clojure语言特性。更深入的内容可以参考:

官方文档:http://clojure.org/documentation 。

中文文档:Clojure API文档Clojure入门教程: Clojure – Functional Programming for the JVM中文版Clojure Handbook

 

2.2      基本语法

2.3      与java的交互

3         代码结构

Storm Structure-of-the-codebase

storm 源码详细介绍

 

4         源码分析

关于实现方面的文章,Storm的官方wiki有一个目录列表,但是基本上只是一个提纲,很多内容还未来得及编写:

Storm Implementation-docs

4.1      Topology生命周期

Lifecycle-of-a-topology

Twitter Storm源代码分析之Topology的执行过程

4.2      消息传输机制

Storm Message-passing-implementation

4.3      Acking机制

Storm acker实现机制剖析

Storm对用户提供的Topology会在内部进行修改,添加一些系统内部的流和Bolt来实现acking框架,实现代码参见:common.clj

4.3.1     Q&A4.3.1.1  MessageID是何作用?与tuple-id的关系? 
4.4      Coordination实现

关于Coordination的基本原理可以参考如下文章:

Twitter Storm源代码分析之CoordinatedBolt

相关代码在如下目录:

src/jvm/backtype/storm/coordination/

4.4.1     IBatchBolt

首先我们来看下IBatchBolt接口,关于该接口的定义是在目录src/jvm/backtype/storm/coordination下。之所以并未放到transactional/下,是因为IBatchBolt是一个通用接口,并非专门为Transaction设计,比如在DRPC里面也会用到它。

public interface IBatchBolt<T> extends Serializable, IComponent {

    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);

    void execute(Tuple tuple);

    void finishBatch();

}

与IBolt相比,有如下不同:

Prepare函数多了一个id参数,多了一个finishBatch接口,collector 类型是BatchOutputCollector。在Transactional Topologies里,这个id就是一个TransactionAttempt对象。

4.4.2     BatchBoltExecutor

每个BatchBolt会被放到BatchBoltExecutor中执行。BatchBoltExecutor实现了IRichBolt, FinishedCallback, TimeoutCallback接口。它的作用在于:它会hold一系列IBatchBolt对象,每个Batch都是通过一个独立的IBatchBolt对象进行处理,这样只要简单的将该对象销毁就可以清除某个Batch的所相关的所有中间状态。

 

它的execute函数很简单,拿到input tuple,从tuple中拿出batch id(对于Transactional Topology来说就是TransactionAttempt),然后找到该batch id对应的IBatchBolt对象,然后调用该对象的execute函数。

 

其中比较重要的一个接口void finishedId(Object id),可以看到它的实现,实际上就是拿到id所对应的那个IBatchBolt对象,然后执行它的finishBatch()函数。finishedId本身是个回调函数,只有当当前batch处理完毕时它才会被调用,具体见CoordinatedBolt的实现。

 

一个比较重要的问题,BatchBoltExecutor是如何确定某个IBatchBolt对象可以清除了的?在finishedId和timeoutId被调用的时候,都会将IBatchBolt对象从BatchBoltExecutor中清除。而这两个函数的调用都是由CoordinatedBolt来决定的。finishedId被调用,对于普通Bolt来说是因为当前Bolt已经确定收到了属于该batch的所有tuple,对于commiter Bolt来说是它收到了当前batch所有tuple并且收到了commit命令。timeoutId被调用是因为到了消息允许的最大延迟。

4.4.3     CoordinatedBolt

CoordinatedBolt内部会记住所有向它发送Tuple边的入度,保存在_numSourceReports中,以及它会发送tuple的那些目标task,保存在_countOutTasks中。

 

对于由CoordinatedBolt处理的Tuple,要求它的第一个field必须是id,CoordinatedBolt内部维护了一个id->TrackingInfo的TimeCacheMap,用于追踪每个id所对应的那些Tuple的处理状态。context.maxTopologyMessageTimeout用来控制超时时间,如果超过这个时间该id的状态还未确定(还未被failed也未成功完成),那么就会回调BatchBoltExecutor的timeoutId函数,将该id对应的IBatchBolt对象从BatchBoltExecutor中删除。

 

TrackingInfo包含如下一些信息:

    public static class TrackingInfo {

        int reportCount = 0;

        int expectedTupleCount = 0;

        int receivedTuples = 0;

        boolean failed = false;

        Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();

        boolean receivedId = false;

        boolean finished = false;

        List<Tuple> ackTuples = new ArrayList<Tuple>();

}

 

CoordinatedOutputCollector在每次emit tuple时都会拿到tuple中的id,然后更新它对应的TrackingInfo,将taskEmittedTuples对应task的输出tuple数加1。在ack(tuple)时,将receivedTuples加1,并调用checkFinishId。在fail(tuple)时将failed置为true,并调用checkFinishId。

 

checkFinishId主要用来判断当前id是否处理完毕。在receivedId为true的情况下,如果(当前节点的_sourceArgs为空)或者(reportCount == _numSourceReports& expectedTupleCount ==receivedTuples,即收了所有源task的Tuple数目报告,并且收到的Tuple数等于报告的Tuple数)的情况下,就认为该id处理完毕,此时会调用传给CoordinatedBolt的real bolt的finishedId(id)这个回调函数。在该回调函数执行完毕后,才会通过COORDINATED_STREAM向它的下游task发送它向下游各个task发射的tuple数目这个消息。可见这样会产生一个顺序关系,如果上游的finishedId没有执行,下游task的finishedId也是无法执行的,这对于Transactional Topology中具有依赖关系的commiters的提交顺序的控制具有重要意义。

 

对于Transactional Topology来说,id就是TransactionAttempt对象,Emitter就是那个_sourceArgs为空的节点,因此当它ack了Coordinator后,就会认为这个Batch已经处理完毕,因为Coordinator也是通过CoordinatedBolt执行的,因此它会通过COORDINATED_STREAM向它的下游task发送消息。那么对于Coordinator来说,事务何时会进入PROCESSED状态呢?是Emiter ack(input)之后呢?还是所有的bolt处理完毕呢?查看TransactionalSpoutBatchExecutor实现可以看到,在void execute(Tuple input)函数中,并未建立input到后续Tuple的父子关系,而调用Emitter的emitBatch,只是将它作为事务ID传入。表面上看并没有建立与输入Tuple的父子关系,那么新的Tuple应该不会被追踪,但是实际上Emitter是被放入CoordinatedBolt中执行的,而追踪状态的Tuple树实际上会通过CoordinatedBolt建立。同时实现中,对于batch中的Tuple的追踪进行了优化,就是说它不会追踪batch中的每个tuple,而只是在original tuple与COORD tuple之间建立tuple树,也就是说通过追踪COORD tuple达到追踪对应batch中的所有tuple的目的。具体参见:CoordinatedBolt.java:250,具体来说就是通过_collector.emitDirect建立了这个tuple树,从而将Coordinator发出的original tuple与COORD tuple关联起来。

 

这里面,还需要注意的一点是,我们看到对于CoordinatedBolt来说Tuple分为三种类型:

    static enum TupleType {

        REGULAR,

        ID,

        COORD

    }

其中REGULAR代表普通的Tuple数据,COORD代表用于task发送Tuple的数目信息,那么ID是用来做什么的呢?通过CoordinatedBolt我们可以得到如下信息,首先应该有一个流专门用于发送TupleType.ID的,但是并不是所有的Blot都会有这个流,只有具有IdStreamSpec的Blot才会有。如果Blot没有这个流,那么receivedId就直接赋为true。要理解这个流的作用,需要跳出CoordinatedBolt,它实际上是为Transactional Topology使用的,查看TransactionalTopologyBuilder.java,可以看到在buildTopologyBuilder中,如果检测到该组件是一个committer,那么就会为它添加一个IdStreamSpec。这样就使得那些committer在调用finishedId时要判断receivedId是否为true,这样就保证了commiter bolt的finishBatch只有在收到了TRANSACTION_COMMIT_STREAM上发送过来的提交命令才会执行。

 

此外,我们观察到TrackingInfo里有一个List<Tuple> ackTuples成员,那么这个成员又是来做啥的呢?ackTuples里保存了两种Tuple:ID和COORD类型的。

if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID){

track.ackTuples.add(tup);

delayed = true;

}

首先之所以需要增加一个ackTuples是为了保存那些无法立即被ack的Tuple。那么为何有些tuple无法被立即ack呢?比如对于(idStreamSpec!=null && type==TupleType.ID)的情况来说,是用于事务提交的,那么当用户收到这个消息后,可能是无法立即提交的,比如用户提交的Topology中有两个commiter,同时这两个commiter间存在依赖关系,事务提交命令是由Coordinator采用all grouping的方式广播发送的,这样这两个commiter基本上会同时收到这个提交命令。但是后面的那个commiter在前面的commiter提交之前,它是无法提交的。因为前面那个commiter只有调用了finishedId后,才会向后面那个commiter发送COORD消息,而后面这个如果没有收到COORD是无法进入提交状态的。这样对于后面这个commiter来说,它就需要将收到的ID类型的Tuple缓存起来,待可以提交时,再将其ack。那么(_idStreamSpec==null && type == TupleType.COORD)又代表了哪种情况呢?这代表了普通的CoordinatedBolt应用场景,CoordinatedBolt为Blot提供了用于确定某个ID是否处理完毕的一种机制,原理就是上级task确定自己处理完该ID后,再通知下级task它发送给它的针对该ID的tuple数。而下级task在收到COORD消息后,可能还未完全收到上级发送过来的数据,因此也是不能立即ack这个COORD类型的Tuple的,需要等到收到所有上级task发送过来的tuple时再往下级task发送这个COORD类型的Tuple。

 

再仔细观察上面的条件,这里对于_idStreamSpec!=null&& type == TupleType.COORD的情况并未加入到ackTuples中,这是为何?如果不加入ackTuples,那种这种情况下接收到的COORD类型的Tuple是何时被ack的?这种情况会直接将COORD消息ack掉,同时将它需要发送给下游的COORD消息延迟到ID消息到达后,因为它是个commiter会接收ID消息,同时需要保证在ID这个消息未到达时,后续的Blot也不能调用finishBatch,因此需要借助CoordinatedBolt完成这种控制,在ID消息到达并完成提交后,再通知后续task它向它们发送的Tuple数,这样就可以让后续task等待着它。同时对于这种情况,我们看到它并未建立一个tuple树,而是直接ack了 这个COORD类型的Tuple,而将tuple树的追踪延迟到了commit阶段,也就是说在Transactional Topology中,在processing阶段,当该batch的所有Tuple到达了commiter,那么就可以ack了,不需要再继续往下追踪,就认为processing阶段结束,然后可以进入commit阶段了。到了commit阶段,acking机制也会建立起一个tuple树,ID类型tuple与发送给下游的COORD消息也会关联起来。相当于将processing阶段未做的追踪延迟到了commit阶段再继续追踪下去。

 

checkFinishId的调用时机?首先对于不同类型的流来说,它具有不同的调用时机。对于ID和COORD类型来说,直接在void execute(Tuple tuple)中进行调用,对于REGULAR类型,它是在ack和fail函数中调用的。这样对于TransactionalTopology中的Emitter来说,由于Coordination发给它的Tuple是REGULAR类型的,这就保证了是在ack(input)完成之后,才去调用checkFinishId向下级task发送COORD类型消息。

4.4.4     BatchOutputCollectorImpl
4.4.5     Q&A4.4.5.1  Coordination中并无Spout,那么ack(COORD type tuple)是如何被追踪的?如果某个COORD消息丢失,是如何触发重发的?

COORD消息实际上会与Coordinator发出的那个原始Tuple进行关联,建立起一个tuple树。首先Emitter是放到CoordinatedBolt中运行的,而在CoordinatedBolt中COORD消息是通过如下函数发送的:

_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));

而对于Emitter来说,tup就是Coordinator发出的那个原始Tuple,tuple树就是这样建立起来的。

 

如果说某个COORD消息丢失,那么acking机制最终会通知Coordinator,Coordinator会重新发射那个原始Tuple,这会导致对应batch的replay,同时这个replay也是从源头全部重新开始的。

4.5      Transactional Topology实现

相关代码主要在如下两个目录:

src/jvm/backtype/storm/transactional;src/jvm/backtype/storm/coordination/。

4.5.1     ITransactionalSpout

public interface ITransactionalSpout<T> extends IComponent {

public interface Coordinator<X> {

X initializeTransaction(BigInteger txid, X prevMetadata);

boolean isReady();

void close();

       }

public interface Emitter<X> {

  void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);

void cleanupBefore(BigInteger txid);

void close();

       }

Coordinator<T> getCoordinator(Map conf, TopologyContext context);

Emitter<T> getEmitter(Map conf, TopologyContext context);

}

 

ITransactionalSpout只是一个包装,核心的类是Coordinator和Emitter。Coordinator是协调者,Coordinator会负责协调事务的产生和提交,它会向Emitter发送消息(消息内容就是一个TransactionAttempt),告诉它产生一个Batch,Emitter是真正的Batch数据产生者,由于事务可能执行 了一半而失败,Coordinator可能会向Emitter发送具有相同TransactionID的tuple,Emitter需要自己保证对于相同TransactionID的batch产生相同的Tuple序列。

4.5.2     TransactionalSpoutCoordinator

TransactionalSpoutCoordinator继承自BaseRichSpout,充当了Coordinator的运行容器。TransactionalSpoutCoordinator的主要作用就是作为协调者,协调整个Transactional Topology的执行,实际运行中,只会有一个task实例作为TransactionalSpoutCoordinator,它是一个中央控制节点,由它负责事务ID的生成和管理事务的提交。

 

通过上面代码可以看到,Coordinator有三个接口:

initializeTransaction:生成本次事务对应的元数据

isReady:是否可以启动新的事务

close:释放Coordinator占用的资源

 

实际运行中Coordinator是放到TransactionalSpoutCoordinator执行的,通过在ISpout的接口实现中调用用户提供的Coordinator实现的initializeTransaction,isReady,close接口,驱动Coordinator的执行。

 

TransactionalSpoutCoordinator会在内存中维护一个所有当前活动Transaction的<TransactionId, TransactionStatus>的Map,TransactionStatus由TransactionAttempt和AttemptStatus组成。AttemptStatus有三种:

PROCESSING,Transaction的初始化状态

PROCESSED, Emitter及其他Bolt处理完该TransactionAttempt对应的batch之后,利用acking机制对此进行追踪,在TransactionalSpoutCoordinator的ack函数中会将状态从PROCESSING –>PROCESSED

COMMITTING,_currTransaction事务进入PROCESSED后会进入COMMITTING状态,然后Coordinator会往TRANSACTION_COMMIT_STREAM上发送针对该Transaction的commit命令。当再次收到针对该事务的ack后,就认为该事务提交成功。

 

用户可以通过配置Config.TOPOLOGY_MAX_SPOUT_PENDING来控制处于活动状态的事务数,默认是1。TransactionalSpoutCoordinator保证当前处于COMMITTING状态的Transaction只会有一个,即_currTransaction,但是可以有多个活动的Transaction处于PROCESSING或PROCESSED状态。TransactionalSpoutCoordinator会周期性检查_currTransaction的状态,如果发现它状态变成了PROCESSED,那么就会发送针对该事务的提交命令。

 

TransactionalSpoutCoordinator会发送两种命令流:TRANSACTION_BATCH_STREAM和TRANSACTION_COMMIT_STREAM。TRANSACTION_BATCH_STREAM上会发送如下信息(当前事务的TransactionAttempt值,当前事务的meta值,已经完成提交的事务的TransactionAttempt值),主要是驱动Emitter输出一个batch,TRANSACTION_COMMIT_STREAM上只发送需要提交的事务的TransactionAttempt值,用于通知所有的commiters提交该事务。

 

这两个消息流是如何在Topology中传输的?对于TRANSACTION_BATCH_STREAM它只出现在Coordinator和Emitter之间。而TRANSACTION_COMMIT_STREAM则取决于后面的Bolt组件是否实现了ICommitter接口或者是否是通过setCommiterBolt添加的,如果是那么TransactionalTopologyBuilder会在Coordinator和它们之间建立一个TRANSACTION_COMMIT_STREAM连接。此外,如果ITransactionalSpout没有实现ICommitterTransactionalSpout接口,那么Coordinator和Emitter之间也不会有TRANSACTION_COMMIT_STREAM的存在。

4.5.3     TransactionalSpoutBatchExecutor

TransactionalSpoutBatchExecutor继承自IRichBolt,充当了Emitter的运行容器。

 

实际运行中Emitter是放到TransactionalSpoutBatchExecutor执行的,因此该类主要是通过自己实现的Blot接口调用用户提供的Emiter实现。比如在void execute(Tuple input)中,进行Emitter的emitBatch调用。

 

TransactionalSpoutBatchExecutor接收的Tuple都是从Coordinator发出的控制命令流。也就是说要么是来自TRANSACTION_BATCH_STREAM,要么是来自TRANSACTION_COMMIT_STREAM。可以看到在它的execute函数实现中,有如下逻辑:

如果是来自TRANSACTION_COMMIT_STREAM,那么它会查看_activeTransactions是否含有该Transaction,如果有就调用_emitter.commit(TransactionAttempt),并将该事务从_activeTransactions中删除,然后ack(input),如果_activeTransactions找不到该事务,就fail掉它。如果是来自TRANSACTION_BATCH_STREAM,就调用_emitter.emitBatch,并将其放入_activeTransactions,然后进行ack。如果中间抛出任何异常就调用fail。

 

此处有个疑问:如果用户给的ITransactionalSpout并未实现ICommitterTransactionalSpout接口,那么_activeTransactions是何时将事务从其中删除的呢?看起来它是根据TRANSACTION_BATCH_STREAM上传递过来的“已经完成提交的事务的TransactionAttempt值”进行清空的。

 

TransactionalSpoutBatchExecutor用的collector也是BatchOutputCollectorImpl。关于该类请参考coordination实现部分。

4.5.4     TransactionalTopologyBuilder

TransactionalTopologyBuilder实际上是对TopologyBuilder的简单封装。之所以提供这一层,是因为系统对用户提交的Topology结构进行了一个改变,在用户看来只是提供了一个ITransactionalSpout,但是实际执行时,该ITransactionalSpout却是要变成一个Spout 线程(Coordinator)和多个Bolt(Emitter)组成的复合结构。TransactionalTopologyBuilder会将用户提供的ITransactionalSpout中的Coordinator放到TransactionalSpoutCoordinator中,作为单独的一个Spout。将ITransactionalSpout中的Emitter放到TransactionalSpoutBatchExecutor中,再在外面套上一层CoordinatedBolt,作为一个单独的Bolt。其他的Blot也会被放到CoordinatedBolt中执行。

 

除此之外系统还会在各个组件之间添加一些额外的消息控制流,比如:在Coordinator和Emitter之间添加一个TRANSACTION_BATCH_STREA,在那些作为commiter的Blot和Coordinator之间添加一个TRANSACTION_COMMIT_STREAM,这样Coordinator就可以向那些committer发送事务的commit命令消息。在CoordinatedBolt之间添加一个COORDINATED_STREAM。

4.5.5     Transaction状态存储

代码目录src/jvm/backtype/storm/transactional/state/。

与Transaction相关的状态是通过Zookeeper保存的,具体代码在TransactionalState.java中。在该类中直接使用Curator进行Zookeeper相关操作,

 

Curator是Netflix开源的一套ZooKeeper客户端框架。Curator主要解决了三类问题: 
封装ZooKeeper client与ZooKeeper server之间的连接处理;

提供了一套Fluent风格的操作API;

提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装。

 

数据在zookeeper上的根存储路径由配置项Config.TRANSACTIONAL_ZOOKEEPER_ROOT确定,默认为:"/transactional"。比如coordinator 的transaction数据存储路径为:transactional_root/id/coordinator/。emitter数据存储路径为:transactional_root /id/user/。id就是用户在创建TransactionalTopology时,传递给TransactionalTopologyBuilder的第一个构造参数,即用来标识该TransactionalTopology的ID。

 

RotatingTransactionalState对TransactionalState进行封装,可以自动删除那些已经提交了的Transaction对应的状态。它维护了一系列Transaction的状态,内部采用一个TreeMap保存这些状态,如果用户创建状态时指定strictOrder=true,那么插入时它都会判断transactionID是否是严格递增的(如果是第一个,那么它的ID应该等于TransactionalSpoutCoordinator.INIT_TXID,如果不是第一个,那么它在map中的前一个transactionID要比它小1,并且没有比它大的transactionID),如果不是严格递增的,那么会抛出异常。

4.5.6     PartitionedTransactionalSpout实现

代码目录src/jvm/backtype/storm/transactional/partitioned/。

 

有两个类:IPartitionedTransactionalSpout和PartitionedTransactionalSpoutExecutor。其中IPartitionedTransactionalSpout是用户需要直接继承的接口,它抽象出了partition的概念,但是该接口直接继承自IComponent,但是根据要求TransactionalTopologyBuilder的接口要求,它需要接收ITransactionalSpout。所以就有了PartitionedTransactionalSpoutExecutor,它实现了ITransactionalSpout接口,这个类实际上是对IPartitionedTransactionalSpout进行了一次适配,通过调用用户提供的IpartitionedTransactionalSpout实现来实现ITransactionalSpout所需要的接口语义。

 

对于PartitionedTransactionalSpout来说,Coordinator针对每个Transaction需要记住的元数据就是当时partition的个数,Emiter  task内部会有一个partitionID到对应的RotatingTransactionalState的map,而RotatingTransactionalState实际上内部又管理了一系列Transaction对应的状态。Emiter  在emitBatch()中会根据task总数,自己的task index和partition个数算出哪些partition是属于当前task的,对于每个属于它的partition,它会查找该partition对应的RotatingTransactionalState{!strictOrder=false},然后根据拿出当前Transaction的id查看,如果该Transaction对应的meta为NULL{!有两种情况会导致它为NULL,历史上就没有被创建过,已经有比它晚的Transaction出现在RotatingTransactionalState中了。根据RotatingTransactionalState实现,虽然它不是strictOrder 的,如果说它本身不存在并且已经存在比它id大的Transaction,getState也会返回NULL},如果它是一个全新的Transaction 则会调用emitPartitionBatchNew,否则如果它已经不存在并且已经存在比它id大的transaction则直接skip,否则调用emitPartitionBatch。

 

TransactionAttempt实际由TransactionId和AttemptId组成,上面我们只看到了TransactionId所起的作用,但是并未看到AttemptId是何时被修改以及它所起的作用。关于AttemptId的作用,请参考:4.5.10.1。

4.5.7     MemoryTransactionalSpout

4.5.8     Acking机制

对于TransactionalTopology中的消息的acking,都由Storm系统内部完成了,用户不需要像在普通的Topology中那样,需要显式地对tuple进行ack。除了将acking自动化,还进行了高度的优化。

 

在这里acking机制主要用于跟踪两个事情,一个是transaction的processing过程,一个是transaction的commiting过程。对于processing过程,是要追踪所有tuple都被处理了。首先Coordinator发射的tuple被认为是root tuple,然后该tuple实际上会引出对应batch中的一系列tuple,但是在追踪时并没有追踪batch里的所有tuple,而是进行了优化,追踪的是CoordinatedBolt发射的COORD类型的Tuple,因为task之间的消息是保序的,这就保证了COORD类型的Tuple如果被成功处理的话,那么对应batch的所有tuple也应该是已经被成功处理完了,因为Bolt只有在发送完batch内的所有tuple后才会发送COORD类型的Tuple。在processing阶段,如果碰到commiter Bolt,那么就不再继续往下追踪了,即tuple树会在commiter Bolt处截止,当整个tuple树ack完成,就认为processing阶段成功完成。

 

Commiting阶段,第一级Tuple是由Coordinator广播给各个commiter Bolt的,之后tuple树会沿着processing阶段截止的地方继续扩展,继续往下发送COORD信息。整个tuple树ack完毕,那么Commiting阶段就完成了。也就是说在processing阶段,碰到commiter Bolt,它会卡在那等待提交消息,因此它需要及时的截断processing的tuple树,以让事务可以进入PROCESSED状态。到了提交消息来了,它才继续往下级task发送COORD信息,因此下级task也会一直被卡住。

 

4.5.9     总结

Here’s how transactional spout works:

1.     1.Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt

2.     2.The coordinator is a regular spout with a parallelism of 1

3.     3.The emitter is a bolt with a parallelism of P, connected to the coordinator’s "batch" stream using an all grouping

4.     4.When the coordinator determines it’s time to enter the processing phase for a transaction, it emits a tuple containing the TransactionAttempt and the metadata for that transaction to the "batch" stream

5.     5.Because of the all grouping, every single emitter task receives the notification that it’s time to emit its portion of the tuples for that transaction attempt

6.     6.Storm automatically manages the anchoring/acking necessary throughout the whole topology to determine when a transaction has completed the processing phase. The key here is that *the root tuple was created by the coordinator, so the coordinator will receive an "ack" if the processing phase succeeds, and a "fail" if it doesn’t succeed for any reason (failure or timeout).

7.     7.If the processing phase succeeds, and all prior transactions have successfully committed, the coordinator emits a tuple containing the TransactionAttempt to the "commit" stream.

8.     8.All committing bolts subscribe to the commit stream using an all grouping, so that they will all receive a notification when the commit happens.

9.     9.Like the processing phase, the coordinator uses the acking framework to determine whether the commit phase succeeded or not. If it receives an "ack", it marks that transaction as complete in zookeeper.

Storm源码分析 - 星星 - 银河里的星星

 

More notes:

? Transactional spouts are a sub-topology consisting of a spout and a bolt

o the spout is the coordinator and contains a single task

o the bolt is the emitter

o the bolt subscribes to the coordinator with an all grouping

o serialization of metadata is handled by kryo. kryo is initialized ONLY with the registrations defined in the component configuration for the transactionalspout

? the coordinator uses the acking framework to determine when a batch has been successfully processed, and then to determine when a batch has been successfully committed.

? state is stored in zookeeper using RotatingTransactionalState

? commiting bolts subscribe to the coordinators commit stream using an all grouping

? CoordinatedBolt is used to detect when a bolt has received all the tuples for a particular batch.

o this is the same abstraction that is used in DRPC

o for commiting bolts, it waits to receive a tuple from the coordinator’s commit stream before calling finishbatch

o so it can’t call finishbatch until it’s received all tuples from all subscribed components AND its received the commit stream tuple (for committers). this ensures that it can’t prematurely call finishBatch

4.5.10Q&A
4.5.10.1          AttemptID的作用?

TransactionAttempt是在TransactionalSpoutCoordinator.java中的sync()中被首次生成的。

TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());

 

https://groups.google.com/forum/?fromgroups=#!searchin/storm-user/transactional/storm-user/yPTeyFx6l5Y/FdZN6xh9RiYJ。AttemptID是随机生成的,可以保证针对同一个batch的不同次尝试具有不同的AttemptID。然后不同的AttemptID将会对应不同的BaseTransactionalBolt 对象。因为在实际运行中,一个batch可能会发生错误,这样就需要重发,每次重发都会产生一个新的TransactionAttempt,TransactionID一样,但是AttemptID不同。

 

用户可根据TransactionID进行去重。AttemptID用以保证两次batch对应不同的处理对象,同时帮助框架区分不同的attempt,避免混淆。假设如果我们为每个Transaction不提供AttemptID而是只有一个TransactionID,比如在Transaction的第一次执行中,它可能出错了,比如有两个commiter bolt,这两个都失败了,同时它们都调用了fail,假设其中一个commiter的fail被很快收到了,然后Coordinator很快重新发起这个事务,此时另一个commiter的fail才到达,但是如果没有AttemptID,那么这个fail虽然是说之前的那个事务失败了,但是只有TransactionID做标识,那么它就会将新发起的事务fail掉,但是实际上这个事务并未真的失败。加入AttemptID后就避免了这种混淆。

 

那些已经失败的事务使用的资源会在一定时间后销毁,如果收到关于它们的状态汇报Coordinator可以简单地忽略它们。而当前事务会变成这个新发起的事务,Coordinator只需要关注这个新事务的状态就可以了。同时由于这种事务执行一半失败,然后又重试的情况的存在,某些commiter Bolt的finishBatch函数可能会被调用多次,因此实现中需要保证finishBatch中操作的幂等性。

4.5.10.2          有没有emitBolt这样的类?

Emitter是放到TransactionalSpoutBatchExecutor中执行的,然后TransactionalSpoutBatchExecutor又会被放到CoordinatedBolt中,所以本质上是有一个Bolt与之对应的。

4.5.10.3          对于Emitter的_sourceArgs 为空吗?它与CoordinatedBolt如何沟通的?

是的。参见TransactionalTopologyBuilder.java:124,传入了一个null参数,会创建一个空的_sourceArgs,所以对于Emitter来说它的_sourceArgs为空,也就是说当它ack了input后,就可以认为该batch结束了。

 

从CoordinatedBolt的角度看,它会在ack(input)后, checkFinishId成功,这样它就会通过COORDINATED_STREAM向它的下级task发送COORD消息。这样通过coordination机制,后面的task就会知道这个batch所应包含的tuple数,从而可以确认batch的结束。

4.5.10.4          TransactionalTopology中的非BatchBlot是否也会被套上一层CoordinatedBolt?

会。

4.5.10.5          Commit Batch时是如何保证顺序执行的?

具有先后关系的Commiter之间的提交顺序,是通过coordination机制控制的。位于前面的Commiter在自己提交之前,是不会向它的后继task发送COORD消息的,而后继task可以执行finishBatch的前提就是收到前面task的COORD消息,并且自己目前已经收到了前面task发送的所有Tuple。

 

具体细节请参考: 4.4.3CoordinatedBolt。

4.5.10.6          Emit是否是只能在finishBatch里调用?

如果不是,那Bolt C如何知道在Process阶段“Bolt C同样也不会调用finishBatch方法, 它的原因是:它不知道它有没有从Bolt B接收到所有的tuple。(因为Bolt B还在等着事务提交)”。

4.5.10.7          TransactionalTopology中是否允许所有的Bolt都不是commiter?如果允许那么Bolt又是如何来确定可以进行资源回收的呢?因为在commiter存在的情况下,它可以回收那些已经commit的事务,但是如果没有commiter,它如何判断事务已经可以回收?

首先我们在TransactionalTopologyBuilder并未看出有这种限制,即允许TransactionalTopology中的Bolt都不是commiter。查看TransactionalSpoutCoordinator,我们发现它也未判断是否存在为commiter的Bolt,而是都会向TRANSACTION_COMMIT_STREAM发送消息。那么问题的关键就在于当一个流上无目标task时,_collector.emit会做什么事情,这样发出去的Tuple何时被ack?因此就涉及到ack机制如何处理这种情况?可以想到的一种实现就是emit的时候直接向acker发送一个val值为0的消息,那么acker很快就检测到该tuple成功处理,然后再回调ack,相当于空走了一个完整的过程,虽有些浪费但是不需要做任何修改。

4.5.10.8          Transaction何时变为PROCESSED?

很明显是通过acking机制实现的,但是TransactionalTopology对acking机制进行了优化,不会追踪所有tuple,而是追踪coordination tuple。但是可能会有这样的一个疑问,这是否会导致一个循环依赖?首先Transaction需要根据coordination tuple的接收,进入PROCESSED状态,但是commiter又需要收到commit命令才发送coordination tuple。而只有进入了PROCESSED状态才能发送commit命令。

 

实际上是这样解开这个依赖的。当COORD消息被传到commiter bolt时,实际上该COORD消息会被立即ack,不会再继续往下延伸这个tuple树。这样transaction就能很快进入PROCESSED状态,然后commiter bolt会收到commit命令,此时它会在继续延伸原来的tuple树(通过emitDirect向下级task发送COORD消息)。

4.5.10.9          Coordinator重放某个batch时,BatchBoltExecutor中收到相同txid但是AttemptID不同的batch时,如何处理?之前的batch计算结果何时会被清除?

对于BatchBoltExecutor来说,它内部的map是以TransactionAttempt为key的,因此它会新建另一个IBatchBolt对象进行处理。之前batch计算结果会在超时后被清除。基本上我们可以将Transaction的replay过程看做是一次全新发起的Transaction,在用户看来是与之前的是同一个,但是在框架内部看是全新的一个,会重新对它进行追踪。它与之前的具有相同TransactionID的Transaction是相互独立,互不影响的。

4.5.10.10     虽然说Emitter需要保证相同TransactionID的batch具有相同的tuple系列,但是有时Emitter可能无法提供这种保证,此时如何保证处理且处理一次语义?

比如用户正在读取一个partitioned message broker,单个事务将包含来自多台机器产生的tuple序列。假设现在在事务fail的同时有一个节点挂了。在没有这个节点的情况下,是不可能为该事务重放出完全一致的tuple序列的。如果节点一直不恢复,那么Topology就会一直hang在那。唯一可能的解决方案,就是允许输出一个与先前的batch具有不同tuple序列的batch。那么这样是否仍可能保证exactly-once messaging semantics ?

 

实际证明对于一个非幂等的 transactional spout来说,是可以实现这种语义的,只是用户在开发Topology时需要做更多的事情。

 

如果对于一个给定的 transaction id,batch内容可以改变,那么我们之前的逻辑“如果数据库中的transaction id与当前batch的 transaction id一样,我们就跳过该batch”就变成不合法的了。这是因为当前的这个batch可能与上次提交的那个batch是不同的,因此结果是需要发生改变的。可以通过在数据库中多存一点状态来解决这个问题。还是以在数据库中存储global count的例子为例,我们假设batch的中间计算结果存储在partialCount变量中。之前我们在数据库中存储了如下状态:

class Value {

  Object count;

  BigInteger txid;

}

对于非幂等的 transactional spout来说,需要存储如下信息:

class Value {  Object count;  BigInteger txid;  Object prevCount;}

更新逻辑会变成如下:

1.      如果当前batch中的 transaction id等于数据库中的transaction id:

val.count = val.prevCount + partialCount.

2.      否则:

val.prevCount = val.count, val.count = val.count + partialCount and val.txid = batchTxid

。。。

5         相关讨论

https://groups.google.com/forum/?fromgroups=#!searchin/storm-user/transactional

1.      将acker放到spout中,可以减少网络传输,避免id碰撞:

2.      task之间的消息是保序的

3.      Emitter会缓存transaction直到调用cleanupBefore ,因为一个batch可能会在commit阶段失败,这时就需要重发,因此需要Emitter缓存它们,否则如果直接删除那么就需要在重发时创建,可能会因为丢失了一些状态无法精确地重发。

4.      优化了DRPC和TransactionalTopology中批处理的tuple树,只有coordination tuples会被anchored。用户如果想fail掉一个batch,抛出一个FailedException即可,这会导致该batch的重发。不会对一个batch里的所有tuple进行ack,而是通过将coordination tuple纳入tuple树进行ack。同时Storm的消息机制会保证,task间的消息传递是保序的,也就是说一定是先收到了该batch的tuple,然后才会收到对应的coordination tuple的。

5.      关于acking中xor导致的数据碰撞的概率分析

6.      When HBase fails and no rollback is possible

7.      Transactional topology Coordinator emits tuple always.  

initializeTransaction()不能阻塞,否则可能影响到其他事务的处理,比如事务的提交,fail。确认下spout的ack和fail与nextTuple是否是在一个循环里?

8.      Transactional topology not commiting?

9.      About TransactionTopology’s attemptID 

10.  Topologies with fan-out and fan-in 

11.  some questions about Transactional topologies

12.  Trident 与老版本spout和bolt的关系

Spout和Bolt抽象将一直保留,用户可以直接使用它们实现Trident无法提供的功能。Trident会自动将stream作为一个一个的batch进行处理,同时自动保证了exactly-once 语义,同时提供了更面向批处理的API。但是以batch为单位进行处理引入了额外的开销,导致延迟增大到至少数百毫秒的级别,与之相比Storm一次处理一个tuple的方式只需要10几毫秒。但是一次处理一个tuple的问题在于无法实现exactly-once 语义,只能实现 at-least-once或 at-most-once 语义。此外当前的Trident无法实现 cyclic topologies,很多机器学习算法可能会用这种拓扑,但是是可以直接用Storm来实现这种拓扑的。

 

Trident可以做任何 transactional topology可以做的事情,这也是 transactional topology将要被废弃的原因。但是 transactional topology并不等于spout+bolt,如下几点就属于spout+bolt可以完成但却是Trident无法完成的:

1. get access to which tasks receive a message

2. direct groupings

3. cyclic topologies

除此之外,主要不同在于batching模型及与外部状态交互形式上。

 

Trident的操作将会被放到Bolt中执行,同时Trident本身负责所有acking机制,用户不需要关心。

13.  understanding-the-parallelism-of-a-storm-topology

 

You Might Also Like