作者:Owen Omalley 2009-8-27
原文:http://developer.yahoo.com/blogs/hadoop/posts/2009/08/the_anatomy_of_hadoop_io_pipel/
译者:phylips@bmy 2011-8-28
出处:http://duanple.blog.163.com/blog/static/70971767201172902737677/
引言
在一个典型的Hadoop MapReduce job中,通常是从HDFS上读取输入文件。为减少文件大小,文件数据通常是压缩过的,因此读取之后需要进行解压,之后得到序列化字节串,在传递给用户定义的Map函数之前再将这些字节串转换为java对象。输出则刚好是一个反向的过程,输出记录会被序列化,压缩,最终传送到HDFS上。这看起来很简单,但是因为下面的一些原因,这两个过程实际上是很复杂的:
l 压缩和解压通常是通过native libaray代码完成
l 在读写过程中通常需要计算一些端到端的CRC32校验和
l 由于各种接口上的限制,导致缓冲区管理很复杂
在这篇blog里,我们来详细的剖析下Hadoop IO Pipeline,并探索下可能的优化方式。为了让讨论更具体一些,我们以从/向gzip压缩格式的文本文件读取/写入行格式的记录为例。我们没有对DataNode端进行细节上的分析,而是重点关注客户端(map/reduce task processes)在整个Pipeline中的行为。最后,所有的描述都是基于写这篇文章时的Hadoop 0.21 trunk版本,与之相比,某些老的或者更新的版本在某些方面上可能会有区别。
Reading Inputs
图1展示了使用TextInputFormat从gzip压缩格式的文本文件中读取行记录时的I/O pipeline。首先,该图通过中间的空白线分为两部分,左边代表了DataNode进程,右边是应用程序进程(即Map task)。自底向上,根据缓冲区分配及控制的场所又划分为3个区域:内核空间,native code空间,JVM空间。对于应用程序进程部分,从左到右,是数据块依次会穿过的三个软件层。图中不同颜色的方块,代表不同种类的缓冲区。两个方块之间的箭头代表了一次数据传输或者buffer-copy。每个方块内部的标签标明了缓存区的大体位置(要么是指向该缓冲区的变量,要么是分配该缓存区的模块),同时也会尽量标出缓冲区大小。如果缓冲区大小是可配置的,那么会标明该配置项及其默认值。同时我们为每个数据传输过程标上一个编号。
Figure 1: Reading line records from gzipped text files.
1. 数据从DataNode传到Map task进程。DBlk代表文件数据块,CBlk代表文件检验块。数据内容会通过java nio的TransferTo接口(底层依赖于sendfile系统调用)直接传给客户端。校验内容需要先从DataNode的JVM buffer里取出,然后再传给客户端(图中并未表示出这个过程)。文件数据和检验数据按照如下格式捆绑在一个HDFS packet(通常是64KB)中:{packet header|checksum bytes|date bytes}。
2. 为降低对内核的系统调用次数,从socket获取的数据会被缓存在BufferedInputStream中。这实际上会引入两次buffer-copy:一,数据会从内核空间拷贝到JDK代码中的一个临时direct buffer中;二,数据会再从这个临时direct buffer拷贝到BufferedInputStream的byte[]中。BufferedInputStream的byte[]大小是通过配置项”io.file.buffer.size”来控制的,默认是4KB。在我们的产品环境中,该参数设成了128KB。
3. 通过BufferedInputStream后,校验数据会被存在内部的一个ByteBuffer里(大小大概为(PacketSize/512*4)或者是512B),文件数据(压缩的)会被存到由解压层提供的一个byte[]缓冲区中。因为校验和的计算需要一个完整的512字节的trunk,但是用户的请求可能并不是严格按照512字节对齐的,因此在将数据拷贝到用户提供的byte[]缓冲区之前,需要使用一个512字节的byte[]缓冲区对输入进行对齐。需要注意的是,根据FSInputChecker API的要求,数据还需要以512字节大小的片段为单位拷贝到缓冲区。最后,所有的校验数据会拷贝到FSInputChecker的一个4字节数组中以执行校验和的验证。总地看来,该步骤需要一次额外的buffer-copy。
4. 解压层使用一个byte[]缓冲区接收来自DFSClient层的数据。DecompressorStream会将数据从这个byte[]缓冲区拷贝到一个64KB的direct buffer中,然后调用native library代码解压数据,并将解压后的数据存放到另一个64KB的direct buffer。该步骤引入了两次的buffer-copy。
5. LineReader维护一个内部缓冲区来接收来自底层的数据。在该buffer中进行行分隔符的查找,并拷贝每行的数据形成一个Text对象。该步骤需要两次buffer-copy。
优化Input Pipeline
将所有的加起来,算上对于解压ing的数据的copy,从数据到达进程的内核缓冲区到传给Map task的map函数,整个read pipeline需要7次buffer-copy。上面的这个过程,可以进行两方面的改进:
l 很多buffer-copy只是简单地为了将数据在direct buffer到byte[] buffer间进行转换
l 校验和的验证可以在bulk级别而不是chunk级别上进行{!即可以在更大的块级别上进行,而不是512字节大小的chunk级别上,这个有些小了}
Figure 2: Optimizing input pipeline.
图2展示了优化后的视图,此时buffer-copy次数已从7降为3:
1. 一个输入packet被分拆为检验部分和数据部分,它们分别存到两个direct buffer中:一个用于存储检验内容的内部direct buffer,以及一个解压层持有的用于保存压缩的数据内容的direct buffer。
2. 解压层将解压后的数据填入LineReader持有的direct buffer。
3. LineReader扫描direct buffer中的字节串,查找行分隔符,构造Text对象。
Writing Outputs
Figure 3: Writing line records into gzipped text files.
下面来看一下write时的过程。图3,展示了Reduce task使用TextOutputFormat向gzip压缩格式的文本文件写入行格式记录的过程。类似于图1,我们为每次数据传输都设置了编号。
1. TextOutputFormat的RecordWriter是无缓冲的。当用户输入一行记录,该Text对象的字节串就会被直接拷贝到压缩层的一个64KB direct buffer中。对于一个很长的行,数据可以一次性地也可以分多次拷贝到这个64KB buffer中。
2. 压缩层每收到一行数据(或者是某个很长的行的一部分),就会调用native压缩代码。压缩后的数据会被存到另一个64KB direct buffer中。在传给DFSClient层之前,数据需要从该64KB direct buffer传到压缩层持有的内部byte[] buffer,这是因为DFSClient层只能以byte[] buffer作为输入。该buffer的大小也是通过配置项”io.file.buffer.size”来控制的。该步骤包含两次buffer-copy。
3. FSOutputSummer从压缩层的byte[] buffer中计算出CRC32校验和,然后将校验内容和数据内容捆绑放到Packet对象的一个byte[] buffer中。同样地,校验和的计算也必须是针对512字节的chunk,因此也需要一个512字节的byte[] buffer负责边界对齐。校验和在计算出来之后,在拷贝到packet之前,也是放在一个4字节的byte[] buffer中。这个步骤包含一次buffer-copy。
4. 当packet数据填满之后,它会被放入一个长度上限为80的队列中。Packet的大小是通过配置项”dfs.write.packet.size”控制的,默认是64KB。该步骤没有引入buffer-copy。
5. 有一个专门的DataStreamer负责监听该队列,当它收到一个packet时就会通过socket将它发送。Socket采用BufferedOutputStream进行了包装。但是内部的byte[] buffer很小(不超过512字节),通常会被忽略。然而,数据仍然需要拷贝到JDK代码中的一个临时的direct buffer中。该步骤包含两次buffer-copy。
6. 数据从Reduce task的内核缓冲区传送到DataNode的内核缓冲区。在数据存储到block文件及校验文件前,在DataNode端可能还会进行一些buffer-copy。与前面的DFS read情况不同,校验数据和文件数据都需要传出内核,进入到JVM这一层。其中的细节超出了我们这里要讨论的内容,图中也未标出。
优化 Output Pipeline
总的来看,上面的过程需要6次buffer-copy。我们应该如何来优化write pipeline呢:
l 可能可以减少一些buffer-copy
l 如果我们只是在输入buffer满的时候调用native压缩代码,那么它的调用次数就会大大减少
l 校验和的计算可以在bulk级别而不是chunk级别上进行
Figure 4: Optimizing output pipeline.
图4展示了优化后的情况,此时只需要4次的buffer-copy:
1. 将用户的Text对象的字节串拷贝到TextOutputFormat持有的direct buffer中
2. 一旦buffer填满,调用native压缩代码,同时直接将压缩数据传送给压缩层的direct buffer
3. FSOutputSummer为压缩层的direct buffer中的数据计算检验和。然后将校验bytes和数据bytes存入packet的direct buffer
4. 将填满的packet放入队列中,同时DataStreamer负责在后台将packet通过socket发送,此时会将数据拷贝到内核缓冲区。