Distributed Systems

Running Hadoop MapReduce on Amazon EC2 and Amazon S3

January 4, 2010 · Views: 124

http://developer.amazonwebservices.com/connect/entry.jspa?externalID=873&categoryID=112
AWS and Hadoop developer Tom White shows how to use Hadoop and Amazon Web Services together to process a large collection of web access logs.

AWS Products Used: Amazon EC2, Amazon S3 Language(s): Java Date Published: 2007-07-18

By Tom White

Managing large datasets is hard; running computations on large datasets is even harder. Once a dataset has exceeded the capacity of a single filesystem or a single machine, running data processing tasks requires specialist hardware and applications, or, if attempted on a network of commodity machines, it requires a lot of manual work to manage the process: splitting the dataset into manageable chunks, launching jobs, handling failures, and combining job output into a final result.

Apache’s Hadoop project aims to solve these problems by providing a framework for running large data processing applications on clusters of commodity hardware. Combined with Amazon EC2 for running the application, and Amazon S3 for storing the data, we can run large jobs very economically. This paper describes how to use Amazon Web Services and Hadoop to run an ad hoc analysis on a large collection of web access logs that otherwise would have cost a prohibitive amount in either time or money.

Hadoop MapReduce

Apache Hadoop, a sub-project of the well-known Lucene text search library, provides several components for building distributed applications. For this article we shall focus on the MapReduce component, which provides a simple but powerful programming model for writing parallel programs by defining how the processing can be split into small fragments of work.

The MapReduce concept (and name) comes from Google, and is described in an excellent paper by Jeffrey Dean and Sanjay Ghemawat that is well worth reading. Google’s MapReduce implementation, while extensively used inside the company, is obviously not available for general use. A goal of the Hadoop project is to provide an open source implementation of MapReduce that anyone can run on their own cluster, or on rented hardware, such as an Amazon EC2 cluster. While the Hadoop implementation is similar to that described in the Dean and Ghemawat paper, it is worth noting that there are differences in design and nomenclature.

Writing a MapReduce Program

For our example we are going to write a program that takes web server access log files (as produced by an Apache Web Server, for example) and counts the number of hits in each minute slot over a week. We will analyze months of logs and plot the distribution in order to get a view of how traffic volumes tend to vary over the course of a week. The beauty of this approach is that the same program will scale to months or years of massive logs, simply by increasing the cluster size.

The best way to think about MapReduce programs is to think about the input and output of each phase: the Map phase and the Reduce phase. Each phase has key-value pairs as input, and key-value pairs as output. The types and number of records of the input and output may be different, although the Map output types must be the same as the Reduce input types. Let’s see how to choose the types for our MapReduce program.

The input to the Map phase is the access log files, and we use an input format that gives us key-value pairs which are the character offset within the access log (which we ignore) and the corresponding line. Our Map function takes a log line, pulls out the timestamp field for when the server finished processing the request, converts it into a minute-in-week slot, then writes out a (<minute-in-week slot>, <1>) key-value pair. We are mapping each line in the access log to its minute-in-week slot.

The Reduce is given <minute-in-week slot> keys and an iterator over all the values for the key that were produced by the maps. So all we have to do is sum the values as we iterate over them, to produce the final output: (<minute-in-week slot>, <total hits>) key-value pairs. Hadoop’s MapReduce infrastructure is actually doing a lot behind the scenes to make this work, and to make it reliable in the face of hardware failure. It even sorts the output of the Reduce phase by key, so we have exactly what we need to plot our graph. (We won’t use Hadoop to plot the graph for us, but more on that later.)

To illustrate the way this all works, take the following three-line access log:

192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722
192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187
192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024

Our Map takes each input line and produces the following output (note 22:07 on 22 August 2005 is the 1387th minute in the week, where the week starts on Monday):

<0, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET / HTTP/1.1" 200 1722> -> <1387, 1>
<71, 192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] "GET /styles/layout.css HTTP/1.1" 200 2187> -> <1387, 1>
<159, 192.168.0.5 - - [22/Aug/2005:22:08:00 +0000] "GET /projects.html HTTP/1.1" 200 4024> -> <1388, 1>

Our Reduce then adds up the 1 values emitted by the Map, to produce totals:

<1387, (1, 1)> -> <1387, 2>
<1388, (1)> -> <1388, 1>

The Reduce output is written out as a tab-separated file. It says there were two requests in the 1387th minute-in-week slot, and one in the 1388th – which is correct.

1387 2
1388 1

The Code

Let’s translate the above ideas into Java code. First we create a class that will run our program:

public class AccessLogFileAnalyzer {
  //...
}

It contains our Map and Reduce inner classes and a main method to launch the job. Here’s the Map class:

public static class MapClass extends MapReduceBase implements Mapper {

  private final static LongWritable ONE = new LongWritable(1);

  private static Pattern p = Pattern
      .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
          " ([^ ]*) ([^ ]*).*");

  private static DateTimeFormatter formatter = DateTimeFormat
      .forPattern("dd/MMM/yyyy:HH:mm:ss Z");

  private IntWritable minute = new IntWritable();

  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter) throws IOException {
    String line = ((Text) value).toString();
    Matcher matcher = p.matcher(line);
    if (matcher.matches()) {
      String timestamp = matcher.group(4);
      minute.set(getMinuteBucket(timestamp));
      output.collect(minute, ONE);
    }
  }

  private int getMinuteBucket(String timestamp) {
    DateTime dt = formatter.parseDateTime(timestamp);
    return dt.getMinuteOfDay() + (dt.getDayOfWeek() - 1)
        * DateTimeConstants.MINUTES_PER_DAY;
  }
}

The interesting work is done by the map() method, which is specified in the org.apache.hadoop.mapred.Mapper interface. It takes the value parameter that it is passed and casts it to an org.apache.hadoop.io.Text type – we shall see later how this is specified. The Text object is a Hadoop framework class that can be serialized using Hadoop’s serialization protocol and stores text in a UTF-8 encoding. We convert it to a regular Java String before we use a regular expression to extract the timestamp field from an Extended Log File Format record. We call the utility method getMinuteBucket(), which uses the handy Joda Time library to convert the timestamp to an integer minute-in-week slot, then write our output to the org.apache.hadoop.mapred.OutputCollector. Notice we use an org.apache.hadoop.io.IntWritable to wrap the key, and an org.apache.hadoop.io.LongWritable to wrap the value, so that Hadoop can serialize the values.
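To get a feel for what getMinuteBucket() produces, here is a minimal standalone sketch (not part of the original code; the class name is made up for illustration) that runs the same regular expression and Joda Time conversion over the first sample log line:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Hypothetical helper class, not part of the article's code.
public class MinuteBucketCheck {

  private static final Pattern P = Pattern.compile(
      "([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\" ([^ ]*) ([^ ]*).*");

  private static final DateTimeFormatter FORMATTER =
      DateTimeFormat.forPattern("dd/MMM/yyyy:HH:mm:ss Z");

  public static void main(String[] args) {
    String line =
        "192.168.0.5 - - [22/Aug/2005:22:07:52 +0000] \"GET / HTTP/1.1\" 200 1722";
    Matcher matcher = P.matcher(line);
    if (matcher.matches()) {
      // Group 4 is the timestamp field between the square brackets.
      DateTime dt = FORMATTER.parseDateTime(matcher.group(4));
      // Note: parseDateTime() converts the parsed instant to the JVM's
      // default time zone, so the slot printed depends on where this runs.
      int slot = dt.getMinuteOfDay()
          + (dt.getDayOfWeek() - 1) * DateTimeConstants.MINUTES_PER_DAY;
      System.out.println("minute-in-week slot: " + slot);
    } else {
      System.out.println("line did not match the log pattern");
    }
  }
}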

The Reduce code is much simpler:

public static class ReduceClass extends MapReduceBase implements Reducer {

  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += ((LongWritable) values.next()).get();
    }
    output.collect(key, new LongWritable(sum));
  }
}

We simply iterate through the values we are passed, which are the same type as the Map output values (LongWritable), and sum them. The key is also the same as the Map output key (an IntWritable), and we use it to emit the final key-value pair: the minute-in-week slot, and the total hit count for that slot.

Hadoop actually comes with a library of stock maps and reducers, and in this case we could have used LongSumReducer, which does the same as our reducer, but it’s useful to see how you can implement your own.
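If you did want to swap in the stock reducer, a minimal sketch of the change to the job setup shown below (assuming the 0.13-era API, where it lives in the org.apache.hadoop.mapred.lib package) would be:

// Sketch only: in main(), replace the hand-written ReduceClass with the
// stock LongSumReducer, which likewise sums LongWritable values per key.
conf.setCombinerClass(org.apache.hadoop.mapred.lib.LongSumReducer.class);
conf.setReducerClass(org.apache.hadoop.mapred.lib.LongSumReducer.class);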

The final piece of code is the main() method that runs the MapReduce job.

public static void main(String[] args) throws IOException {
  if (args.length != 2) {
    System.err
        .println("Usage: AccessLogFileAnalyzer <input path> <output path>");
    System.exit(-1);
  }

  JobConf conf = new JobConf(AccessLogFileAnalyzer.class);
  conf.setInputPath(new Path(args[0]));
  conf.setOutputPath(new Path(args[1]));

  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setCombinerClass(ReduceClass.class);
  conf.setReducerClass(ReduceClass.class);

  conf.setNumReduceTasks(1);

  JobClient.runJob(conf);
}

An org.apache.hadoop.mapred.JobConf object holds details of how to run a MapReduce job. First, we specify the input and output directory paths, which are set from the command line arguments. Next, we set the output key-value types, which are the types that the Reducer emits. We didn’t set the input types, since the defaults (org.apache.hadoop.io.LongWritable for the beginning-of-line character offsets, and org.apache.hadoop.io.Text for the lines) are what we need. Also, the input format and output format – how the input files are turned into key-value pairs, and how the output key-value pairs are turned into output files – are not specified, since the default is to use text files (as opposed to a more compact binary format).

Having set the input and output configuration parameters, we specify the Map and Reduce classes to use. We also set the Combiner class. A Combiner is just a Reduce task that runs in the same process as the Map task, after the Map task has finished. This is often a good idea because it greatly reduces the amount of data that has to be sent across the network as input to the Reduce phase. For the current application the reduction in network traffic is stark: rather than serializing as many key-value pairs as there are lines in the input file processed by each Map task, on the order of 10^7, the number of pairs emitted after the Combine task runs is the number of distinct minute-in-week slots, around 10^4, a difference of three orders of magnitude.

The last job configuration setting concerns the number of maps and reduces. Selecting good values for these is something of an art, but for this application a single Reduce task works well, primarily because of the optimization achieved by the Combine phase. This is not typical, however: it is common to have a number of Reduce tasks, which produce a corresponding number of output files. The outputs may then need to be combined into a single output as a final post-processing step, or they may be input to a further MapReduce job, as sketched below.
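As a sketch only (this article's job keeps a single Reduce task), configuring multiple reduces is a one-line change:

// Sketch: with, say, four Reduce tasks the job writes part-00000 through
// part-00003, which can be concatenated afterwards or used as input to a
// follow-on MapReduce job.
conf.setNumReduceTasks(4);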

The final line of the program does all the work of submitting the job and waiting for it to finish.

Testing, Testing

Before letting our new program loose on a large cluster of machines it is a good idea to check it works on a small amount of test data. This is easy to do from your IDE. All you need to do is put the Hadoop jar (and its dependencies) on the classpath, along with your MapReduce job and its dependencies (Joda Time in this case). When you run the main method of the job it will use a local job runner that runs Hadoop in the same JVM, which allows you to run a debugger, should you need to.
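Outside an IDE, a rough command-line equivalent might look like the following; the jar names and paths here are assumptions and will vary with your Hadoop and Joda Time versions:

java -cp build/classes:hadoop-0.13.0-core.jar:joda-time-1.4.jar:<other jars from Hadoop's lib directory> \
    org.tiling.hadoop_aws_article.AccessLogFileAnalyzer test-logs test-output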

We run AccessLogFileAnalyzer with two command line arguments: the first specifies the input directory containing a few small access log files, and the second specifies the (initially non-existent) output directory. After the job completes, there is a file called part-00000 in the output directory which contains the output key-value pairs for the weekly distribution:

0 6
1 15
2 30
3 22
4 20
5 25
...
10075 6
10076 10
10077 11
10078 4
10079 4

This looks good. So let’s try analyzing a big dataset.

Data Considerations

So far we have only focused on the MapReduce programming model, without much regard for where the data comes from. Hadoop is designed to process large datasets, typically larger than can be stored on a single machine, and its MapReduce implementation is tuned to process data that is stored on the Hadoop Distributed File System (HDFS). HDFS is another component of Hadoop and was inspired by The Google File System, described in a paper by Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung.

We have several options about how to get the data into HDFS for processing, including:

  1. Write the data to HDFS as it is created.
  2. Copy the data from the machines it was created on to HDFS.
  3. Copy the data from the machines it was created on to Amazon S3, then to HDFS.

Note that files stored on S3 are not subject to the usual 5GB object size limitation, since there is a special Hadoop filesystem that breaks files into blocks, allowing arbitrarily large files to be stored on S3.

We’ll illustrate option 3 for the purposes of this article. While a full discussion of the pros and cons of each option is beyond the scope of this article, it is worth mentioning that S3 scores well as a persistent store for the log files (it can double as a backup) and it is easy enough to copy them from the web server after they have been rotated at the end of the day. Transfers between S3 and EC2 are free, so we can launch our EC2 Hadoop cluster, transfer the files to it, run our job, then shut down the cluster. This means we only pay for the EC2 instances for as long as the job runs. For very large datasets the time taken to transfer the data from S3 may become prohibitive, in which case it is worth considering storing the data on HDFS on a long-running EC2 cluster.

Hadoop comes with tools to move files between different filesystems. To copy files from a locally mounted filesystem you would install Hadoop locally, then run a command like:

bin/hadoop fs -put /path/to/source /path/to/target

The target path is actually a URI. To copy files to S3 we use the s3 URI scheme, so we would do something like:

bin/hadoop fs -put /path/to/source s3://<ID>:<SECRET>@<BUCKET>/path/to/target

where <ID> and <SECRET> are your Amazon S3 credentials and <BUCKET> is your bucket name. (Note that since the secret access key can contain slashes, you must remember to escape them by replacing each slash / with the string %2F.)
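For example, with made-up credentials whose secret access key is Ab/CdEf, the slash is written as %2F:

bin/hadoop fs -put /path/to/source s3://AKIAEXAMPLE:Ab%2FCdEf@my-log-bucket/path/to/target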

Packaging the Code

How do we get our MapReduce application to the cluster? Simple, we package it in a jar along with its dependencies. It’s a bit like a WAR file, except dependent jars go in a lib subdirectory, rather than in WEB-INF/lib. Here’s the relevant Ant task for our application:

<jar jarfile="${build.dir}/aws-job.jar">
  <fileset dir="${build.classes}"/>
  <fileset dir="${basedir}" includes="lib/" excludes="**/hadoop*.jar"/>
  <manifest>
    <attribute name="Main-Class"
               value="org/tiling/hadoop_aws_article/AccessLogFileAnalyzer"/>
  </manifest>
</jar>

Launching Our EC2 Hadoop Cluster

Hadoop is packaged as a public EC2 image (an AMI) so it is easy for us to get up and running with a cluster. If the version of Hadoop you want to run is not available – for example, if you want to run a patched version – then it is easy to build your own.

Here we’ll just use a stock Hadoop AMI. We can find which versions of Hadoop are available as AMIs by using the Amazon EC2 tools. (Version 0.13.0 was used for this article.)

ec2-describe-images -a | grep hadoop-ec2-images

While it is possible to use the EC2 tools to launch Hadoop instances, Hadoop comes with a set of scripts that make the job of launching a cluster much easier. The scripts come with the standard Hadoop distribution, so start by downloading and unpacking the latest version of Hadoop on your local workstation. (The latest nightly build is recommended, since it contains a fix for a bug in the scripts that, at the time of writing, has not made it into an official release.) Then edit the EC2 configuration in src/contrib/ec2/bin/hadoop-ec2-env.sh to specify your Amazon Web Services settings, the Hadoop version to run on the cluster (which does not have to match the version of the distribution we unpacked on our workstation), the hostname for the master, and the size of the cluster.

The hostname you select should be one you have control over, as you will be asked to set it to point to a particular IP address during launch. Free services such as DynDNS make this very easy.

How big should the cluster be? It depends on the number of maps and reduces, but in most cases make it as big as you can. By default EC2 users are limited to 20 instances, so this is a natural starting point.

With the configuration out of the way (and see the Running Hadoop on EC2 wiki page if you need more pointers) we’re ready to go. Here’s what to type:

bin/hadoop-ec2 run

This command does the following:

  1. Starts a cluster of Hadoop nodes
  2. Prompts you to set up DNS with the given IP address
  3. Formats the HDFS filesystem on the cluster
  4. Starts the Hadoop daemons on the cluster
  5. Logs you onto the master node

It’s also possible to run these commands one at a time, which can be useful for debugging. Type bin/hadoop-ec2 for usage instructions.

Moving the Data and Code to the Cluster

The next step is to copy our data from S3 to HDFS on our pristine cluster. First create a logs directory in HDFS, then do the copy using the distcp tool that comes with Hadoop by running the following on the master node:

cd /usr/local/hadoop-<version>
bin/hadoop fs -mkdir logs
bin/hadoop distcp s3://<ID>:<SECRET>@<BUCKET>/path/to/logs logs

We also need to copy our job jar from our workstation by running (from our workstation):

. bin/hadoop-ec2-env.sh
scp $SSH_OPTS /path/to/aws-job.jar root@$MASTER_HOST:

Running and Monitoring Our Job

We are ready to run the job at last. From the master node:

bin/hadoop jar ~/aws-job.jar logs out

The command line output will periodically report progress. However, it is worth using the web interface to monitor the job, since it gives more information and allows you to drill down into the log files on the various nodes, which can be invaluable in the event of failures. The interface is available at http://<MASTER_HOST>:50030/.

Interpreting the Results

The final summary output is shown in the table below. The system maintains counts of input and output records and bytes; in this case the job processed just under 100GB of data – six weeks of logs – and took about 35 minutes. (It should go without saying that this is a great deal faster than processing on a single machine would have been.)

It took about 5 minutes to transfer the data (which was compressed – Hadoop can read compressed input data out of the box) from S3 to HDFS, so the whole job took less than an hour. At $0.10 per instance-hour, a 20-instance cluster running for an hour works out at only $2 for the whole job, plus S3 storage and transfer costs – and that is external transfer costs only, because transfers between EC2 and S3 are free.

Map-Reduce Framework counters:

Counter                  Map              Reduce    Total
Map input records        449,662,417      0         449,662,417
Map output records       449,661,579      0         449,661,579
Map input bytes          105,793,389,172  0         105,793,389,172
Map output bytes         5,395,938,948    0         5,395,938,948
Combine input records    449,661,579      0         449,661,579
Combine output records   60,730           0         60,730
Reduce input groups      0                10,080    10,080
Reduce input records     0                60,730    60,730
Reduce output records    0                10,080    10,080

The output from the job is very small, so we can easily copy it to our workstation then run a simple R script to produce a graph of traffic over the week.

png("web_hits_over_week.png")

data <- read.table("part-00000")

plot(data, axes=FALSE, ann=FALSE, type="p", pch=".")

lines(c(1440*7,0), c(0, 0), col="gray")

for (i in 0:7) {

lines(c(1440,1440)*i, c(0, max(data)), col="gray")

}

dev.off()

[Figure: web_hits_over_week.png, the plot of hits per minute-in-week slot over the week]

Don’t Forget to Shutdown

We’ve run our job and we’re happy with the results (which we’ve safely copied from the cluster), so we can shut down the cluster. From our workstation:

bin/hadoop-ec2 terminate-cluster

Your Next MapReduce Application

MapReduce is a great programming model for large scale data processing tasks, and Amazon EC2 is a great place to run your MapReduce programs to process data stored on Amazon S3. In this article we’ve looked at how to do ad hoc web access log analysis, but there are plenty of other applications that can be expressed as MapReduce programs – applications that process your data. So get writing your data processing programs with Hadoop and use Amazon Web Services to run them at the scale you need.

Resources

Tom White is a committer on the Apache Hadoop project. He is the primary author of Hadoop’s Amazon EC2 and S3 integration.

Related Documents
Type: AMIs
Hadoop AMI
Type: Sample Code
Sample Code for "Running Hadoop MapReduce on Amazon EC2 and S3"



Discussion

The most recent discussion messages:

smarttux
Posts: 148
Registered: 3/11/08
Running Hadoop MapReduce on Amazon EC2 and Amazon S3
Posted: Jun 13, 2008 12:20 PM PDT

Thanks for the informative article. I have a couple of questions.

What happens if the EC2 host running one of the Hadoop slave instances crashes? Do we need to back up all the data and relaunch the EC2 Hadoop cluster? Or will the cluster automatically replace such a failed node?

chris@wensel.net
Posts: 25
Registered: 9/6/07
Re: Running Hadoop MapReduce on Amazon EC2 and Amazon S3
Posted: Jun 13, 2008 4:44 PM PDT, in response to smarttux

The node won’t be replaced.

But every block of data on the cluster is replicated at least 3 times by default. You can tweak this. So if you lose a node, it is very unlikely you will lose data.

But you do need to start a replacement slave instance manually. This can be done using the Hadoop 0.17.0 AMIs (use the scripts from the same Hadoop release). Before Hadoop 0.17.0, growing a cluster on demand was not possible with the scripts (not a limitation of Hadoop per se).

Note that these scripts are different from the ones in the article you likely just read. You should make note of the changes documented on the Hadoop wiki.

If you get stuck, ping the Hadoop mailing list, or #hadoop on IRC.

ckw

