分布式系统

Photon: Fault-tolerant and Scalable Joining of Con

2013年9月25日 阅读(579)

作者:Rajagopal Ananthanarayanan, Venkatesh Basker etc. Google Inc.

原文:http://www.mpi-sws.org/~areznich/files/photon-sigmod13.pdf

译者:phylips@bmy 2013-9-1

译文:http://duanple.blog.163.com/blog/static/70971767201382591541823/

摘要

Photon是Google开发的用于实时地对多个连续数据流进行join的部署于多个地理位置的分布式系统,具有高扩展性和低延迟的特点。该系统可以在不需要人工干预的情况下,完全容忍设施降级和数据中心级的故障。在Google内部,该系统是与广告系统一起部署的,可以用来对像网页搜索请求和广告点击这样的数据流进行join,它的输出结果是进行广告客户费用结算的重要依据。目前的线上环境中,峰值情况下每分钟要处理数百万的事件,平均延迟低于10秒。本文我们会描述在跨地理位置情况下,维护大规模持久化状态遇到的那些挑战和解决方案,并着重讲述下那些源于实践过程的设计原则。

1.导引

用户访问Google时,会产生网页搜索请求、浏览搜索结果以及点击结果页面上的广告这样的一些事件。如果能够对它们进行近实时的分析,将会产生很好的商业效益。这些数据可以帮助广告投放者微调它们的出价、预算和竞价,并且可以根据用户行为实时进行变更,并持续观察这些变更带来的效果。为了提供实时性的统计数据,我们构建了Photon系统,该系统可以在事件发生后的数秒内,将主要的用户事件(比如搜索请求)与后续事件(比如广告点击)关联起来。Photon会通过一个可以将关联事件标识出来的共享标识符,在多个连续事件流上进行join来产生输出结果。

下面我们以一个搜索请求和广告点击为例,来看下Photon的整个处理步骤:

? 当用户在google.com上产生一个搜索请求后,除了搜索结果外Google还会提供相应的广告。提供该广告的网页服务器也会将与该事件相关的信息发送到负责日志保存的多个数据中心,在这些数据中心内日志会被保存到GFS[14]上。日志数据包括诸如广告商、广告文本、在线广告竞卖参数。该数据后面会被用来为广告商生成报告,进行质量分析等。每个搜索请求会通过一个称为query_id的唯一标识符标识。图1中的t1就代表了查询事件的发生点。

? 接收到搜索结果后,用户可能会点击广告。在将用户重定向到广告投放者的网站的同时,该点击请求会被发送给后端服务器。该点击事件也会被记录并保存到多个数据中心。被记录下的数据中包含用户点击广告的信息,会被用来向投放者计费。点击事件中也包含了相应的query_id信息。每个点击事件也会通过一个唯一的click_id进行标识。图1所示点击事件发生在t2。

 

当点击日志被发送到数据中心后,Photon会根据query_id将它与对应的搜索事件进行join,并将所有的搜索事件相关的信息传给新join出来的点击事件记录(如图1所示)。Photon会从多个数据中心的GFS上读取输入事件流,然后再将join好的输出结果写出到多个数据中心的GFS上。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

1.1问题描述

问题的形式化描述:给定两个连续增长的日志流,primary日志流里的每个事件都包含一个唯一的标识符,并且foreign日志流里的每个事件也都包含一个可以与primary日志流里的事件相关联的该标识符。以RDBMS的术语来说,我们可以将这两个日志流看做是具有外键约束[7]的两张表。primary日志流就类似于primary表,foreign日志流就类似于foreign表。Join就相当于在两个日志流上进行inner-join。点击日志就相当于要与primary表进行join的foreign表,因此我们将点击事件称为foreign事件,搜索事件称为primary事件,query_id就是用来进行join的key。

Photon的主要目标就是实时进行连续事件流的join。在Google,我们使用该框架进行很多事件流的join。

1.2系统挑战

在构建Photon进行连续事件流的join时,我们主要碰到了如下一些挑战:

? Exactly-once语义。Photon是用来向广告商收费、向华尔街投资者报告收益的,等等。Photon必须保证同一次点击永不会被计算两次,因为这会导致对广告商的重复扣费。另一方面,如果有些点击没有被Photon处理那么Google自己就会有金钱上的损失。实际中,为了满足这种商业上的需求,我们需要Photon可以保证99.9999%的事件可以在数秒钟内处理完成,同时保证在数小时内100%的事件都能被处理。这种需求意味着Photon必须提供如下语义:a)任意时刻最多处理一次;b)实时情况下近似准确;c)最终可以达到Exactly-once。

? 自动化的数据中心级容错。数据中心经常会面临各种形式的失效。其中一些是计划中的(比如软件升级或硬件替换),还有一些是预料之外的(比如电力故障或网络分区)。服务中断可能会持续数分钟到数天,甚至是数周。在这种环境下,那些只能在单个数据中心运行的系统将会产生严重的弊端。当发生故障时,需要手动在另一个数据中心将系统启动,过程复杂且容易出错。而重构系统的状态还可能需要数小时,这会严重影响系统的可用性。而在一些临时性故障发生时,又很难判断是在另一个数据中心启动系统,还是相信系统可以安全过渡。对于GFS[14]和BigTable[6]这样的一些分布式系统来说,可以出色地处理一定数量下的机器故障。但是它们并不是设计用来处理大规模数据中心级的故障的,因此需要开发者设计它们的应用程序来确保可以优雅地处理这种失效。鉴于对收入的直接影响,需要Photon是一个高度容错的,甚至可以在不需要人工干预和不影响系统可用性的情况下,就能处理好数据中心级的故障。

