分布式系统

Hive-A Petabyte Scale Data Warehouse Using Hadoop(

2011年5月24日 阅读(777)

zz from:http://blog.csdn.net/wh62592855/archive/2011/05/17/6427766.aspx

【原文】 Hive – A Petabyte Scale Data Warehouse Using Hadoop

【作者】 Facebook 数据架构组 :  Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, Zheng Shao, Prasad Chakka,         Ning Zhang, Suresh Antony, Hao Liu and Raghotham Murthy

【摘要】 应用于工业的商务智能收集分析所需的数据集正在大量增长,使得传统的数据仓库解决方案变得过于昂贵。 Hadoop 是一个流行的开源map-reduce实现,用于像yahoo, Facebook一类的公司。来存储和处理商用硬件上的大范围数据集。然而map-reduce程序模型还是处于很低级别,即需要开发者来书写客户程序,这些程序往往难于维护与重用。在这篇文论中,我们提出了Hive, 一个开源的数据仓库解决方案,建基于hadoop。Hive支持的查询是类似SQL方式的陈述语言:HiveQL 。这种查询被编译进mapreduce的job用以hadoop的执行。而且HiveQL 允许用户在查询中添加“客户map-reduce脚本插件”。这种语言包含了一个“支持包含原语类型的表的”类型系统 (a type system with support for tables containing primitive types),像数组与图这些Collection以及类似的嵌套组合数据结构。其背后的IO库可扩展以查询客户格式的数据。Hive 还包含了一个系统编目 — 元存储Metastore —包含了“有利于数据挖掘、查询优化与查询编译的”概要模式与统计(schemas and statistics)。在Facebook,,Hive 数据仓库包含了数万张表,共存储超过700TB的数据。广泛的用于每个月超过200个用户的报告以及ad-hoc分析。

(译注)ad-hoc 分析是商务智能中伴随着OLAP以及数据仓库,数据挖掘等工具的一个子主题,无需SQL以及数据模式的深度知识。通过基于用户友好GUI的系统进行无需操作,允许用户自己生成客户化查询。

I.介绍

大数据集上的可伸缩分析(Scalable analysis)是 Facebook 中数个团队,包含了工程师和非工程师,的核心功能。除开公司内分析师使用的 ad hoc 分析以及商务智能应用, Facebook 的一系列产品也基于逻辑解析(analytics)。这些产品的范围从简单的如 " 对 Facebook 的 Ad 网络的内部省察 " 报告应用,到更加高级的例如 Facebook 词典产品 。作为适合那些“千差万别的应用与用户需要的,以及适应成本有效方式应对 facebook 上不断增长的数据的”灵活基础架构的结果,这是有决定性的。 Hive 以及 Hadoop 是我们在 Facebook 用于匹配这些需求的技术。

Facebook 在 2008 年前的整个数据处理基础架构是建立于“使用商用关系数据库 RDBMS 之上的”数据仓库。我们产出的数据增长速度非常快 - 作为一个例子数据从 15TB 增长到 2007 年的 700TB 。那个时候的基础架构是如此的不能满足需求以至于日常的数据分析 job 要花掉至少一天,而且情况是一天天的坏下去。我们有迫切的新基础架构需求以适合数据的增长。作为结构我们开始使用 Hadoop 作为技术手段来应对增长的需求。事实上,对我们而言, Hadoop 已经是一个用于“拍字节尺度以及提供用于商用硬件上的可伸缩性的”开源项目。在 Hadoop 上, 以前必须用多于一天来处理完的相同 job 现在只需要短短几个小时就可以处理完。

然而, Hadoop 并不是非常适用于终端用户(end user),特别是对那些并不很熟悉mapreduce的用户而言。 终端用户必须为简单的 task 如“获取原始计数或平均值”去写 map-reduce 程序。 Hadoop 缺少通用查询语言(象 SQL 一样)的表现形式。作为这种缺乏的结果,用户必须花费数小时(如果不是数天的话)来写程序以完成整个操作(即便就简单分析而言)。 对我们非常清晰的就是,如果想加强公司的数据分析能力,使之更有生产力,那我们就必须改进 Hadoop 的查询能力。 2007 年 1 月,“让数据更靠近用户”激发了我们来建造 Hive 。 我们希望将表、   列、   分区以及 SQL 子集这些熟悉的概念带到 Hadoop 的未结构化世界,同时仍旧保持 Hadoop 的扩展性和灵活。 Hive 于 2008 年 8 月开源,从那以后由Hadoop用户根据其数据处理需求来使用开发。

从一开始, Hive 在 Facebook 内的用户群中就非常流行。今天,我们常规的在Hadoop/Hive集群上由数百用户运行数千份“从简单的加和任务到商务智能,机器学习以及支持 Facebook 产品特性等的不同应用” job 。

在接下来的几节中,我们提供了更多的有关 Hive 架构与性能的细节。节II 描述了数据模型,类型系统以及HiveQL。 节 III 关注数据如何在 Hive 表上存储,基于分布式文件系统 — HDFS ( Hadoop 文件系统)。节IV 描述了系统架构以及 Hive 里不同的组件。节V中,我们重点描述了 Facebook 中惯用的 Hive 统计以及在节VI中提供了相关工作。我们在节VII中展望了未来的工作。

II.数据模型, 系统类型与查询语言

Hive 将数据结构化到众所周知的数据库概念实体中,象表,列, 行以及分区(partitions)。它支持所有主要的原语 - integers, floats, doubles 以及 strings – 同时支持复合类型,如 map , list 以及 structs 。后者可以嵌套以形成更加复杂的类型。而且, Hive 允许用户以他们自己的类型与函数扩展系统。其查询语言非常类似于 SQL ,因此可以很容易的被熟悉 SQL 的人所理解。在数据模型中有一些细微的差别, 类型系统以及HiveQL与传统的数据库不同,且从 Facebook 中获得的经验也成为产生这些差别的一个因素。我们在这些节中会着重说明这些差异的细节。

A. 数据模型和系统类型

类似于传统的数据库, Hive 将数据存储于表中,每张表由多行组成,且每行由给定数量的列组成。 每一列有一个相关那的类型。类型不是原语类型就是复合类型。目前支持以下原语类型:

 整形 – bigint(8 字节), int(4 字节), smallint(2 字节),tinyint(1 字节). 所有整型都是有符号数
 浮点数– float(单精度), double(双精度)
 字符串

Hive 也自动支持以下复合类型:

组合数组 - map< 键类型,值类型 >
列表 - list< 元素类型 >
结构体 - struct< 文件名:文件类型,… >

这些复合类型都模板化了,因此可以组合生成任意类型。比如,list<map<string, struct<p1:int, p2:int>>表现了一个相关数组的列表,在这个相关数组中将 string 映射到一个“包含了两个整形域,称 P1,P2 ”结构体。这些都可以加和到一起,来生成表状态以生成复合预想模式的表。比如, 以下的状态生成有复合模式的表 t1 。

CREATE TABLE t1(st  string , fl  float , li list<map< string ,  struct <p1: int , p2: int >>);  
  查询表达式可以用’.’操作符来获取结构体中的字段。在关联类以及列表中的值可通过'[]’ 操作符来获取。在前一个例子中,t1.li[0]给出了列表中第一个元素,t1.li[0][‘key’]给出了“关于关联数组的 key 之”结构体。最后 该结构体的 p2 字段可通过t1.li[0][‘key’].p2获取。经过这些构建(constructs ), Hive 可以支持任意复合的结构体。

用以上方式所生成的表可用 Hive 默认提供的系列化和反序化工具来序列化以及反序列化。然而,有这样的实例:表的数据是由其它程序,甚至是由历史遗留数据提供的。 Hive 提供了灵活处理这样的数据的方法:将这些数据以无需转换的方式放入表,这样可以为大数据集节省出大量的时间。我们将在下面的节中描述这种方法,这点可以通过提供“一个 实现了称之为正反序列化 SerDe的 java 接口的” jar 包来达到。在这种情况下,类型信息同样可以经由 jar 包“通过提供相应的对象检视器 java 接口,以及经由SerDe接口提供的getObjectInspector 方法的暴露实现”提供( be provided by that jar by providing a corresponding implementation of the ObjectInspector java interface and exposing that implementation through the getObjectInspector method  present in the SerDe interface)。更多关于这些接口的细节可以在 Hive wiki 上找到。但基本的观点是任意数据格式以及其类型编码可以通过“提供包含了 SerDe 实现的 jar 包以及ObjectInspector 接口”以一种可插拔方式进入 Hive 。所有天然的SerDes以及 Hive 支持的复合类型同样是这些接口的实现。结果就是一旦合适的关联在表和 jar 之间建立起来,查询层就以标准的方式对待这些天然类型与格式。例如,加入“包含了SerDe以及ObjectInspector 接口的” jar 包到分布式高速缓存 后 的声明陈述,因此对Hadoop可用,其后继续产生客户的 SerDe 相关表。

add jar /jars/myformat.jar;  
CREATE TABLE t2;  
ROW FORMAT SERDE ‘com.myformat.MySerDe’ ;  
  要注意一点是,如果可能,表模式(table schema)也可以由“复合类型与原语类型的组合”提供。

B. 查询语言

Hive 查询语言(HiveQL)由 SQL 的子集以及一些我们发现在我们的环境中有用的扩展组成。 传统 SQL 特性象是从句子查询, 各种类型的联合如内联接、 左连接、 右连接以及外连接、笛卡尔积、 group by 、 union all , 用 select 生成表以及许多有用的基于原语和复合类型的函数功能使得这个语言非常象 SQL 。 就像先前提到的许多构建(constructs)就是非常类似 SQL 。 这使得任何熟悉 SQL 的人得以立即上手 hive 命令行界面 cli 和开始查询系统。 系统也提供了象是显示表以及描述一类的“有用中间数据浏览能力”,追对查询计划的解释计划能力 ( 尽管计划看上去非常不同于你从传统关系数据库 RDBMS 中看到的 ) 亦如此。(  Useful metadata browsing capabilities like show tables and describe are also present and so are explain plan capabilities to inspect query plans ) 有一些限制如,仅等式谓词(equality predicates)在连接谓词时是支持的,连接 join 必须用 ANSI 的 join 语法来说明,如:

SELECT t1.a1  as  c1, t2.b1  as  c2  
FROM t1 JOIN t2 ON (t1.a2 = t2.b2);  
  替代了更传统的查询形式:

SELECT t1.a1  as  c1, t2.b1  as  c2  
FROM t1, t2  
WHERE t1.a2 = t2.b2;  

 另外一个限制是插入 insert 是如何完成的。 Hive 目前并不支持插入已存在的表或者数据分区,且所有的插入会重写已经存在的数据。相应的,在我们的语法中做了显式的说明:

INSERT OVERWRITE TABLE t1  
SELECT * FROM t2;  
  在现实中这些限制还没有成为过问题。我们很少看到这样的情况:查询不能被表达为等式 / 连接(equi-join)形式;并且自从大多数数据以天或小时的速度装载到我们的数据仓库,我们只是简单的将这些数据装载到为该天或者该小时生成的表的新分区中。然而我们确实意识到随着更高的装载频率,分区的个数会变得非常的大。这就要求我们来实现 INSERT INTO 语义(semantics)。 Hive 中缺乏INSERT INTO,UPDATE以及DELETE ,从另一方面而言允许我们以非常简单的机制来应对读写并行而无需实现复杂的锁协议。

除这些限制之外,HiveQL 还有扩展以支持“用户用 map-reduce 程序表达的,或者用他们所选择的程序语言表达的”分析。 这允许高端用户用 mapreduce 术语(以无缝的可插拔方式合成进 HiveQL )表达复杂逻辑。有时候这是唯一的可行方式。如,用户想使用“已有的 python 或 php 或任何其它语言库的情形下”,进行数据转换。例如,在某文档上的一个表中标准单词计数的例子可以用mapreduce由以下方式表达:

FROM (  
MAP doctext USING ‘python wc_mapper.py’  AS (word, cnt)  
FROM docs  
CLUSTER BY word  
) a  
REDUCE word, cnt USING ‘python wc_reduce.py’ ;  
  如在上例中显示的 MAP 从句,表明“输入列”(该例子中是doctext)如何通过“一用户程序(该例子中是python wc_mapper.py脚本)”放入“输出列”( word 与 cnt )而被转换的。子查询中的CLUSTER BY从句定义了在采用输出列将数据分布到 reducer 上,以及最后的REDUCE从句定义了用户脚本(本例中是 Python w c_reduce.py)以唤醒子查询上的输出列。有时在 map 和 reduce 之间分布的标准需要对 reducers 提供数据,就像它已经在列的集合上排过序了(而不是象以前做分布时那样)。作为这种情况的一个例子是:会话中的所有动作需要按时间排序。 Hive 提供DISTRIBUTE BY从句以及SORT BY从句来完成它,就像下面的示例:

FROM (  
FROM session_table  
SELECT sessionid, tstamp, data  
DISTRIBUTE BY sessionid SORT BY tstamp  
) a  
REDUCE sessionid, tstamp, data USING ‘session_reducer.sh’ ;  
  要注意,上例中没有 map 从句,说明了输入列没有被转换过。类似的,在“ reduce 阶段并不并不执行任何数据转换的”情况下,可能有“没有REDUCE从句的”MAP从句;出现于SELECT 从句之前的FROM 从句是与标准 SQL 语法相异的另外一点。 Hive 允许用户在给定子查询中交换FROM 与SELECT/MAP/REDUCE从句的次序。在处理多个插入时,这就变得非常有用以及形象。HiveQL 支持,作为相同查询的一部分,将不同的转化结果插入到不同的表、分区、 hdfs 或本地目录。这帮助了降低输入数据上的扫描数,如下示例:

FROM t1    
INSERT OVERWRITE TABLE t2    
SELECT t3.c2, count(1)    
FROM t3    
WHERE t3.c1 <= 20    
GROUP BY t3.c2   
////////   
INSERT OVERWRITE DIRECTORY ‘/output_dir’     
SELECT t3.c2, avg(t3.c1)    
FROM t3    
WHERE t3.c1 > 20 AND t3.c1 <= 30    
GROUP BY t3.c2    
///////   
INSERT OVERWRITE LOCAL DIRECTORY ‘/home/dir’     
SELECT t3.c2, sum(t3.c1)    
FROM t3    
WHERE t3.c1 > 30    
GROUP BY t3.c2;    

   该例中表 t1 的不同部分聚合起来(aggregated)并用于生成表 t2, 一个 hdfs 目录(输出目录/output_dir)以及一个本地目录(用户机器上的 /home/dir)。

III. 数据存储,SERDE 和文件格式

A. 数据存储

在 Hive 中,表是一个逻辑数据单元,所以表元数据将表中的数据关联到 hdfs 目录。最基本的数据单元以及它们在 hdfs 名字空间中的映射如下:

表( Tables ) : 一张表存储于 hdfs 的一个目录。
分区(Partitions):   表的分区存储于包含于表目录内的子目录。
桶(Buckets )   - 桶存储于这样的文件:带有分区或依赖表是否为分区表的表目录。

作为一个例子,一张叫test_table的测试表映射到 hdfs 中的<仓库根目录>/测试表。 仓库根目录是由 hive 制定的。metastore.warehouse.dir配置参数在hive-site.xml中。默认的参数值被设置为/user/hive/warehouse 。

一张表无论分区与否都可。一张分区表可由“CREATE TABLE状态下的PARTITIONED BY从句”生成,如下:

CREATE TABLE test_part(c1  string , c2  int )  
PARTITIONED BY (ds string , hr  int );  

 上面的例子显示表分区将存储到 hdfs 的 /user/hive/warehouse/test_part目录。分区存在于每一独特的由用户指定的ds值和hr值中。注意到分区列并不是表数据的一部分,且分区列的值在该分区(它们也同样存在于表元数据中)的路径上被编码一个新的分区可以通过INSERT状态或在表上加一个分区的ALTER状态来生成。两者的形式如下:

INSERT OVERWRITE TABLE  
test_part PARTITION(ds=’2009-01-01′ , hr=12)  
SELECT * FROM t;  
////////   
ALTER TABLE test_part  
ADD PARTITION(ds=’2009-02-02′ , hr=11);  
  对test_part表添加一个新的分区。INSERT状态同样在来自表 t 的数据分区中占据了一定角色,与此同时,alter 表却生成了一个空的分区。这两种状态都以生成相应的目录结尾

 

/user/hive/warehouse/test_part/ds=2009-01-01/hr=12

/user/hive/warehouse/test_part/ds=2009-01-02/hr=11

 

这些目录存放于 hdfs 目录的表中。该方法没有任何并发症影响,即便在分区值包含了以下一些字符情况下:这些字符用于hdfs 以指示目录结构,但是这些字符的完全溢出确实会生成一个与 hdfs 相容的目录名( but proper escaping of those characters does take care of a producing an hdfs compatible directory name)。

Hive 编译器可以应用这些信息来删减“为了评估查询而需要扫描数据的”目录。以test_part表为例,查询是:

SELECT * FROM test_part WHERE ds= ‘2009-01-01’ ;  

 将只扫描所有属于/user/hive/warehouse/test_part/ds=2009-01-01 目录的文件,并且以下查询

SELECT * FROM test_part  
WHERE ds=’2009-02-02′  AND hr=11;  
 仅扫描所有属于属于/user/hive/warehouse/test_part/ds=2009-01-01/hr=12目录的文件。

 

译注:疑为原文输入错误,当扫描ds=2009-02-02/hr=11目录的文件

 

修改数据在处理查询时,带来一重要的影响。在许多方面,分区模式类似许多数据库厂商就分区已经做的(In many respects this partitioning scheme is similar to what has been referred to as list partitioning by many database vendors )【参看: mysql 列表分区 】, 但是在“有键存储的分区值”与“元数据存储的分区值”两者间是有差异的。(there are differences in that the values of the partition keys are stored with the metadata instead of the data)

Hive 用到的另外一个存储单元概念是桶(Buckets)。一个桶是是一个文件,该文件存在于一分区或一张表的页层次目录(leaf level directory of a table) 。在该表被生成时,用户可以指定所需桶的数量以及指定用于换进换出数据的列(the column on which to bucket the  d ata) 。在目前的实现下,该信息用于删减在“用户在样本数据上运行查询”情况下的数据。例如,一张被分成 32 个桶的表,可以通过“选择查看第一个桶中的数据”来快速生成 1/32 样本。 类似的,以下状态:

SELECT * FROM t TABLESAMPLE(2 OUT OF 32);  

 将扫描由第二个桶提供的数据。注意到“保证桶文件被适当的创建与命名”是应用的义务,而且HiveQL 数据描述语言 DDL 当前不会试着去对数据以“与表属性想兼容的方式”方式桶化(HiveQL DDL statements do not currently try to bucket the data in a way that it becomes compatible to the table properties )。因此,桶信息应当小心的使用。

尽管相应于一张表的数据总是存储于 hdfs 的<warehouse_root_directory>/test_table位置, Hive 总允许用户查询存储于 hdfs 的其它地址上的数据。这可以通过下列的  EXTERNAL TABLE 从句来获取:

CREATE EXTERNAL TABLE test_extern(c1  string , c2  int )  
LOCATION ‘/user/mytables/mydata’ ;  
  这样就允许用户将test_extern声明为每一行由两列 c1,c2 组成的外部表。数据文件存储于 hdfs 的位置是:/user/mytables/mydata。注意到由于没有已定义的客户SerDe,就默认假设为 Hive 的内部数据格式。外部表和普通表仅在一点上不同:作用于外部表的drop命令仅仅是 drop 掉表元数据而没有删任何数据。就普通表上的 drop 操作而言,相关该表的数据就被 drop 掉了。

B. 序列化 / 反序列化 Serialization/Deserialization (SerDe)

  如前所述, Hive 可以对用户提供的SerDe java 接口进行实现,并将之关联到表或分区。作为结果就是:可以容易的检索以及解释客户数据格式。 Hive 中默认的SerDe实现称之为 惰性正反序列化(LazySerDe)   – 它将行“惰性的”反序列化到内部对象,所以仅当“在查询表达式中需要该行的列”时,一个列上的会发生反序列化开销(it deserializes rows into internal objects lazily so that the cost of deserialization of a column is incurred only if the column of the row is needed in some query expression.)惰性正反序列化假设数据存储在“行rows被新的一行newline (ascii 码 13 )消除掉的”表,同时在行中的列被 ctrl-A 消除掉(ascii 码 1 )。这种SerDe可用任何列之间的分割 符 delimiter读取数据。例子如状态:

CREATE TABLE test_delimited(c1  string , c2  int )  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ‘/002’   
LINES TERMINATED BY ‘/012’ ;  
 

CREATE TABLE test_delimited(c1  string , c2  int )  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ‘/002’   
LINES TERMINATED BY ‘/012’ ;  
以上说明了表test_delimited的数据用ctrl-B(ascii 码 2 )作为列分割 字 符 ,并用 ctrl-L作为行分割符(ascii 码 12 )。而且 可以用分隔符给序列化键定界,以及 maps 的值与不同的分隔符可用于界定不同的列表元素(delimit the various elements of a list)。这点由以下示例说明:
CREATE TABLE test_delimited2(c1  string ,c2 list<map< string ,  int >>)  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ‘/002’   
COLLECTION ITEMS TERMINATED BY ‘/003’   
MAP KEYS TERMINATED BY ‘/004’ ;  
  除惰性正反系列化外,发布版中提供的hive_contrib.jar包还提供了其他有趣的正反序列化SerDes。特别有用的一个是“正则正反序列化” (RegexSerDe),它允许用户定义一正则表达式从一行中解析出不同的列。以下的状态可以作为例子来分析apache 日志:

add jar  ‘hive_contrib.jar’ ;  
CREATE TABLE apachelog(  
host string ,  
identity string ,  
user string ,  
time string ,  
request string ,  
status string ,  
size string ,  
referer string ,  
agent string )  
ROW FORMAT SERDE  
‘org.apache.hadoop.hive.contrib.serde2.RegexSerDe’   
WITH SERDEPROPERTIES(  
‘input.regex’  = ‘([^ ]*) ([^ ]*) ([^ ]*) (-|//[[^//]]*//]) ([^  
/"]*|/"[^/"]*/") (-|[0-9]*) (-|[0-9]*)(?: ([^ /"]*|/"[^/"]*/" ) ([^  
/"]*|/"[^/"]*/" ))?’,  
‘output.format.string’  = ‘%1$s %2$s %3$s %4$s %5$s %6$s  
%7$s %8$s %9$s’);  

 input.regex属性是应用于每条记录的正则表达式,output.format.string提示了列字段是如何从“匹配正则表达式的组 group ”构建的。该例同样显示了任意的键值对是如何用“WITH SERDEPROPERTIES从句”传递到正反序列化serde 的。这是一种相当有效的机制(capability )将任意参数传递到客户的SerDe中。

C. 文件格式

Hadoop 文件可以由不同的格式存储。 Hadoop 中的文件格式说明了如何存储一条记录于文件中。比如文本格式存储于文本输入格式(TextInputFormat ),同时二进制文件可以存储于序列文件输入格式(SequenceFileInputFormat)。用户同样可以指定他们自己的文件格式。 Hive 不在文件类型的数据存储输入格式上强加任何约束。当表被生成时,格式被说明了。除了上述的两种格式, Hive 同样提供了 RCF 上输入格式(RCFileInputFormat ),这是一种以列为导向的(column oriented manner)存储数据格式。这种组织形式可以提供重要的性能提升,特别是就“不能达到表中所有列的查询”而言。用户可以加入他们自己的文件格式,同时将它们用表组织起来,如下所示:

CREATE TABLE dest1(key INT, value STRING)  
STORED AS  
INPUTFORMAT  
‘org.apache.hadoop.mapred.SequenceFileInputFormat’   
OUTPUTFORMAT  
‘org.apache.hadoop.mapred.SequenceFileOutputFormat’   
  STORED AS从句说明了用于决定“存在于表或分区目录的文件输入输出格式“的类。这些类可以是任何实现了文件输入格式FileInputFormat 与文件输出格式FileOutputFormat java 接口的类。 这些类可以由 Hadoop 中的 jar 包以类似于“在添加用户到SerDes时的示例方式”的方式提供。

IV. 系统体系结构与组件 SYSTEM ARCHITECTURE AND COMPONENTS Hive体系结构

图一 Hive系统架构风格

以下的组件是构成 Hive 的主要组件:

元存储(Metastore )- 存储“系统目录以及关于表、列、分区等的元数据”的组件。
 驱动(Driver )- 控制 HiveQL 生命周期的组件,当 HiveQL 查询穿过 Hive 时。(The component that manages the lifecycle of a HiveQL statement as it moves through Hive.  译注:It 似应指HiveQL )。该驱动同事同事管理着会话句柄以及任何会话的统计。
查询编译器(Query Compiler) - 是一个组件,将HiveQL编译成有向无环图 (directed acyclic graph, DAG)形式的map/reduce任务。
执行引擎  Execution Engine  -   是一个组件,依相依性顺序(dependency order)执行由编译器产生的任务。
Hive 服务器 HiveServer    - 一个提供“健壮的接口(thrift interface )、 JDBC/ODBC 服务器以及提供一种整合 Hive 和其它应用的”组件。
客户端组件 -类似命令行接口CLI(Command Line Interface), web UI 以及 JDBC/ODBC驱动。
包含了正反序列化(SerDe)以及对象观察器(ObjectInspector )接口的可扩展接口,类似于前述用户定义函数 UDF (User Defined Function)以及用户定义聚合函数UDAF(User Defined Aggregate Function)接口,允许用户定义自己的列函数。

HiveQL 状态是通过“命令行接口CLI、web UI或是用 thrift 、 odbc 或 jdbc 接口的的外部客户”提交的。驱动器首先将查询传送到“穿过典型的分析器”的编译器(The driver first passes the query to the compiler where it goes through the typical parse),运用“存储于元存储(Metastore)的”元数据进行类型检查与语法分析阶段。编译器生成一逻辑方案(logical plan),然后通过基于优化器的简单规则得到优化。最终优化后的方案是以有向无环图DAG数据结构形式展现的map-reduce任务,并产生一个hdfs任务。执行引擎然后用 Hadoop 以“它们的依赖顺序”执行这些任务。

在这一节我们提供有关元存储,查询编译器以及执行引擎的更多细节。

A. 元存储 (Metastore)

元存储就Hive而言,扮演了一个系统目录的角色。它存储了“所有关于表,它们的分区,模式,列极其类型,表地址(table locations)等的”信息。该信息可以用thrift接口来查询以及修改【参看:Thrift 软件框架 】,从而可以从客户端以不同的编程语言来调用。由于该信息需要快速的提供到编译器中,我们选择用传统关系数据库RDBMS存储该信息。元数据因此变为一个用 RDBMS 运行的应用,并用到“一开源的对象关系映射 ORM 层:DataNucleus 【参看  http://www.datanucleus.org/ 】”,来转换代理对象( convert object representations into )到相关模式,反之亦然。我们以截然不同于“存储该信息于 hdfs 的”方法,而选择该方法( 译注:何种方法? )。那是因为我们很少用到元存储(we need the Metastore to be very low latency)。DataNucleus 层允许我们以可插拔形式运用诸多 RDBMS 技术。在我们 Facebook 的相关运用中,我们采用 mysql 存储这些信息。

B. 查询编译器 (Query Compiler)

查询编译器用元存储中的元数据存储来生成执行计划,类似于传统数据库的编译器, Hive 编译器如以下步骤处理HiveQL:

解析(Parse )– Hive 用称为Antlr的解析器成器来生成用于查询的抽象语法树 AST 。
类型检查与语法分析(Type checking and Semantic Analysis )– 在该阶段,编译器取得“来自元数据的所有输入与输出表”信息,并用该信息生成逻辑计划。在该阶段将检查表达式的类型一致性,以及标记编译时语法错误。通过一个叫做查询块树 (Query Block Tree) 的媒介陈述方式,进行中从抽象语法树AST到有向无环图DAG的转换操作。编译器把查询嵌套转化到查询块树(QB tree)中父子节点关系。同时,查询块树也起到以下作用:组织抽象语法树 AST 的相关部分,形成“较之vanilla抽象语法树,更易于转化到有向无环图 DAG 操作的”格式。
优化(Optimization) – 优化逻辑由诸如操作“产生于上一个传输结束并作为下一个传输的输入时的有向无环图 DAG ”的传输链组成。任何想要改变编译器或者想要增加新的优化逻辑的人可以很容易的通过“将转换的实现作为Transform接口的扩展,以及将之附加到优化器的转换链”实现这一点 ( by implementing the transformation as an extension of the Transform interface and adding it to the chain of transformations in the optimizer ) 。“转换逻辑”(transformation logic)典型的由“在有向无环图上遍历”构成(comprises of a walk on the operator DAG )。比如当相关条件或者规则满足时,针对 DAG 的特定操作。五个关系到转化(transformation )的基本接口是:节点(Node),图形巡查(GraphWalker),分发器Dispatcher,规则Rule ,处理器Processor。 有向无环图中的节点由 Node 节点接口实现。这就允许 DAG 操作由上述提及的其余接口实现。一个典型的转换包括了对每一个访问的节点巡视 DAG (involves walking the DAG and for every Node visited ),检查是否Rule 规则得到了满足,然后,如果得到满足的话,唤醒该规则相应的处理器Processor。分发器管理者从规则到处理器的映射,以及执行匹配的规则(does the Rule   matching)。它被传递到图形巡查,所以相应的处理器可以当“在巡查中,节点被访问到时”被分发。图二中的流程图如何构建一典型的传输。

转换流程图

Hive 目前完成了下列转换,作为优化阶段的一部分:

列剪辑(Column pruning) – 该优化步骤确保了“在查询处理中唯一需要的列确实”确实从行中投射出去了(ensures that only the columns that are  needed in the query processing are actually projected out of the row)。
 后进先出的谓语 (Predicate pushdown)–如果可能, 谓语以后进先出的方式扫描;从而行可以在处理时先过滤掉(   Predicates are pushed down to the scan if possible so that rows can be filter early in the processing)。
分区剪辑(Partition pruning)–在分区后的列上的谓语,用于压缩(prune out)分区上不满足谓语的文件。
Map 端的连接(Map side joins)– 在连接中涉及一些很小的表的例子中,小的表在所有的mappers 中与其他表连接时是重复的( T he small tables are replicated in all the mappers and joined with other tables)。由查询中的一些暗示来触发行为,格式如下:
SELECT  /*+ MAPJOIN(t2) */  t1.c1, t2.c1  
FROM t1 JOIN t2 ON(t1.c2 = t2.c2);  
一定数量的参数来控制“用于mapper 来掌控重复表内容的”内存数量。有 hive.mapjoin.size.key以及hive.mapjoin.cache.numrows控制“任何时间保存在内存中的”表中行的数量,以及提供给系统联合键的大小。 
连接再排序(Join reordering)   –   当较小的表在内存中保存时,较大的表并不构建在reducer的内存中,而是被流化了(The larger tables are streamed)。 这保证了连接操作在reducer 端不会溢出内存的限制。

且如MAPJOIN 示例, 用户也可以提供示例( hints )或参数集来做以下执行:

i. 数据再分区以把控GROUP BY形成的非对称(skews)–许多现实世界的数据集,在 用通常查询的 GROUP BY从句时,有一个 列上的幂率分布 ( power law distributon )。 这种情况下,对“分布数据在group by列上,然后在reducer 中聚合”常用的计划并不能很好的工作,那是因为大多数数据都在非常少的reducers上取得以及发送( the usual plan of distributing the data on the group by columns and then aggregating in the reducer does not work well as most of the data gets sent to a very few reducers)。这种情况的一个更好的计划是用两个map/reduce阶段来计算聚合( use two map/reduce stages to compute the aggregation)。第一个阶段,数据随机分布(或者分布在DISTINCT 列,如果是唯一聚合 [ Distinct aggregations ] 的话到reducers ,同时计算局部聚合。 这些局部聚合然后分布在GROUP BY列,在第二个map/reduce 阶段到reducers(由于相对于基础数据集,局部聚合元组是非常小的,这种方法典型的将产生更好的性能。在 Hive 中,这种行为可以通过用以下方式设置参数来触发:

set  hive.groupby.skewindata= true ;  
SELECT t1.c1, sum(t1.c2)  
FROM t1  
GROUP BY t1;  
  ii.mappers 中的基于哈希的局部聚合 – 基于哈希的局部集合可以显著的降低由“mappers 到reducers”发送的数据(量)(Hash based partial aggregations can potentially reduce the data that is sent by the mappers to the reducers)。这会反过来减少花费在排序以及归并这些数据所用的时间量。许多性能的提升可以通过这种策略来获取。Hive 允许用户控制“用于mapper以控制哈希表内的行数,以进行此种优化”这样一种内存的数量。参数hive.map.aggr.hash.percentmemory 说明了mapper 内存中可用于把控哈希表那部分的数量。如, 0.5 能确保哈希表大小一旦超过用于mapper的最大内存的一半,存储在那儿的部分聚合就被发送到 reducers 了。hive.map.aggr.hash.min.reduction参数同样也用来控制用于mappers的内存数量。

产生物理计划(Generation of the physical plan) –在优化阶段的最后产生的逻辑计划,然后就分裂为多个map/reduce任务以及hdfs任务。作为一个例子, 非对称数据(skewed data)上的group by可生成两个 m ap/reduce任务,并跟随一最终的hdfs 任务(该任务用于将结果放到 hdfs 中正确的地)。这个阶段的最后,物理计划看上去像一个有向无环图DAG形式的任务图,其中每一个任务封装了计划的一部分。

我们展示一个多表插入查询及与之相应的物理计划(在所有优化之后): 

FROM (SELECT a.status, b.school, b.gender  
           FROM status_updates a JOIN profiles b  
                  ON (a.userid = b.userid  
                         AND a.ds=’2009-03-20′  )) subq1  
//////////   
INSERT OVERWRITE TABLE gender_summary  
                                     PARTITION(ds=’2009-03-20′ )  
SELECT subq1.gender, COUNT(1)  
GROUP BY subq1.gender  
//////////   
INSERT OVERWRITE TABLE school_summary  
                                      PARTITION(ds=’2009-03-20′ )  
SELECT subq1.school, COUNT(1)  
GROUP BY subq1.school  

 该查询接在两个不同的聚合后有一个唯一连接。通过将查询描绘成多表插入(By writing the query as a multi-table-insert ), 我们确认只进行了一次连接。查询的计划如下图 3 所示。

插入map-reduce工作后的多表查询计划

图三:插入三个map/reduce jobs的查询之后的多表查询计划

计划中的这些节点是物理操作符,边代表了两个操作符之间的数据流动( flow )。 每个节点中最后一行表示了该操作符的输出模式。由于文章空间所限,我们不描述在每个操作符节点内部定义的参数。该计划有三个map-reduce job 。

在同一个map-reduce job 中,在重分配操作符(repartition operator :ReduceSinkOperator)下面的操作树(operator tree)部分由 mapper 执行,以上的部分由reducer执行。重分配操作符本身由执行引擎来实施。

注意到第一个map-reduce job 写了两个临时文件到HDFS,tmp1 与 tmp2,它们各自由第二个与第三个map-reduce job 使用。 因此,第二个和第三个map-reduce任务要等待第一个map-reduce job完成。

C. 执行引擎( Execution Engine)

最终任务以它们的依赖序列来执行(executed in the order of their dependencies)。每个有依赖的任务仅在“所有的前置依赖执行之后”被执行。 一个 map/reduce任务首先将其计划的部分序列化到plan.xml文件。这个 xml 文件然后被加到该任务的 job 高速缓存,且用Hadoop,则ExecMapper和ExecReducers 的实例将会出现(instances of ExecMapper and ExecReducers are spawned using Hadoop)。 这些类中的每一个都对plan.xml反序列化,并执行有向无环图 DAG 的相关部分。最终的结果存储于一临时位置。在整个查询的最后,在 DML s 的例子下,最终数据会被移送到希望的位置。从临时位置的查询例子中,数据将被如此处理(In the case of queries the data is served as such from the temporary location)。

V. 在FACEBOOK中使用HIVE

当前我们的数据仓库有700TB数据(在解释了三路复用后, Hadoop上的原始空间达到2.1PB)( which comes to 2.1PB of raw space on Hadoop after accounting for the 3 way replication)。我们每天增加5TB(在复制后是15TB)的压缩后数据l。典型的压缩率是 1 : 7 或者更大。在任何特定的一天,多于7500 个jobs 被提交到集群,而且每天都会处理多于75TB的压缩后数据。随Facebook 网络的持续增长,我们看到了数据的持续增长。与跟随公司架构扩展的同时,集群亦要随用户的增长而扩展。

工作量的一大半由ad hoc 查询占用,其余的则由面板报告(for reporting dashboards)占用。 Hive 之所以允许这种类型的“使用于Facebook的Hadoop集群上”工作量,是由于“使得adhoc 分析可以完成”这样一种简单性。然而,正是由于adhoc job 的不可预测性(unpredictability ),由adhoc 用户共享同一个资源以及报告用户形成了一大操作上的难点。许多次情况下这些jobs 没有适当的调优,以至于耗费了有价值的集群资源。这就反过来降低了报告查询的性能(这些报告查询是注重时间的 ] )。资源排产(Resource scheduling)在Hadoop 里一定程度上有点弱化。当下唯一可见的解决方案似乎是“为 ad hoc 查询以及报告查询各自保持独立的集群”。

同样有许多不同的“在Hive中日常运行的 job ”。这些 job 的范围从“简单的加和 job 生成不同类型的 rollups and cube ”到更加高级的机器学习。系统由新手使用同时也由高端用户使用,新用户可以立即使用该系统或者是在一个小时的新手训练后。

高频率的使用(heavy usage)的一个后果会导致数据仓库生成许多表,这反过来惊人的增加对数据探索工具(data discovery tools)的需要量。特别是对新用户而言。总体而言,系统允许我们以“花费只是,较之更传统的数据仓库体系架构(infrastructure)的,一部分”的方式对工程师和分析师提供数据处理服务。而且Hadoop 有在数千个商业节点上扩缩的能力(the ability of Hadoop to scale to thousands of commodity nodes ),这就给我们一种信心:我们可以很好的扩展该架构 (scale this infrastructure going forward )。

VI. 相关工作

最近有许多关于拍字节范围数据处理系统的工作,有商业的也有开源的。 Scope 是一个类SQL的语言,构筑于微软专利Cosmos   map/reduce与分布式文件系统。 Pig 允许运用写出陈述式脚本来处理数据。 Hive 和这些系统都不同,因为 Hive 提供了一个“保存关于表的元数据于系统中的”系统目录。这就允许 hive 实现传统数据仓库的功能,可以和标准的报表工具(像是微策略MicroStrategy )通过接口互联。HadoopDB 重用了大多数的Hive系统,除了一点以外:“它在每个节点上使用传统数据库实例来存储数据,代替了使用分布式文件系统”。

VII. 总结与前景展望

Hive 是一个在继续前进中的工作。它是一个开源的项目,并由Facebook 以及其他的外部贡献者提供活跃运行的维护。

HiveQL 目前仅接受SQL 的一个子集作为有效的查询。我们工作的方向是致力于把HiveQL作为SQL语法的一部分。 Hive 目前有一个天然的,配以小数量的简单规则的,基于规则的,优化器。我们计划构建基于成本的优化器与适应性优化器技术,以适合更多的“有效率的”计划。我们探索了柱状存储以及更智能的数据放置(placement ),以提升扫描性能。我们运行基于【大范围数据分析方法之比较 】的性能基准线,来衡量我们相对于其他系统而言的进度。在我们初步的实验中,相较于【 大范围数据分析方法之比较 】,我们已经可以提升Hadoop 自身的性能达 20% 。这中提升包含了使用更加快的Hadoop 数据结构来处理数据,比如,用文本(Text)来代替字符串(String)。 “在HiveQL 中很容易表达的”相同的查询,相较于我们优化后的Hadoop 实现有超过 20% 的提升( the same queries expressed easily in HiveQL had ~20% ),例如,从【 大范围数据分析方法之比较 】得知,Hive’s 的性能与Hadoop 代码是处于同等水平。我们同样运行了工业标准的决策支持基准TPC-H 。基于这些实验,我们已经辨认出数个使性能得以提升的领域,并在其上展开了工作,详见【 Hive性能基准 】以及【在 Hive  上运行 TPC-H查询 】。我们强化了Hive 的JDBC 和ODBC 驱动器,以与“只与传统商业关系形数据仓库衔接的”商业 BI 工具整合。我们探索了多查询优化技术的方法,以及在单个map-reduce job 上施行通用 n 路连接(n-way joins)。

致谢

我们谢谢使用者和开发者社区的贡献,特别要感谢Eric黄, 贾云涛,何勇强,爱德华卡皮罗与Dhruba Borthakur

(译注) 致谢中提及好几个中国人,暂且音译之, 值得中国程序员为之自豪

(完)英文版:http://infolab.stanford.edu/~ragho/hive-icde2010.pdf

You Might Also Like