分布式系统

Sawzall原理与应用

2012年7月24日 阅读(615)

序:Sawzall的论文早在2006年就发表了,后来Google又推出了Tenzing,Dremel等数据分析系统,到了2010年就把Sawzall给开源了,项目主页:http://code.google.com/p/szl/。与Tenzing,Dremel相比, Sawzall所能做的事情还是比较有限,但是作为一种DSL,毕竟还是要比直接写MapReduce job要更易用些。本文就简单描述下其原理使用及扩展方法,转载请注明:

作者:phylips@bmy 2012-7-24

出处:http://duanple.blog.163.com/blog/static/70971767201262491314721/

1. Szl使用

1.1      简介

安装完szl后,用户就可以使用szl命令了,szl是一个可执行程序,可以用来在本地执行sawzall程序对输入数据进行处理。可以以文本文件为输入,也可以以recordio格式为输入。输入数据可以是简单的以分隔符分割的记录,也可以是保存在recordio中的proto序列化数据。szl的工作模式就像经典的awk,用户可以用它来对数据进行统计分析。

参考src/app/szl.cc的实现,用户可以实现符合自己使用场景的脚本执行框架,对自己的数据进行处理,比如可以将其移植到自己的MapReduce环境中。

1.2      实例

1.2.1 文本格式输入处理

输入数据:

1,2

2,2

2,3

2,4

3,5

3,6

Szl程序:

t: table set(100)[int] of int;

fields: array of bytes = splitcsvline(input);

index: int = int(string(fields[0]),10);

value: int = int(string(fields[1]),10);

emit t[index] <- value;

命令:

szl b.szl data -table_output t

1.2.2     protobuf格式输入处理

1.2.2.1  从proto生成szl类型定义

执行命令:protoc –plugin=/home/…/szl-read-only/src/protoc-gen-szl –szl_out=. page_meta.proto

 

出现错误:/home/…/szl-read-only/src/protoc-gen-szl: error while loading shared libraries: libicui18n.so.48: cannot open shared object file: No such file or directory

解决方案:安装icu,sudo yum install icu

设置LD_LIBRARY_PATH重新运行如下命令:

LD_LIBRARY_PATH=/usr/local/lib  protoc –plugin=/home/duanple/peile.duan/szl-read-only/src/protoc-gen-szl –szl_out=. page_meta.proto

 

可以看到在当前目录下有一个page_meta.szl的输出文件

通过如上命令可以将proto定义直接转换成szl类型定义。当然实际中,这个过程是被szl隐藏的,szl自己会在内部调用protoc和protoc-gen-szl,完成这个转换。通过szl的help信息可以看到,它有如下两个选项用来设置这两个工具的路径:-protocol_compiler和-protocol_compiler_plugin。

 

在生成proto对应的szl类型定义后,用户可以在其szl程序中直接include,来取代proto,如下所示。

1.2.2.2  Proto格式输入

Proto定义

message PageMeta {

    optional bytes UrlHash = 1;

    /** meta fields from crawler. */

    optional bytes RawUrl = 2; //also for piece_data

    optional bytes FinalUrl = 3;

}

protoc page_meta.proto –cpp_out=./

 

输入数据生成

为了能够进行实验,首先我们需要一组输入数据,在这里我们利用utilities/recordio.cc中的RecordWriter类写入proto记录。通过这个实验,再结合src/app/szl.cc我们就可以比较清楚的了解到szl的整个执行原理。

 

#include "page_meta.pb.h"

using namespace std;

#include "src/public/recordio.h"

using namespace sawzall;

int main()

{

    PageMeta meta;

    meta.set_rawurl("http://www.baidu.com");

    string outputFile = "data";

    string outputRecord = meta.SerializeAsString();

    RecordWriter* writer = RecordWriter::Open(outputFile.c_str());

    writer->Write(outputRecord.c_str(), outputRecord.size());

    writer->Write(outputRecord.c_str(), outputRecord.size());

    writer->Write(outputRecord.c_str(), outputRecord.size());

    delete writer;

    return 0;

}

 

g++ proto_writer.cpp page_meta.pb.cc -lprotobuf -lpthread –lszl

LD_LIBRARY_PATH=/usr/local/lib  ./a.out

这样就会在当前文件夹下生成一个data文件

Szl程序

统计Record个数

proto "page_meta.proto"

meta: PageMeta = input;

count: table sum of int;

emit count <- 1;

 

命令