? 高度可扩展。Photon不仅要能够处理今天的每分钟数百万事件的峰值,还要能处理未来不断增长的事件数量。

? 低延迟。Photon的输出是用来为广告投放者提供统计数据,帮助他们分析效果,优化预算的。要求Photon可以在数秒内完成join可以大大提高整个商业流程的效率。

? 乱序流(Unordered streams)。我们的primary流(用户搜索请求)基本上还是按照事件发生时间排序的。但是foreign流基本上都不是按照query发生时间排序的,因为点击可能是发生在query之后的任意时刻。这就使得它很难使用现有的window join算法[24,25]。

? Delayed primary stream。只有当对应的query事件在数据中心可用时,点击事件才能成功的join上。逻辑上来说,query事件总是要发生在相应的点击事件之前。但是,用于生成query事件和点击事件的服务器是分布在全球各地的(为了最小化终端用户延迟),因此query日志和点击日志是被独立发送到各数据中心的。而query日志又比点击日志大几个数量级,因此query日志比相应的点击日志后到的情况屡见不鲜。无论query日志何时可用,Photon都要能够完成join。这就使得Photon与标准RDBMS有所不同,对于标准RDBMS来说外键总是会在primary表中存在。

1.3我们的贡献

? 据我们所知,这是第一篇在如下系统限制条件下对多个连续数据流的join问题进行形式化定义并予以解决的论文:exactly-once语义,数据中心级容错,高可扩展性,低延迟,乱序流和Delayed primary stream。

? 我们提出了在进行跨地理位置的情况下进行持久化状态维护的挑战和解决方案。在使用普通商品化硬件时,需要格外注意提高其容错性和吞吐率。

? 该论文所提出的解决方案已经完整实现,并且部署上线运行了几个月。基于这些实际经验,我们会提供细节化的性能结果,设计权衡以及通过部署获得的重要设计经验和教训。

在第2节,我们会讨论下在解决所有上述系统挑战时,是如何将我们系统的关键状态存储到多个数据中心。第3节,会具体讨论Photon的设计和架构。第4节,会介绍我们的线上部署参数及从线上收集到的性能参数。第5节,列出了构建Photon系统时获取的经验教训。第6,7节,描述了相关研究,未来工作及总结。

2.基于Paxos的ID Registry

实现容错最简单的方式就是使用replication[22]。我们可以通过在多个数据中心运行相同的系统来处理数据中心级的故障。该策略已经被应用到几乎Google所有的网页服务器和广告服务器,使得数据中心发生故障时对于系统来说是透明的,保证了整个服务的连续性。负载平衡器会自动将终端用户请求重定向到最近的服务器。

为了提供数据中心级的容错,位于多个数据中心的Photon worker会试图对同一个输入事件进行join,但是这些worker必须协调它们的输出以保证每个输入事件最多被join一次。Worker间共享的关键状态由过去N天已经被join的event_id(比如click_id)集合组成,该状态被存储在IdRegistry中。常量N是通过权衡存储开销与抛弃延迟超过N天的事件所带来的成本来确定的。需要注意的是,对于那些延迟超过N天的事件,系统只能选择抛弃,因为系统无法对这样的事件进行去重,而我们最重要的目标是要避免重复扣费。

在将join好的事件写出之前,每个worker都要验证该event_id是否已经存在于IdRegistry中。如果已经存在,那么worker就会跳过该事件的处理。否则,worker就会尝试将event_id写入到IdRegistry。在将join好的事件写出之前,worker必须要能够成功的插入event_id。IdRegistry必须要保证一旦某个event_id被写入,后续的写请求将会失败。

对于IdRegistry来说,关键的需求如下:

? 数据中心级的容错。IdRegistry必须要同步地复制到多个数据中心,这样即使是一个或多个数据中心故障,它依然可以继续提供服务。所能容忍的最大故障数据中心数目必须是可配置的。

? Read-Modify-write事务。Workers必须能够支持条件提交,比如只有当event_id不存在于IdRegistry时才将它写入。

由于这种容错和强一致性需求,我们选择使用Paxos[18]来实现IdRegistry。该算法可以确保将已提交的value值同步复制到半数以上的副本中。

图2展示了IdRegistry在Photon中的使用。我们会在多个数据中心部署相同的Photon pipeline。每个pipeline负责处理最近的那个数据中心保存的所有事件—在Google,日志会被保存到多个地理位置的数据中心。为确保每个事件最多被处理一次,IdRegistry负责保存那些事件过去N天完成join的event_id。由于关于这些已经完成join的事件的全局列表是存储在IdRegistry中的,因此多个数据中心的Photon pipeline可以独立工作而相互之间不需直接通信。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

2.1 IdRegistry服务架构

为了实现一个高效的IdRegistry,我们采用一个基于Paxos实现跨数据中心复制一致性的in-memory key-value存储。通过将事件id作为key存储,IdRegistry可以快速识别出一个事件已经被join了还是没有。如果event_id已经存在,它可以将那些试图提交相同id的worker fail掉。如果event_id还不存在,IdRegistry就执行一个Paxos提交将那些依旧不存在的key插入(比如通过一个Read-Modify-Write事务)。

IdRegistry是建立在PaxosDB[4]之上。PaxosDB实现会为每个新来的value值重复执行Paxos一致性算法,保证每个副本访问到的是相同的value值序列。换句话说,对于每个最新的副本来说,该in-memory key-value存储是一致的。PaxosDB还会保证任意时刻最多只有一个组成员会成为master。只有master能向Paxos提交更新,如果master死了,PaxosDB会自动选举出新的master。

