分布式系统

The Anatomy of Hadoop IO Pipeline(译)

2011年8月29日 阅读(609)

作者:Owen Omalley 2009-8-27

原文:http://developer.yahoo.com/blogs/hadoop/posts/2009/08/the_anatomy_of_hadoop_io_pipel/

译者:phylips@bmy 2011-8-28

出处:http://duanple.blog.163.com/blog/static/70971767201172902737677/

引言 

在一个典型的Hadoop MapReduce job中,通常是从HDFS上读取输入文件。为减少文件大小,文件数据通常是压缩过的,因此读取之后需要进行解压,之后得到序列化字节串,在传递给用户定义的Map函数之前再将这些字节串转换为java对象。输出则刚好是一个反向的过程,输出记录会被序列化,压缩,最终传送到HDFS上。这看起来很简单,但是因为下面的一些原因,这两个过程实际上是很复杂的:

l  压缩和解压通常是通过native libaray代码完成

l  在读写过程中通常需要计算一些端到端的CRC32校验和

l  由于各种接口上的限制,导致缓冲区管理很复杂

 在这篇blog里,我们来详细的剖析下Hadoop IO Pipeline,并探索下可能的优化方式。为了让讨论更具体一些,我们以从/向gzip压缩格式的文本文件读取/写入行格式的记录为例。我们没有对DataNode端进行细节上的分析,而是重点关注客户端(map/reduce task processes)在整个Pipeline中的行为。最后,所有的描述都是基于写这篇文章时的Hadoop 0.21 trunk版本,与之相比,某些老的或者更新的版本在某些方面上可能会有区别。

    Reading Inputs

图1展示了使用TextInputFormat从gzip压缩格式的文本文件中读取行记录时的I/O pipeline。首先,该图通过中间的空白线分为两部分,左边代表了DataNode进程,右边是应用程序进程(即Map task)。自底向上,根据缓冲区分配及控制的场所又划分为3个区域:内核空间,native code空间,JVM空间。对于应用程序进程部分,从左到右,是数据块依次会穿过的三个软件层。图中不同颜色的方块,代表不同种类的缓冲区。两个方块之间的箭头代表了一次数据传输或者buffer-copy。每个方块内部的标签标明了缓存区的大体位置(要么是指向该缓冲区的变量,要么是分配该缓存区的模块),同时也会尽量标出缓冲区大小。如果缓冲区大小是可配置的,那么会标明该配置项及其默认值。同时我们为每个数据传输过程标上一个编号。

The Anatomy of Hadoop IO Pipeline(译) - 星星 - 银河里的星星

                             Figure 1: Reading line records from gzipped text files.

 1.      数据从DataNode传到Map task进程。DBlk代表文件数据块,CBlk代表文件检验块。数据内容会通过java nio的TransferTo接口(底层依赖于sendfile系统调用)直接传给客户端。校验内容需要先从DataNode的JVM buffer里取出,然后再传给客户端(图中并未表示出这个过程)。文件数据和检验数据按照如下格式捆绑在一个HDFS packet(通常是64KB)中:{packet header|checksum bytes|date bytes}。

2.      为降低对内核的系统调用次数,从socket获取的数据会被缓存在BufferedInputStream中。这实际上会引入两次buffer-copy:一,数据会从内核空间拷贝到JDK代码中的一个临时direct buffer中;二,数据会再从这个临时direct buffer拷贝到BufferedInputStream的byte[]中。BufferedInputStream的byte[]大小是通过配置项”io.file.buffer.size”来控制的,默认是4KB。在我们的产品环境中,该参数设成了128KB。

3.      通过BufferedInputStream后,校验数据会被存在内部的一个ByteBuffer里(大小大概为(PacketSize/512*4)或者是512B),文件数据(压缩的)会被存到由解压层提供的一个byte[]缓冲区中。因为校验和的计算需要一个完整的512字节的trunk,但是用户的请求可能并不是严格按照512字节对齐的,因此在将数据拷贝到用户提供的byte[]缓冲区之前,需要使用一个512字节的byte[]缓冲区对输入进行对齐。需要注意的是,根据FSInputChecker API的要求,数据还需要以512字节大小的片段为单位拷贝到缓冲区。最后,所有的校验数据会拷贝到FSInputChecker的一个4字节数组中以执行校验和的验证。总地看来,该步骤需要一次额外的buffer-copy。

4.      解压层使用一个byte[]缓冲区接收来自DFSClient层的数据。DecompressorStream会将数据从这个byte[]缓冲区拷贝到一个64KB的direct buffer中,然后调用native library代码解压数据,并将解压后的数据存放到另一个64KB的direct buffer。该步骤引入了两次的buffer-copy。