LD_LIBRARY_PATH=/usr/local/lib szl proto.szl –protocol_compiler=/usr/bin/protoc –protocol_compiler_plugin==/home/…/szl-read-only/src/protoc-gen-szl –table_output=count -use_recordio data

 

执行结果

count[] = 3

 

Szl程序

统计RawUrl字段总长度

proto "page_meta.proto"

meta: PageMeta = input;

count: table sum of int;

emit count <- len(string(meta.rawurl));

 

执行结果

count[] = 60

 

Szl程序

使用include取代proto语句

include "page_meta.szl"

meta: PageMeta = input;

count: table sum of int;

emit count <- len(string(meta.rawurl));

 

执行结果

count[] = 60

  

2. Sawzall框架原理

Sawzall的核心概念有两个:记录处理,聚合。即一次处理一条记录,将结果进行聚合。

2.1      核心类

SzlEmitter,EmitterFactory,SzlTabWriter,SzlTabEntry,SzlResults,Process

 

Process 以sawzall源程序为输入,通过Process::set_emitter_factory(DemoEmitterFactory*)和sawzall::RegisterEmitters(&process)设置好emitters,然后通过process.Run(input[i].data(), input[i].size(), NULL, 0)来对每条输入数据进行处理。处理结果需要通过SzlEmitter. Flusher()写出。

 

在这里最核心的是Emitter,Emitter负责响应对table的”<-“操作,在src/public/emitterinterface.h,我们可以看到其接口定义,注释已经写的比较清楚,可以将其看做是针对”emit”语句的一系列响应动作,针对每个元素,会有三个过程:

1)      Begin (<appropriate compound type>, <length>)

2)      PutX

3)      End(<appropriate compound type>, <length>)

一个emit语句会包含如下几个组成部分:Index,Element,Weight。比如如下语句将会产生如下对应调用序列:

  //   emit table[1] <- { "foo": 1, "bar": 0 };

  //   Begin(EMIT, 1)

  //     Begin(INDEX, 1) PutInt(1) End(INDEX, 1)

  //     Begin(ELEMENT, 1)

  //       Begin(MAP, 2)

  //         PutString("foo", 3) PutInt(1)

  //         PutString("bar", 3) PutInt(0)

  //       End(MAP, 2)

  //     End(ELEMENT, 1)

  //   End(EMIT, 1)

 

参考下src/emitvalues/szlemitter.cc实现,除了继承自Emitter的接口外,SzlEmitter新增了Merge,DisplayResults,Flusher,Clear等几个新接口。此外还包含了一些重要的变量:SzlTabWriter* writer,SzlTabEntryMap* table_等。也就是说它除了负责对emit进行响应外,同时还会在内存中保存emit操作后的结果,以及将这些数据写出。

SzlTabEntryMap实际上是hash_map<string, SzlTabEntry*>。SzlTabEntry实际上就是table内的一个value, SzlEmitter内部会调用SzlTabWriter来创建和修改SzlTabEntry,而一个SzlTabEntry也可能是一个复合结构,同时它往往与SzlTabWriter是成对出现的,实际上真正的聚合器逻辑就是通过SzlTabEntry来实现的,src/emitters/目录下就包含了一系列常用的聚合器实现。

 再进一步的看,sawzall从外部看来可以支持很多table类型,table本身也可以是多维的,每维的数据类型也是多样的,通过分析SzlEmitter::Begin,End,PutX,可知实际上在内部它会通过encoder将多个维度上的index值组合成一个key,也就是说内部看到的只有一个key,这一点通过SzlTabEntryMap也可以看出。

2.2      模块结构

Szl的src目录下有如下一些文件夹:

app:包含szl本地化工具的相关实现,app/tests/目录下还包含了一个MapReduce的实现实例

contrib:内含Emacs的一个插件

emitters:一系列聚合器实现

emitvalues:sawzall相关一些基本类型定义,包括decoder,emitter,encoder,tableentry等

engine:sawzall语言相关,包括词法分析,语法分析,执行引擎

fmt:格式化输出相关

intrinsics:一些常见的运算支持

protoc_plugin:protobuf相关工具

public:公开的头文件

utilities:基本工具类

3. 将Sawzall应用于MapReduce环境

3.1      mapreduce_demo_unittest.cc

位于src/app/tests/mapreduce_demo_unittest.cc,对于一个MapReduce程序来说,整个处理过程涉及到如下几个对象:sawzall源程序,输入数据,MapReduce框架,驱动程序。

 