图3展示了单个IdRegistry服务器的架构。Photon worker与IdRegistry之间的交互实际上是一个客户端-服务器模型。Photon worker扮演了客户端的角色,它会发送两种类型的请求:a)用来检查某个event_id是否已经被提交了的查询请求;b)用于当前仅当event_id不存在时才插入的条件提交请求。IdRegistry上的RPC调用处理线程会接收到来自客户端的RPC请求。这些处理线程本身都是非常简单的,它只是负责将输入请求添加到一个内存队列。会有一个后台线程负责从队列中取出请求,在PaxosDB上执行事务以及执行PRC调用的callback将请求结果返回给客户端。

 Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

2.2可扩展的IdRegistry

为了保证某个相对较大地理区域的故障不会影响到IdRegistry,我们将IdRegistry副本部署到不同的地理位置。但是这种需要独立区域的方式,将会导致IdRegistry的吞吐率受限于网络延迟。根据统计,不同地理区域的网络round-trip-time可能高于100ms。这将会导致Paxos的事务吞吐率降低到每秒不到10个事务,与之相比实际需求的规模比它多多个数量级—我们需要每秒处理成千上万个事件。尽管我们可以通过在客户端将多个key打包到一个RPC里面,但是在客户端数目很多的情况下,效果也有限。我们采用两种机制来实现IdRegistry的扩展:服务端batching和sharding。

2.2.1服务端batching

IdRegistry副本位于不同的地理位置,因此根据round-trip延迟单个Paxos提交将会花费100ms。延迟主要是由固有的拓扑结构造成的,而非要发送的数据量。实际上,每个Paxos提交只需要发送少量的事件级信息。基于此,我们实现了服务端batching来提高每个IdRegistry服务器的吞吐量。关键思路就是将多个事件层次上的提交组合成一个大的提交。这类似于经典的数据库技术—group commit[23]。

如图3所示,IdRegistry服务器有一个后台线程(我们称之为Registry线程),该线程负责从队列中取请求,转换为PaxosDB事务,向客户端发送响应。该线程会一次取多个请求,将它们作为一个PaxosDB事务。在对请求进行batch时,Registry线程会进行应用级别的冲突解决。考虑多个请求试图向IdRegistry写入相同event_id的场景。如果event_id不存在,Registry线程会将event_id插入到PaxosDB,然后向客户端返回成功并将该事件标记为已经完成了join;其他客户端会收到一个失败消息需要将事件丢弃。我们通过利用PaxosDB提供的多行事务(可以自动进行条件检查并更新多个key/value对)来实现该行为。

假设针对某个事件的单个RPC请求大小只有100字节,我们就可以很轻易地将数千个RPC请求batch到单个PaxosDB事务里。

2.2.2 分片(Sharding)

尽管服务端batching可以显著提高IdRegistry的吞吐率,但是它仍然无法满足我们的需求。采用Sharding主要源于如下洞察:在IdRegistry中,具有不同id的事件处理是相互独立的。为了利用这种独立性,我们将IdRegistry需要处理的event_id空间划分为多个不相交的分片(shards),这样每个独立的分片可以由独立的IdRegistry服务器进行处理。这样IdRegistry服务器就可以并行运行,并接收分配给它的那些event-id对应的请求。图4展示了请求是如何根据它们的event_id被发送给不同的服务器分片的。Shard编号是通过event_id对总shard数取模的一个确定性hash计算得到的。需要指出的是,每个IdRegistry服务器仍然可以处理来自多个分片的事件;它就可以并行地向多个IdRegistry服务器分片发送RPC请求。这种分片策略提供了高扩展性,同时使得IdRegistry具备了我们所需要的吞吐率。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

动态改变分片数目


伴随着写入日志的事件数的增长,我们需要为IdRegistry添加更多的分片。但是在添加更多分片时,简单地改变模运算中的分片数会违反hash函数的确定性属性,同时这会导致相同event_id的事件在分片数增加后被映射到不同的分片编号。这是不可接受的,因为这会导致输出结果产生重复。因此我们需要一种支持分片数动态改变,同时还能保留现有event_id在分片间映射方式的确定性映射机制—也就是说该机制必须支持后向兼容。

为解决上述问题,我们采用了一种基于时间戳的分片配置,该配置会定义任意给定时间段内的分片数。我们将时间戳与event_id进行关联,并要求两个时间戳间的时钟偏差控制在S秒以内(使用一个全局性的TrueTime[8]服务器,具体细节参见3.1节)。如果当前时间是t1,我们可以选择在未来的时间t2>t1+S,指定那些时间戳小于t2的事件需要使用改变前的分片数进行hash,而那些时间戳>=t2的事件则需要使用改变后的分片数进行hash。这就保证了对于给定的相同事件id我们一定会计算出相同的分片编号。比如,IdRegistry具有100个分片,那么分片配置将会是如下:

start time : 0; end time : 正无穷; number of shards : 100

如果我们将分片数由100改成了120,当前时间是5000,偏差是200,我们的分片配置将会变成:

start time : 0; end time : 5200; number of shards : 100

start time : 5200; end time : 正无穷; number of shards : 120

我们将该配置信息保存在PaxosDB中,确保所有的IdRegistry服务器和客户端使用相同的配置。同样地,也可以采用相同方式减少分片数。

 

2.3删除老的key值

我们只能在IdRegistry上保存有限数目的event_id。由于每个event_id都具有时间戳,IdRegistry可以据此删除那些超过N天的id。我们将N称为垃圾回收阈值。

在每个IdRegistry服务器上,都有一个后台线程周期性的扫描老的key值,如果它们的时间戳超过垃圾回收阈值就将它们从IdRegistry中删除。垃圾回收线程只会运行在IdRegistry的主副本上。为了解决master切换时,IdRegistry服务器时间戳不一致的问题,我们会在IdRegistry服务器上保存垃圾回收时的边界时间戳,一个服务器分片对应一个时间戳。该时间戳会被另一个使用TrueTime[8]的线程周期性的更新,同时我们会保证它永不会倒退。垃圾回收线程只会将那些事件戳小于记录下的边界时间戳的event_id记录清除。