5.      LineReader维护一个内部缓冲区来接收来自底层的数据。在该buffer中进行行分隔符的查找,并拷贝每行的数据形成一个Text对象。该步骤需要两次buffer-copy。

    优化Input Pipeline

将所有的加起来,算上对于解压ing的数据的copy,从数据到达进程的内核缓冲区到传给Map task的map函数,整个read pipeline需要7次buffer-copy。上面的这个过程,可以进行两方面的改进:

l  很多buffer-copy只是简单地为了将数据在direct buffer到byte[] buffer间进行转换

l  校验和的验证可以在bulk级别而不是chunk级别上进行{!即可以在更大的块级别上进行,而不是512字节大小的chunk级别上,这个有些小了}

The Anatomy of Hadoop IO Pipeline(译) - 星星 - 银河里的星星

      Figure 2: Optimizing input pipeline.

图2展示了优化后的视图,此时buffer-copy次数已从7降为3:

1.      一个输入packet被分拆为检验部分和数据部分,它们分别存到两个direct buffer中:一个用于存储检验内容的内部direct buffer,以及一个解压层持有的用于保存压缩的数据内容的direct buffer。

2.      解压层将解压后的数据填入LineReader持有的direct buffer。

3.      LineReader扫描direct buffer中的字节串,查找行分隔符,构造Text对象。

      Writing Outputs   

The Anatomy of Hadoop IO Pipeline(译) - 星星 - 银河里的星星

       Figure 3: Writing line records into gzipped text files.

下面来看一下write时的过程。图3,展示了Reduce task使用TextOutputFormat向gzip压缩格式的文本文件写入行格式记录的过程。类似于图1,我们为每次数据传输都设置了编号。

1.      TextOutputFormat的RecordWriter是无缓冲的。当用户输入一行记录,该Text对象的字节串就会被直接拷贝到压缩层的一个64KB direct buffer中。对于一个很长的行,数据可以一次性地也可以分多次拷贝到这个64KB buffer中。

2.      压缩层每收到一行数据(或者是某个很长的行的一部分),就会调用native压缩代码。压缩后的数据会被存到另一个64KB direct buffer中。在传给DFSClient层之前,数据需要从该64KB direct buffer传到压缩层持有的内部byte[] buffer,这是因为DFSClient层只能以byte[] buffer作为输入。该buffer的大小也是通过配置项”io.file.buffer.size”来控制的。该步骤包含两次buffer-copy。

3.      FSOutputSummer从压缩层的byte[] buffer中计算出CRC32校验和,然后将校验内容和数据内容捆绑放到Packet对象的一个byte[] buffer中。同样地,校验和的计算也必须是针对512字节的chunk,因此也需要一个512字节的byte[] buffer负责边界对齐。校验和在计算出来之后,在拷贝到packet之前,也是放在一个4字节的byte[] buffer中。这个步骤包含一次buffer-copy。

4.      当packet数据填满之后,它会被放入一个长度上限为80的队列中。Packet的大小是通过配置项”dfs.write.packet.size”控制的,默认是64KB。该步骤没有引入buffer-copy。

5.      有一个专门的DataStreamer负责监听该队列,当它收到一个packet时就会通过socket将它发送。Socket采用BufferedOutputStream进行了包装。但是内部的byte[] buffer很小(不超过512字节),通常会被忽略。然而,数据仍然需要拷贝到JDK代码中的一个临时的direct buffer中。该步骤包含两次buffer-copy。

6.      数据从Reduce task的内核缓冲区传送到DataNode的内核缓冲区。在数据存储到block文件及校验文件前,在DataNode端可能还会进行一些buffer-copy。与前面的DFS read情况不同,校验数据和文件数据都需要传出内核,进入到JVM这一层。其中的细节超出了我们这里要讨论的内容,图中也未标出。

   优化 Output Pipeline

总的来看,上面的过程需要6次buffer-copy。我们应该如何来优化write pipeline呢:

l  可能可以减少一些buffer-copy

l  如果我们只是在输入buffer满的时候调用native压缩代码,那么它的调用次数就会大大减少

l  校验和的计算可以在bulk级别而不是chunk级别上进行

The Anatomy of Hadoop IO Pipeline(译) - 星星 - 银河里的星星

        Figure 4: Optimizing output pipeline.

 图4展示了优化后的情况,此时只需要4次的buffer-copy:

1.      将用户的Text对象的字节串拷贝到TextOutputFormat持有的direct buffer中

2.      一旦buffer填满,调用native压缩代码,同时直接将压缩数据传送给压缩层的direct buffer

3.      FSOutputSummer为压缩层的direct buffer中的数据计算检验和。然后将校验bytes和数据bytes存入packet的direct buffer

4.      将填满的packet放入队列中,同时DataStreamer负责在后台将packet通过socket发送,此时会将数据拷贝到内核缓冲区。

You Might Also Like