对于整个数据处理过程来说,sawzall源程序和输入数据都是输入,驱动程序负责读取数据和sawzall源程序,将sawzall源程序进行动态的编译,然后将该编译后程序针对每条输入记录执行一遍,table的数据是sawzall源程序的输出窗口,每条记录执行时都会去修改table的数据,该table会始终保留这些修改。而table的内容会在调用SzlEmitter::Flusher()时被写出去,该函数会通过调用SzlEmitter::WriteValue(const string& key, const string& value)将SzlTabEntryMap中的所有key,value写出去,因此用户可以通过实现自己的WriteValue函数就可以控制table数据如何写出。

 Table中的数据如果是聚合器类型,需要支持Merge操作,相互会进行Merge。

3.2      执行过程

 首先来考虑下如果将sawzall应用到MapReduce环境中,大概应该怎么实现。首先需要sawzall语言的支持,这个通过Process类可以实现,然后用户程序只需要读出一条条的记录,然后将它交给Process,Process会通过调用Run 处理每条记录,所以Process就像一个执行环境,首先它需要负责保存好记录执行后的table数据,同时它还要能够将table数据传给用户程序,比如随着处理的进行,table中的key,value个数逐渐增多,因此用户就需要将数据Flush出去,同时清空table,然后继续处理避免内存耗尽。写出去的数据是key,value的形式,同时这些key,value数据还要能够进行排序,reducer会重新将他们读取出来,同时需要将相同key下的进行reduce,对于reduce后的结果进行显示或保存。

在这个过程中可能会有如下问题:如何将SzlEmitter与table关联?如何得到sawzall执行后的table数据?如何对Table中数据进行序列化反序列化?

首先Map端负责读取数据和sawzall程序的解析,对于sawzall脚本中的每个table,在整个过程中应该有一个SzlEmitter对象实例与之相对应,在table的定义处,就可以根据table名称,创建出该SzlEmitter对象。然后每读出一条记录,就将它喂给sawzall执行引擎,该引擎内部会执行sawzall的处理逻辑,这些逻辑底层会调用SzlEmitter的相关函数,并更新其内部数据。那么如何根据table定义,创建出对应的SzlEmitter对象呢?Process有个函数set_emitter_factory,可以设置SzlEmitterFactory,而SzlEmitterFactory有个函数NewEmitter,该函数可以根据TableInfo,调用SzlTabWriter::CreateSzlTabWriter创建出对应的SzlTabWriter,然后再以该SzlTabWriter为参数创建出SzlEmitter。这样创建出的SzlEmitter中的SzlTabWriter就是针对给定的table的了。

要了解sawzall中的table是如何与SzlTabWriter关联上的就需要查看CreateSzlTabWriter的实现了,具体在src/emitvalues/szltabentry.cc,可以看到它是通过一个全局静态变量creators来根据table名称比如”sum”找到对应的SzlTabWriter,而另一方面我们看src/emitters/szlmaximum.cc,里面有REGISTER_SZL_TAB_WRITER(maximum, SzlMaximum);该宏会将相应SzlTabWriter的构造函数与其名称相关联。

 Map端可以通过SzlEmitter::GetMemoryUsage得到当前内存使用情况,并判断是否调用SzlEmitter::Flusher。但是对于不同的table可能需要选择不同的处理方式,比如如果只是”stdout <- “,就需要每次执行都做输出,但是如果是”sum <-”只需要最后输出即可,内部如何对这两种情况进行控制呢?当然stdout这个比较特殊,它可能本身并不属于一个table类型,不会将数据存入内存。但是对于sum和collection,这种区别可能就比较明显了,sum只是要一个最终的结果,collection则需要收集所有见过的数据,因此flush的频率也就是不同的。当多个table出现时,如何控制flush的频率可能是个需要权衡的地方。当然table内数据何时写出以及如何写出,都是由外部框架负责的,跟sawzall已经没有了关系。

 3.3      需要做的工作

综上分析,我们总结下,将sawzall用于MapReduce环境,大概需要做这样几个工作:

·         实现一个Emiter,重写WriteValue方法

·         由于一个sawzall程序可能会用到多个table,因此在序列化时,需要在key,value中包含table信息,这样才能在读出时识别出它属于哪个table

·         编写驱动程序,Map端负责读取数据,sawzall程序,构建sawzall执行环境,输出table内容,Reduce端负责读取table内容,并完成聚合,输出结果