如果客户端试图对一个小于边界时间戳的事件进行插入或查找,IdRegistry会返回一个具有特定错误码的失败响应,同时客户端会跳过对该事件的处理。

3.单数据中心Pipeline

如第2节所述,我们在多个数据中心部署相同的Photon pipeline。现在我们来描述下单个数据中心内的pipeline架构,如图5所示。在下面的例子中,我们会以点击和搜索日志为输入,但是该架构可以应用于任何类似的数据流。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

在该pipeline中,主要有三个组件:dispatcher,负责不断地读取点击日志并将它们喂给joiner;EventStore,负责提供针对query的高效查询;joiner,负责通过EventStore为给定的点击找到相应的query,使用IdRegistry进行去重,同时产生join后的输出结果。将点击与相应的query进行join的一系列步骤如下:

1. dispatcher负责在日志到达时,读取里面的点击事件,并且在IdRegistry中进行查询。如果该click_id已经存在于IdRegistry,dispatcher就会认为它已经被join过了,会跳过对它的处理。

2. 如果click_id还未存在于IdRegistry,dispatcher会将它异步地发送给joiner,并等待响应。如果joiner无法成功完成join(比如由于网络问题,或者未找到query事件),dispatcher通过在某个backoff周期后将它发送给另一个joiner实例不断进行重试。通过这种方式以最小代价来保证at-least-once语义。

3. Joiner从点击日志中抽取query_id,然后到EventStore里查找相应的query。

4. 如果无法找到对应query,joiner会发送一个失败响应给dispatcher,这样它就可以继续重试。如果query被找到了,joiner就会尝试将click_id注册到IdRegistry。

5. 如果click_id已经存在于IdRegistry,joiner就会认为该事件已经被join过了。如果joiner可以将click_id注册到IdRegistry,joiner就会将来自于query的信息存储到点击事件中,并将它写出。

需要注意的是,上面的算法是有可能产生数据丢失的,比如joiner可能成功将click_id注册到IdRegistry,但是可能在写出时失败。在后面的3.3.2小节,我们会讨论下如何最小化这种数据丢失,同时在3.3.3小节我们会讨论下如何对那些丢失的输出进行恢复。

图6展示了运行在多个数据中心的相同Photon pipeline,全局状态由IdRegistry维护。剩余的小节里,我们会具体描述下基本架构,每个组件的设计及相关技术挑战。

 Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

3.1唯一Event_Id的生成

鉴于被处理数据的重要性,服务器会将所有事件写入到像GFS这样的持久化存储中,同时会将它们备份到多个数据中心。当某个事件被某个服务器记录时,它就被赋予一个全局唯一的标识符(比如上面讨论中的query_id和click_id)。

一个event_id由三部分组成:ServerIP,ProcessID和Timestamp。每个服务器保证生成的时间戳是单调递增的。同一个日志文件里的所有事件基本上是按照时间戳近似有序的,因为同一个进程里有多个线程在并行地生成event_id。由于event_id都是由服务器在本地生成的,而本地时钟是有偏差的,为了将偏差限制在S秒内,我们利用了TrueTime API[8]提供的时钟同步原语对本地时钟进行校正。通过使用GPS和原子钟,它可以将时钟不确定性限制在一定范围内。每个生成event_id的服务器都会运行一个后台线程,每隔S秒就向TrueTime服务器发送RPC请求进行本地时钟同步。

3.2 Dispatcher:确保at-least-once语义

Photon的输入由保存在日志文件里的事件组成。这些文件的大小在持续增长。当日志文件大小达到阈值时,事件会被写到新的文件中。在任意时刻,每个Photon实例都需要追踪数千个不断增大的日志文件。因此,Dispatcher的主要目标就是:持续对日志文件进行监控,以一种可扩展和即时的方式读取新事件,然后将它们以最小的延迟分发给joiner。

Dispatcher会周期性的扫描日志所在目录,检查是否有新文件产生及现有文件大小是否增加。Dispatcher会在本地GFS cell上为每个文件存储一个状态。该状态信息包括当前文件,以及每个文件的当前读取位置。为实现高度的扩展性,Dispatcher会采用多个工作进程并发处理日志文件。通过持久化状态,保证了处理信息在进程重启时也不会丢失。所有的工作进程共享本地存储的持久化文件状态信息,这样它们不用相互介入就可以对日志文件的不同部分进行处理。

在将事件发送给joiner之前,Dispatcher会在IdRegistry中进行查找确保该事件还未被join。根据(第4节)我们的测量结果,这种优化显著地提升了性能。

3.2.1重试逻辑

需要注意的是只有当相对应的搜索事件在日志数据中心可用时,点击事件才能成功被join。但是正如在第1节里提到的,部分搜索日志比点击日志后到达数据中心的情形并不罕见。

在从日志文件中读取了一个点击事件后,Dispatcher就会向joiner发送一个RPC,同时异步地等待响应。当Dispatcher无法从joiner收到成功的响应时,它就会将该点击保存在本地GFS上,后面进行重试,并以此来确保at-least-once语义。为了对重复故障下(比如由于网络拥塞)的重试率进行限制,Dispatcher会使用一种指数回退(backoff)算法。如果超过一定数目的重试并且点击老于某个阈值后,joiner仍无法对点击完成join,joiner会将该点击标记为不可join的,并向Dispatcher返回成功。实际中,不可join的点击只占总点击的很少一部分。

3.2.2处理数据中心失效