·         具体实现可以参考src/app/szl.cc和src/app/tests/mapreduce_demo_unittest.cc

3.4      伪代码

Mapper

InitializeAllModules();

sawzall::RegisterStandardTableTypes();

sawzall::Executable exe(program_name.c_str(), source.c_str(), sawzall::kNormal);

sawzall::Process process(&exe, false, NULL);

DemoEmitterFactory emitter_factory(result, num_shards);

process.set_emitter_factory(&emitter_factory);

sawzall::RegisterEmitters(&process);

process.Initialize();

for (int i = 0; i < num_input_lines; i++) {

    process.Run(input[i].data(), input[i].size(), NULL, 0)

}

 

// Flush the emitter output to the mapper output shards.

for (int i = 0; i < emitter_factory.emitters().size(); i++) {

    SzlEmitter* emitter = emitter_factory.emitters()[i];

    emitter->Flusher();

}

 Reducer

 for (int i = 0; i < reducer_input.size(); i++) {

    const string& name_key = reducer_input[i].first;

    const vector<string>& values = reducer_input[i].second;

    size_t separator_index = name_key.find(kSzlKeyValueSep);

    string name = name_key.substr(0,separator_index);

    string key = name_key.substr(separator_index+1); 

    map<string,SzlTabWriter*>::iterator it = tabwriters.find(name);

    const SzlTabWriter* tw = it->second;

    // Create the tabwriter and tabentry for the key & value.

   SzlTabEntry* te = tw->CreateEntry(key); 

    if (tw->Aggregates()) {

      // For aggregating tables, first merge the values.

      for (int j = 0; j < values.size(); j++) {

        SzlTabEntry::MergeStatus status = te->Merge(values[j]);

        if (status == SzlTabEntry::MergeError)

          LOG(FATAL) << "error merging results";

      }

      string value;

      te->Flush(&value);

      // Write the output to the mill

      result.push_back(KeyValuePair(name_key,value));

    } else {

      // Non-aggregating tables.

      if (tw->WritesToMill()) {

        // Just write the value directly into the mill.

        for (int j = 0; j < values.size(); j++) {

          result.push_back(KeyValuePair(name_key,values[j]));

        }

      } else {

        // Direct output table – let the table write the value.

        for (int j = 0; j < values.size(); j++) {

          te->Write(values[j]);

        }

      }

    }

    delete te;

  }

4. 相关工作

以目前的眼光来看sawzall,它存在如下一些问题:项目活跃度不够,相关的讨论极少,很久已无更新,目前来看已经是比较老的查询系统了,不知道Google内部是否还在用,但是已知的是在sawzall之后Google已经开发出了一些新的系统完成类似工作,如FlumeJava,Dremel,Tenzing,当然对于一些比较复杂的统计需求来说sawzall还是有其优势的;Sawzall本身更偏向于统计分析,基本上都是只读性的操作;对输入数据格式支持有限,目前内置支持文本或protobuf格式;开源出的版本缺少MapReduce支持,当然这也是Google的MapReduce未开源导致的,所以并没有真正开放出MapReduce环境下的代码,基本上如不做修改只能作为一个单机程序来使用;与手写MapReduce相比,性能会有些下降。当然作为最早的封装于MapReduce之上的脚本式编程语言,这项工作还是具有很大意义的,它大大简化了MapReduce的编写过程。后来的Pig,Hive都参考了它,尤其是Pig更与之类似,都是过程性的,语法都比较符合程序员的使用习惯,与类SQL的HQL则差异较大。

Sawzall实际上已经有比较长的历史了,2003年就已开始应用在Google内部,2006年相关论文发表,2010年开源。值得一提的是其作者之一是Rob Pike,著名的Unix先驱,在贝尔实验室最早和Ken Thompson以及 Dennis M. Ritche 参与Unix开发,UTF-8的设计人,经典书籍The Unix Programming Environment 和 The Practice of Programming 的作者,Google在2009年推出的Go语言就是出自他和Ken Thompson等人之手。

5. 参考文献

http://code.google.com/p/szl/wiki/Overview

Experiences Scaling Use of Google’s Sawzall

Interpreting the Data:Parallel Analysis with Sawzall

http://www.cs.uwaterloo.ca/~kmsalem/courses/CS848W10/presentations/Karyakin-Sawzall.pdf

用Sawzall在map-reduce框架下做数据统计

 

You Might Also Like