由Dispatcher维护的文件状态存放在它所在的数据中心本地的。如果本地GFS数据中心发生故障,Dispatcher会停止运行。但这并不影响Photon系统的E2E可用性,因为我们至少拥有位于不同数据中心独立运行的两个相同的Photon pipeline。一旦发生故障的数据中心恢复正常,如果持久化状态完好,dispatcher就能在它停止处恢复,开始处理积压的日志。由于积压的日志大部分已被其他数据中心的pipeline处理,这些事件实际已经存在于IdRegistry中,因此Dispatcher可以很快追上(catch-up)最新状态,开始处理最近的日志。我们会针对IdRegistry的请求进行限流,避免它过载同时确保catch-up期间的性能。

极端情况下,dispatcher的持久化状态可能无法从数据中心故障中恢复。如果其他数据中心的pipeline一直正常运行,我们可以手动对它进行初始化,使得它可以忽略累积日志直接处理当前最新日志。

3.3 Joiner

Joiner被实现为一个无状态的RPC服务器,负责接受来自dispatcher的请求,与IdRegistry和EventStore进行协调执行join以及特定的商业逻辑。

3.3.1处理逻辑

在接收到来自Dispatcher的RPC之后,joiner会将click_id和query_id从点击事件中抽取出来。然后,joiner会向EventStore发送一个异步RPC调用,去查找给定query_id对应的query。如果EventStore无法找到相应的query,joiner会向Dispatcher返回一个错误码,这样在一定的回退时间后Dispatcher就会再重发。当joiner收到来自Dispatcher的一个事件后,它首先检查目前是否有太多请求在发送,如果是它会拒绝掉,然后Dispatcher稍后再进行重试。为保证joiner处理的事件流量保持平滑,这样的限流是很必要的。

EventStore查找成功会返回与点击相对应的query。在处理的下一步骤,query和点击将会被传给一个叫adapter的库。通常,adapter只是简单地将两个事件合并为一个join的事件,然后将它传回给joiner。但是,它也支持采用一些用于过滤的应用特定的商业逻辑,或者是基于query的某些属性强制joiner跳过某些点击。将这些业务逻辑封装到adapter,增强了Photon的灵活性。这样的话,不需要对核心架构做任何修改,就可以对任意的两个流进行join。

在收到来自adapter join后的结果后,joiner将会尝试通过一个异步RPC向IdRegistry注册click_id。如果成功的话,joiner将会接着将join结果添加到输出日志。因为IdRegistry保证了at-most-once语义,对于任意click_id将最多有一个joiner实例能够成功完成注册,避免了在输出日志中出现重复记录。

如果joiner无法成功注册该click_id,它就会将该事件丢弃,我们将这称为wasted join。在我们的产品部署中,会至少在两个数据中心部署dispatcher,因此对每个点击事件Photon至少会读取两次。但是我们会通过让dispatcher发送给joiner前先到IdRegistry中查找下来尽量最小化这种wasted join。

与Dispatcher不同的是,joiner不会持久化任何状态。此外,所有的joiner功能上都是等价的,因此我们可以通过基于RPC的负载平衡机制将请求在所有joiner间进行分发。

3.3.2最小化Joiner Losses

只有在将event_id成功写入到IdRegistry后,joiner才会将事件写到输出日志。一旦event_id注册到IdRegistry,其他的注册请求将会失败。IdRegistry保证了这一属性,但是由于输出写入与id注册间事务原子性的缺乏,可能导致在处理事件时产生如下所述的一些问题。

尤其是在IdRegistry成功注册了event_id后,joiner超时或者是返回的响应在网络上丢失了的话,这种问题会更严重。来自于相同joiner的后续注册请求将会由于click_id已经存在而失败,同时点击将永不会产生join结果。在我们的早期部署环境中,我们观察到有大量的点击因为这个原因而丢失。

为了解决这个问题,当joiner提交一个click_id到IdRegistry,除了click_id它还会发送一个全局唯一的token(由joiner的服务器地址,进程标识符和时间戳组成)给IdRegistry。IdRegistry会将该joiner相关信息作为一个与click_id相关联的value值进行存储。如果joiner在给定的超时时间内未收到来自IdRegistry的响应,它会以相同的token值进行重试。

在IdRegistry收到一个针对现有click_id的注册请求时,它会检查存储在IdRegistry里的token值是否与请求里的一致,如果一致就认为它是上次成功注册该click_id的joiner因未收到响应而进行的重试。在这种情况下,IdRegistry将会返回成功给joiner,该joiner就可以成功输出日志。这种机制使得我们可以优雅的处理joiner的重试,同时大大降低了点击丢失率。在我们的线上部署环境中,通过这种方法将点击丢失率降低了两个数量级。

另一种事件丢失的潜在来源在于joiner的意外crash。一旦joiner向IdRegistry发送了RPC请求,之后可能成功完成注册,但是joiner所在的工作进程可能在接收到确认消息及写日志前发生crash和重启。因为Photon worker都是运行在商用硬件上,这种临时故障会很常见,尤其是在规模很大的情况下。而且其他joiner也不会再向输出日志提交该事件,因为保存在IdRegistry的token不再属于任何joiner。我们通过对joiner可以向IdRegistry发送的RPC请求数进行限制来最小化这种丢失。一旦达到限制,joiner就会对来自dispatcher的输入请求进行限流。

3.3.3验证与恢复

如前所述,由于在将事件记录入IdRegistry和将事件输出到日志之间事务原子性的缺乏,Photon可能会在输出日志中丢失掉某些事件。比如在如下场景下可能会引发这个问题:a)在写入到IdRegistry后,joiner发生了crash或重启;b)日志存储系统发生了数据丢失;c)代码bug。

正常情况下,观察到的数据丢失率都是很低的—小于所有参与join的事件总量的0.0001%。因此,下面描述的恢复模式主要是作为针对bug,自然灾害或者是不可控制的故障的一种预防措施。

Photon也提供了验证系统,来检查输入中的每个事件是否都出现在了输出中。如果出现在IdRegistry中的某个事件没有出现在输出中,我们会通过IdRegistry中的token读取该事件对应的服务器地址和进程标识符,找到对应的joiner。如果joiner发生过crash或重启,那么该事件可以安全地被重新处理而不会导致重复。为了恢复该事件,我们会将它从IdRegistry中删除,同时将该事件重新添加给dispatcher。

恢复系统只需要读取丢失事件的token信息。因此,我们可以通过将那些已经成功提交到输出日志的事件token信息删除,来优化IdRegistry的存储开销。我们持续不断地扫描join后的日志,并将其对应的token值从IdRegistry中清除。因此IdRegistry只需要保存那些丢失事件的token信息到将它们恢复为止。

3.4EventStore

EventStore提供了根据query_id找到对应query事件详细信息的查询服务。实现这种查询的一种简单方式就是,顺序读取所有查询日志,然后将query_id与该query所在日志名称及偏移的映射信息存储起来。该映射关系可以存储在一个分布式hash表(比如BigTable[6])中。

我们基于数据本身的特点构建了两种EventStore实现:CacheEventStore,充分利用事件的时间访问局部性(比如大部分的广告点击通常在搜索请求发出后很快就产生了);LogsEventStore,充分利用了日志文件里的搜索请求基本上是根据事件发生的时间戳近似有序的(详见3.1节)。

3.4.1 CacheEventStore

CacheEventStore是一个纯内存的类似于Memcached[12]的分布式key-value存储,为最近的查询提供高吞吐率和低延迟服务。CacheEventStore是根据query_id进行hash分片的,同时我们采用一致性hash[17]来支持平滑地进行rehashing。CacheEventStore会通过一个能够顺序读取查询日志的简单读者进程进行预热,将数据保存到内存中。CacheEventStore可以将最近几分钟的事件保存到内存中,当它被填满时,最近最少被访问的事件将会被换出。由于外部事件要远远少于保存在CacheEventStore中的主事件,因此会有很多事件永远都不会被访问到。CacheEventStore是一个best-effort系统:在cache不命中的情况下,查询将会被推送给LogsEventStore。它未使用任何持久化机制,单纯是为了优化查询的延迟,以及通过避免将请求发送给LogsEventStore,节省磁盘操作。尽管为了对它进行预热,我们会读取所有主日志,不过都是顺序读取,同时避免了LogsEventStore所需的昂贵的随机读取。

3.4.2 LogsEventStore

LogsEventStore负责为那些无法由CacheEventStore处理的请求提供服务。正常情况下这种请求只占总查询的一小部分(10%)。但是在catch-up期间处理老日志时或者CacheEventStore出现问题时,LogsEventStore可能需要服务所有的查询请求。

LogsEventStore通过查阅一个持久化的日志文件map来确定某事件在日志文件中的近似位置。然后只需要从该位置开始,顺序读取少量数据就可以找到对应的事件。日志文件map实际是一系列键值对,key是event_id,value值是该事件在日志文件中的位置。这些键值对会在固定间隔(每W秒或每M字节)内,在事件读取者读取主日志文件对CacheEventStore进行预热时产生。这些键值对会被存储在BigTable[6]中。为了找到事件对应的文件名和在文件中的偏移,LogsEventStore会进行一个起点为ServerIP:ProcessId:(Timestamp-W),终点为ServerIP:ProcessId:Timestamp的区间扫描。并不一定要有key存在于table中,但是只要主事件读取者配置地每W秒就生成一下map的键值,结果将至少包含一条记录,通过该记录就可以将读取者定位到距离目标事件M字节的范围内。通过调整M和W的值,我们就可以在map表大小与每次查询所需的I/O量之间找到一个平衡。

4.性能结果

Photon已经上线一年多了,实际证明它要比我们之前的单数据中心系统具有更高的可靠性。在此期间内,Photon在未影响到End-to-End延迟的情况下,历经多次计划中及计划外的数据中心故障。Pipeline及IdRegistry副本的实际配置如下:

? IdRegistry副本被部署在美国三地的五个数据中心。这三个地区的网络来回延迟达到100ms。每个地区最多有2个副本。我们配置IdRegistry,使得它只要在五个里有三个达到一致时,就可以完成提交。这种配置使得我们可以处理某个区域的完全失效。

? Pipeline中的所有其他组件(包括dispatcher,joiner等)被部署在美国分别位于东西海岸的两个数据中心。这两个区域也都有与之临近的存储日志的数据中心。

下面这些统计数据集中展示了Photon的规模和效率:

? Photon每天会产生数十亿的join事件。峰值期间,Photon每分钟需要处理数百万的事件。

? 每天,Photon会消耗数TB的外部日志(如点击日志),和数十TB的主日志(如搜索日志)。

? 在每个数据中心有超过1000个的IdRegistry分片在运行。

? 每个数据中心部署了数千个dispatcher和joiner,以及数百个LogsEventStore和CacheEventStore worker。这两个组件会被备份在全球的两个或多个数据中心。

鉴于数据的重要性,Photon会监控大量的性能度量参数。下面我们会展示一些来自实际生产环境的性能数据。需要指出的是由于商业方面的原因,下面某些图中的时间区间和粒度未标识出来。

4.1端到端延迟

 

我们将每个被join的事件的端到端延迟定义如下:当joiner准备输出一个joined事件时,我们会用当前时间戳减去该事件进入日志时的时间(直接通过event_id的时间戳获取)作为延迟。图7展示了一个30天周期内的端到端延迟的第90个百分位数分布情况,从图中可以看出90%的被join的事件的端到端延迟小于7秒。这种低延迟主要是因为pipeline的多个阶段都是通过RPC调用相互沟通,而无需将中间数据写入磁盘(见第3节)。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

一个潜在的性能瓶颈是由LogsEventStore所需的磁盘操作导致的。通过CacheEventStore,Photon显著降低了基于磁盘的查找次数。图8展示了CacheEventStore的命中率。图中表明,大部分的查询都由CacheEventStore直接处理掉了,只有少部分的流量是通过较慢的LogsEventStore处理的。值得注意的是,与流量的增长相比LogsEventStore处理的流量要相对平稳些。这意味着随着流量增长,命中率还有空间提高。

 Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

4.2数据中心故障的影响

 

图9展示了过去某个时间段内在两个独立数据中心的Pipeline生成的被join的事件数。当两个数据中心都处于健康状态时,它们基本上是一半一半的。但是当其中一个发生故障后,另一个就自动开始负责处理全部的事件。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

类似的在某个IdRegistry数据中心down机时,Photon的整体端到端性能也未受影响:因为在我们的配置中,总共运行了5个PaxosDB副本,最多可以容忍其中两个出现问题。

4.3 IdRegistry性能

在2.2节我们讨论了IdRegistry如何将多个客户端请求打包成一个PaxosDB请求来提高吞吐量。图10展示了单个IdRegistry分片服务端batching的效果。该分片每秒钟会接收数百个客户端写请求。但是batching之后,PaxosDB每秒的事务数降到了6-12。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

   

图11展示了IdRegistry基于时间戳的动态re-sharding效果。在重新分片之前,分片0的大小为1.5GB,之后我们将分片数调整为原来的三倍并约定在周五的12点起生效,之后分片0的大小逐渐变小,而新增加的分片90则逐渐变大,直到最后它们都变成了500MB大小。IdRegistry会对那些生命期超过3天的事件进行垃圾回收,这也是为何它们基本上在三天后达到相同大小的原因。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

4.4资源利用率

如第2节所述,我们通过在多个数据中心运行独立的pipeline来容忍数据中心级的故障,很明显这会导致某些已完成工作的重复。回忆下3.2节,曾提到通过让dispatcher查找IdRegistry来降低不必要的join。图12表明即使我们在两个数据中心将事件都读出来,实践中这种优化都非常有效:只有不到5%的事件会被两边的joiner都处理过。

Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

这种查询技术也大大降低了down机一段时间之后的dispatcher追赶时间。图13展示了经历2天down机之后,某数据中心的dispatcher恢复过程。由于大部分的事件已经被其他健康的数据中心处理了,所以dispatcher很快就追上了最新状态。

 Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams(译) - 星星 - 银河里的星星

 

5.设计经验

将我们在设计和实现该系统中的经验总结如下:

? 为了构建一个基于Paxos的可扩展分布式系统,我们需要最小化由系统管理的关键状态,理想状态下应将它限制在元数据的范围内。通过Paxos进行的数据写入是非常昂贵的,因为需要将它同步地写入到广域网上的多个数据中心。当然如果数据规模相对小,也是可以像Spanner那样直接将它们全部通过Paxos完成写入。

? 在Photon中,IdRegistry是用来维护全局状态一致性的关键,反过来想它也可能成为限制系统扩展性的瓶颈。为了在设计中避免这一限制,我们做出的最重要决定就是允许它在运行中支持动态的resharding。

? 与通过磁盘进行数据通信相比,RPC通信更有助于降低系统的端到端延迟。但是使用RPC需要对丢失的RPC进行应用级的检查和重试机制来保证可靠性。 尤其是在实际中需要对异步的RPC调用进行限流避免服务端过载。设计独立的job单元(在这里,就是dispatcher)负责完成其他job间基于RPC的通信。

? 为了让系统具有更好的扩展性,最好是将各种任务封装成独立的worker池(换句话说,就是进行分治)。在我们之前的joining系统中,joiner和EventStore完成的事情都是在同一个组件中完成的,同时我们也没有CacheEventStore。由于LogsEventStore需要打开文件,进行磁盘操作,这使得worker只有在处理那些query都在一个文件里的点击事件时才表现地比较高效。这就大大限制了系统的扩展性。对于给定的某项任务,只有将woker设计得可以很好地处理任意的输入片段,系统才会有好的扩展性。

? 某项流处理系统[26]推荐将独立事件组装成更大的batch。只有当一个batch内的事件可以同时处理时,这样做才有意义。对于我们的场景来说,经常能够碰到某点击事件先于它对应的查询事件到达。如果我们将多个点击打成一个batch,那么只有当对应的query都可用的时候整个batch才能够提交。因此是否需要进行batch,需要系统设计者仔细斟酌。

6.相关工作

在并行分布式RDBMS[19]领域,有着关于join算法的丰富研究。过去的十几年,研究人员已经提出了很多在连续的多个数据流上进行join的技术[1,3,5,13,15,20,24-26]。据我们所知,已有的这些工作都不是在如下限制条件下解决的该问题:exactly-once语义,数据中心级容错,高可扩展性,低延迟,乱序流和Delayed primary stream。[24]采用了现代多核系统,而不是商品化硬件。[3]采用了MapReduce进行数据流的join。[21]提出采用Paxos实现容错的数据库多副本一致性。与Photon不同,他们将多个Paxos副本存放在单个数据中心内,因为无法处理数据中心级的故障。[26]提出将连续数据流切分成离散的单元,这对于Photon并不适合,因为我们可能会看到离散流里的某些点击可能先于它们对应的搜索请求到达。[9]提出了一种关于join的近似算法。大多数现有的join算法假设两个输入流都是根据它们共享的标识符有序的,因此他们会限定一个窗口进行join[16]。而在Photon中,点击与搜索请求间的延迟是任意大的。

Spanner[8],Megastore[2]和DynamoDB[11]提供了跨数据中心的一致性多副本存储服务。但是Spanner和Megastore目前均未支持服务端batching,无法将多个客户端请求打包成一个Paxos提交。

7.总结及未来工作

本文描述了我们在构建和部署可以应对数据中心级故障的分布式流式joining系统中的经验。Dispatcher会持续将事件发送给joiner,直到在IdRegistry中注册了该事件为止,以此来保证at-least-once语义。IdRegistry是系统的核心,因为它保证了事件最多会在输出中出现一次。Dispatcher和IdRegistry一起保证了事件丢失只可能在一种情况下发生,即系统将事件成功提交给了IdRegistry但是在将事件写入到输出日志时失败了。我们采用一种离线机制来对这种数据丢失进行恢复。本系统没有单点,同时所有的组件都可以伴随着流量的增加而线性扩展,包括系统的关键状态都可以通过IdRegistry的resharding来进行扩展。组件间通过异步的基于RPC的通信方式来降低延迟,同时帮助系统实现富统计信息的实时可用性。

我们目前正在对Photon进行扩展以支持多个流的join,以及实现亚线性的扩展能力。为了进一步的降低端到端延迟,我们计划提供一种更快速的RPC方式,使得那些产生点击和查询事件的服务器可以直接向joiner发送RPC调用,而不再是由dispatcher等待数据被拷贝到数据中心。绝大多数的事件都可以通过这种更快速的方式处理,然后那些落下的事件可以通过现有的基于日志的系统进行处理。

参考文献

[1] D. J. Abadi et al. \The Design of the Borealis Stream Processing Engine". Proc. of CIDR 2005, pp.277-289.

[2] J. Baker et al. \Megastore: Providing scalable, highly available storage for interactive devices". Proc. of CIDR 2011, pp.223-234.

[3] S. Blanas et al. \A comparison of join algorithms for log processing in Mapreduce". Proc. of SIGMOD 2010,pp.975-986.

[4] T. D. Chandra, R. Griesemer, and J. Redstone. \Paxos made live: an engineering perspective". Proc. of ACM PODC 2007, pp.398-407.

[5] S. Chandrasekaran and M. J. Franklin. \Streaming queries over streaming data". Proc. of VLDB 2002,pp.203-214.

[6] F. Chang et al. \Bigtable: A Distributed Storage System for Structured Data". ACM TOCS 2008, 26.2,pp.4:1-4:26.

[7] E. F. Codd. \A Relational Model of Data for Large Shared Data Banks", Communications of the ACM 13(6): p377-387, 1970.

[8] J. C. Corbett et al. \Spanner: Google’s Globally-Distributed Database". Proc. of OSDI 2012.

[9] A. Das, J. Gehrke, and M. Riedewald. \Approximate join processing over data streams". Proc. of SIGMOD 2003, pp.40-51.

[10] J. Dean and S. Ghemawat. \MapReduce: Simplied data processing on large clusters". Proc. of OSDI 2004,pp.137-149.

[11] G. DeCandia et al. \Dynamo: Amazon’s Highly Available Key-value Store". Proc. of SOSP. 2007, pp. 205-220.

[12] B. Fitzpatrick. \Distributed Caching with Memcached".Linux Journal, Issue 124, 2004, pp.5.

[13] B. Gedik, P. S. Yu, and R. R. Bordawekar. \Executing stream joins on the cell processor", Proc. of VLDB 2007,pp.363-374.

[14] S. Ghemawat, H. Gobio_, and S-T Leung. \The Google File System". 19th Symposium on Operating Systems Principles 2003, pp.20-43.

[15] M. A. Hammad, W. G. Aref, and A. K. Elmagarmid.\Joining multiple data streams with window constraints". Computer Science Technical Reports, #02-115.

[16] J. Kang, J. F. Naughton, and S. D. Viglas. \Evaluating window joins over unbounded streams". Proc. of VLDB 2002, pp.341-352.

[17] D. Karger et al. \Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving HotSpots on the World Wide Web". Proc. of ACM SOTC 1997, pp.654-663.

[18] L. Lamport. \The part-time parliament", ACM TOCS 16.2 1998, pp.133-169.

[19] P. Mishra and M. H. Eich. \Join processing in relational databases". ACM Computing Surveys 1992,24(1), pp.63-113.

[20] L. Neumeyer et al. \S4: Distributed Stream Computing Platform". Proc. of KDCloud 2010.

[21] J. Rao, E. J. Shekita, and S. Tata. \Using Paxos to Build a Scalable, Consistent, and Highly Available Datastore". Proc. of VLDB 2011, pp.243-254.

[22] F. B. Schneider. \Implementing fault-tolerant services using the state machine approach: A tutorial", ACM Computing Surveys 22 1990, pp.299-319 (1990).

[23] D. Shasha and P. Bonnet. \Database Tuning: Principles, Experiments, and Troubleshooting Techniques".Proc. of SIGMOD 2004, pp.115-116.

[24] J. Teubner and R. Mueller. \How soccer players would do stream joins". Proc. of SIGMOD 2011, pp.625-636.

[25] J. Xie and J. Yang. \A survey of join processing in data streams". Data Streams – Models and Algorithms 2007,pp.209-236.

[26] M. Zaharia et al.\Discretized Streams: An E_cient and Fault-tolerant Model for Stream Processing on Large Clusters". Proc. of HotCloud 2012, pp.10-10.

You Might Also Like