Distributed Systems

MapReduce: A Major Step Backwards (II) (repost)

October 3, 2011

Reposted from: http://databasecolumn.vertica.com/database-innovation/mapreduce-ii/

A follow-up to “MapReduce: A major step backwards”.

[Note: Although the system attributes this post to a single author, it was written by David J. DeWitt and Michael Stonebraker]

Last week’s MapReduce post attracted tens of thousands of readers and generated many comments, almost all of them attacking our critique. Just to let you know, we don’t hold a personal grudge against MapReduce. MapReduce didn’t kill our dog, steal our car, or try and date our daughters.

Our motivations for writing about MapReduce stem from MapReduce being increasingly seen as the most advanced and/or only way to analyze massive datasets. Advocates promote the tool without seemingly paying attention to years of academic and commercial database research and real world use.

The point of our initial post was to say that there are striking similarities between MapReduce and a fairly primitive parallel database system. As such, MapReduce can be significantly improved by learning from the parallel database community.

So, hold off on your comments for just a few minutes, as we will spend the rest of this post addressing four specific topics brought up repeatedly by those who commented on our previous blog:

MapReduce is not a database system, so don’t judge it as one

MapReduce has excellent scalability; the proof is Google’s use

MapReduce is cheap and databases are expensive

We are the old guard trying to defend our turf/legacy from the young turks

Feedback No. 1: MapReduce is not a database system, so don’t judge it as one

It’s not that we don’t understand this viewpoint. We are not claiming that MapReduce is a database system. What we are saying is that, like a DBMS + SQL + analysis tools, MapReduce can be and is being used to analyze and perform computations on massive datasets. So we aren’t judging apples and oranges. We are judging two approaches to analyzing massive amounts of information, including less structured information.

To illustrate our point, assume that you have two very large files of facts. The first file contains structured records of the form:

Rankings (pageURL, pageRank)

Records in the second file have the form:

UserVisits (sourceIPAddr, destinationURL, date, adRevenue)

Someone might ask, “What IP address generated the most ad revenue during the week of January 15th to the 22nd, and what was the average page rank of the pages visited?”

This question is a little tricky to answer in MapReduce because it consumes two data sets rather than one, and it requires a “join” of the two datasets to find pairs of Ranking and UserVisit records that have matching values for pageURL and destinationURL. In fact, it appears to require three MapReduce phases, as noted below.

Phase 1

This phase filters UserVisits records that are outside the desired date range and then “joins” the qualifying records with records from the Rankings file.

Map program: The map program scans through UserVisits and Rankings records. Each UserVisits record is filtered on the date range specification. Qualifying records are emitted with composite keys of the form <destinationURL, T1>, where T1 is a tag indicating a UserVisits record. Rankings records are emitted with composite keys of the form <pageURL, T2> (T2 is a tag indicating a Rankings record). Output records are repartitioned using a user-supplied partitioning function that hashes only on the URL portion of the composite key.
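
To make the shape of this step concrete, here is a minimal Python sketch of the Phase 1 map and partitioning logic. The record layouts, the date window, and the function names are our own assumptions for illustration; a real MapReduce job would emit key/value pairs through the framework rather than returning lists.

def phase1_map(record, source):
    # Tag each record with its origin and key it by URL, so that Rankings and
    # UserVisits records for the same URL meet at the same reduce worker.
    if source == "UserVisits":
        src_ip, dest_url, date, ad_revenue = record
        if "2000-01-15" <= date <= "2000-01-22":       # date-range filter (assumed window)
            return [((dest_url, "T1"), (src_ip, ad_revenue))]
        return []                                      # outside the window: drop the record
    else:                                              # source == "Rankings"
        page_url, page_rank = record
        return [((page_url, "T2"), page_rank)]

def phase1_partition(composite_key, num_reducers):
    # User-supplied partitioner: hash only the URL part of the composite key,
    # so T1 and T2 records for one URL go to the same reduce worker.
    url, _tag = composite_key
    return hash(url) % num_reducers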

Reduce Program: The input to the reduce program is a single sorted run of records in URL order. For each unique URL, the program splits the incoming records into two sets (one for Rankings records and one for UserVisits records) using the tag component of the composite key. To complete the join, reduce finds all matching pairs of records of the two sets. Output records are in the form of Temp1 (sourceIPAddr, pageURL, pageRank, adRevenue).

The reduce program must be capable of handling the case in which one or both of these sets for the same URL are too large to fit into memory and must be materialized on disk. Since access to these sets is through an iterator, a straightforward implementation will result in what is termed a nested-loops join. This join algorithm is known to have very poor I/O characteristics, as the “inner” set is scanned once for each record of the “outer” set.
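
Continuing the sketch (same assumptions as above), the reduce side splits the records for one URL into the two tagged sets and joins them with the nested loop just described. For brevity it keeps both sets in memory, which is exactly where a real implementation would have to spill to disk.

def phase1_reduce(url, tagged_values):
    # tagged_values: the (tag, value) pairs for this URL, after the framework
    # has grouped map output on the URL part of the composite key.
    visits, ranks = [], []
    for tag, value in tagged_values:
        if tag == "T1":
            visits.append(value)            # (sourceIPAddr, adRevenue)
        else:
            ranks.append(value)             # pageRank
    # Nested-loops join of the two sets for this URL.
    temp1 = []
    for src_ip, ad_revenue in visits:
        for page_rank in ranks:
            temp1.append((src_ip, url, page_rank, ad_revenue))   # Temp1 record
    return temp1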

Phase 2

This phase computes the total ad revenue and average page rank for each Source IP Address.

Map program: Scan Temp1 using the identity function on sourceIPAddr.

Reduce program: The reduce program makes a linear pass over the data. For each sourceIPAddr, it will sum the ad-revenue and compute the average page rank, retaining the one with the maximum total ad revenue. Each reduce worker then outputs a single record of the form Temp2 (sourceIPAddr,  total_adRevenue, average_pageRank).
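
A sketch of Phase 2 under the same assumptions. For brevity it collapses the per-key reduce calls into one worker-level function, which also retains the single record with the largest total ad revenue, matching the description above.

def phase2_map(temp1_record):
    # Identity map on sourceIPAddr: key each Temp1 record by its source IP.
    src_ip, _url, page_rank, ad_revenue = temp1_record
    return [(src_ip, (page_rank, ad_revenue))]

def phase2_reduce_worker(groups):
    # groups: mapping from sourceIPAddr to the list of (pageRank, adRevenue)
    # pairs routed to this reduce worker.
    best = None
    for src_ip, values in groups.items():
        total_revenue = sum(rev for _rank, rev in values)
        avg_rank = sum(rank for rank, _rev in values) / len(values)
        if best is None or total_revenue > best[1]:
            best = (src_ip, total_revenue, avg_rank)
    return best   # one Temp2 record: (sourceIPAddr, total_adRevenue, average_pageRank)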

Phase 3

Map program: The program uses a single map worker that scans Temp2 and outputs the record with the maximum value for total_adRevenue.
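
And the final phase, again as an assumed sketch: a single map worker scans the Temp2 records produced by all Phase 2 reduce workers and keeps the global maximum.

def phase3_map(temp2_records):
    # temp2_records: (sourceIPAddr, total_adRevenue, average_pageRank) tuples.
    return max(temp2_records, key=lambda rec: rec[1])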

We realize that portions of the processing steps described above are handled automatically by the MapReduce infrastructure (e.g., sorting and partitioning the records). Although we have not written this program, we estimate that the custom parts of the code (i.e., the map() and reduce() functions) would require substantially more code than the two fairly simple SQL statements to do the same:

Q1

Select as Temp sourceIPAddr, avg(pageRank) as avgPR, sum(adRevenue) as adTotal
From Rankings, UserVisits
where Rankings.pageURL = UserVisits.destinationURL and
date > “Jan 14” and date < “Jan 23”
Group by sourceIPAddr

Q2

Select sourceIPAddr, adTotal, avgPR From Temp Where adTotal = max(adTotal)

No matter what you think of SQL, eight lines of code is almost certainly easier to write and debug than the programming required for MapReduce. We believe that MapReduce advocates should consider the advantages that layering a high-level language like SQL could provide to users of MapReduce. Apparently we’re not alone in this assessment, as efforts such as PigLatin and Sawzall appear to be promising steps in this direction.

We also firmly believe that augmenting the input files with a schema would provide the basis for improving the overall performance of MapReduce applications, by allowing B-trees to be created on the input data sets and techniques like hash partitioning to be applied. These are technologies in widespread use in today’s parallel DBMSs, of which there are quite a number on the market, including ones from IBM, Teradata, Netezza, Greenplum, Oracle, and Vertica. All of these should be able to execute this program with the same or better scalability and performance than MapReduce.

Here’s how these capabilities could benefit MapReduce:

Indexing. The filter condition (date > “Jan 14” and date < “Jan 23”) can be executed by using a B-tree index on the date attribute of the UserVisits table, avoiding a sequential scan of the entire table.
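
As a rough illustration of the idea (not of any particular DBMS), the Python sketch below keeps UserVisits sorted on date and uses a sorted list of dates to stand in for the B-tree: binary search locates the window’s endpoints, and only the qualifying rows are read. The names and record layout are assumptions.

import bisect

def date_range_scan(visits, date_index, lo_date, hi_date):
    # visits: UserVisits rows as (date, sourceIPAddr, destinationURL, adRevenue),
    # kept sorted by date.  date_index: the dates alone, built once at load time.
    lo = bisect.bisect_right(date_index, lo_date)   # first position with date > lo_date
    hi = bisect.bisect_left(date_index, hi_date)    # first position with date >= hi_date
    return visits[lo:hi]                            # only these rows are touched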

Data movement. When you load files into a distributed file system prior to running MapReduce, data items are typically assigned to blocks/partitions in sequential order. As records are loaded into a table in a parallel database system, it is standard practice to apply a hash function to an attribute value to determine which node the record should be stored on (the same basic idea as is used to determine which reduce worker should get an output record from a map instance). For example, records being loaded into the Rankings and UserVisits tables might be mapped to a node by hashing on the pageURL and destinationURL attributes, respectively. If loaded this way, the join of Rankings and UserVisits in Q1 above would be performed completely locally, with absolutely no data movement between nodes. Furthermore, as result records from the join are materialized, they will be pipelined directly into a local aggregate computation without first being written to disk. This local aggregate operator partially computes the two aggregates (sum and average) concurrently (what is called a combiner in MapReduce terminology). These partial aggregates are then repartitioned by hashing on sourceIPAddr to produce the final results for Q1.

It is certainly the case that you could do the same thing in MapReduce by using hashing to map records to chunks of the file and then modifying the MapReduce program to exploit the knowledge of how the data was loaded. But in a database, physical data independence happens automatically. When Q1 is “compiled,” the query optimizer will extract partitioning information about the two tables from the schema.  It will then generate the correct query plan based on this partitioning information (e.g., maybe Rankings is hash partitioned on pageURL but UserVisits is hash partitioned on sourceIPAddr). This happens transparently to any user (modulo changes in response time) who submits a query involving a join of the two tables.
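
A toy Python sketch of the loading scheme just described (the node count, field order, and helper names are our own assumptions): hash-partitioning both tables on the join key puts matching rows on the same node, so each node can join and partially aggregate its own data before a final repartition on sourceIPAddr.

NUM_NODES = 4   # assumed cluster size, for illustration only

def node_for(url):
    # Hash on the join key so Rankings and UserVisits rows that join with each
    # other are stored on the same node.
    return hash(url) % NUM_NODES

def load(rankings, user_visits):
    nodes = [{"rankings": [], "visits": []} for _ in range(NUM_NODES)]
    for page_url, page_rank in rankings:
        nodes[node_for(page_url)]["rankings"].append((page_url, page_rank))
    for src_ip, dest_url, date, ad_revenue in user_visits:
        nodes[node_for(dest_url)]["visits"].append((src_ip, dest_url, date, ad_revenue))
    return nodes   # the Q1 join can now run node-locally, with no data movement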

Column representation. Many queries access only a subset of the fields of the input files; a column store does not need to read the fields that are not referenced.

Push, not pull. MapReduce relies on materializing the output files from the map phase on disk for fault tolerance. Parallel database systems instead push the intermediate results directly to the receiving (i.e., reduce) nodes, avoiding the cost of writing the intermediate results and then reading them back as they are pulled by the reduce computation. The materialization approach gives MapReduce superior fault tolerance, but at the expense of additional I/Os.

In general, we expect these mechanisms to provide about a factor of 10 to 100 performance advantage, depending on the selectivity of the query, the width of the input records to the map computation, and the size of the output files from the map phase. As such, we believe that 10 to 100 parallel database nodes can do the work of 1,000 MapReduce nodes.

To further illustrate our point, suppose you have a more general filter, F, a more general group_by function, G, and a more general reduce function, R. PostgreSQL (an open source, free DBMS) allows the following SQL query over a table T:

Select R(T) From T Where F(T) Group by G(T)

F, R, and G can be written in a general-purpose language such as C or C++. A SQL engine extended with user-defined functions and aggregates has nearly all, if not all, of the generality of MapReduce.
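
In Python terms (a sketch of the shape of that query, not of PostgreSQL’s UDF machinery), the generality amounts to a filter F, a grouping function G, and a per-group reduce R:

from collections import defaultdict

def generalized_query(table, F, G, R):
    # "Select R(T) From T Where F(T) Group by G(T)": filter rows with F,
    # group them by G, then apply the aggregate/reduce function R to each group.
    groups = defaultdict(list)
    for row in table:
        if F(row):
            groups[G(row)].append(row)
    return {key: R(rows) for key, rows in groups.items()}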

As such, we claim that most things that are possible in MapReduce are also possible in a SQL engine. Hence, it is exactly appropriate to compare the two approaches. We are working on a more complete paper that demonstrates the relative performance and relative programming effort between the two approaches, so, stay tuned.

Feedback No. 2: MapReduce has excellent scalability; the proof is Google’s use

Many readers took offense at our comment about scaling and asserted that since Google runs MapReduce programs on 1,000s (perhaps 10s of 1,000s) of nodes it must scale well. Having started benchmarking database systems 25 years ago (yes, in 1983), we believe in a more scientific approach toward evaluating the scalability of any system for data intensive applications.

Consider the following scenario. Assume that you have a 1 TB data set that has been partitioned across 100 nodes of a cluster (each node will have about 10 GB of data). Further assume that some MapReduce computation runs in 5 minutes if 100 nodes are used for both the map and reduce phases. Now scale the dataset to 10 TB, partition it over 1,000 nodes, and run the same MapReduce computation using those 1,000 nodes. If the performance of MapReduce scales linearly, it will execute the same computation on 10x the amount of data using 10x more hardware in the same 5 minutes. Linear scaleup is the gold standard for measuring the scalability of data intensive applications. As far as we are aware there are no published papers that study the scalability of MapReduce in a controlled scientific fashion. MapReduce may indeed scale linearly, but we have not seen published evidence of this.

Feedback No. 3: MapReduce is cheap and databases are expensive

Every organization has a “build” versus “buy” decision, and we don’t question the decision by Google to roll its own data analysis solution. We also don’t intend to defend DBMS pricing by the commercial vendors. What we wanted to point out is that we believe it is possible to build a version of MapReduce with more functionality and better performance. Pig is an excellent step in this direction.

Also, we want to mention that there are several open source (i.e., free) DBMSs, including PostgreSQL, MySQL, Ingres, and BerkeleyDB. Several of the aforementioned parallel DBMS companies have increased the scale of these open source systems by adding parallel computing extensions.

A number of individuals also commented that SQL and the relational data model are too restrictive. Indeed, the relational data model might very well be the wrong data model for the types of datasets that MapReduce applications are targeting. However, there is considerable ground between the relational data model and no data model at all. The point we were trying to make is that developers writing business applications have benefited significantly from the notion of organizing data in the database according to a data model and accessing that data through a declarative query language. We don’t care what that language or model is. Pig, for example, employs a nested relational model, which gives developers more flexibility than a traditional 1NF (flat) relational model allows.

Feedback No. 4: We are the old guard trying to defend our turf/legacy from the young turks

Since both of us are among the “gray beards” and have been on this earth about 2 Giga-seconds, we have seen a lot of ideas come and go. We are constantly struck by the following two observations:

How insular computer science is. The propagation of ideas from sub-discipline to sub-discipline is very slow and sketchy. Most of us are content to do our own thing, rather than learn what other sub-disciplines have to offer.

How little knowledge is passed from generation to generation. In a recent paper entitled “What goes around comes around” (M. Stonebraker and J. Hellerstein, Readings in Database Systems, 4th Edition, MIT Press, 2004), one of us noted that many current database ideas were tried a quarter of a century ago and discarded. However, this pragmatic wisdom does not seem to be passed down from the “gray beards” to the “young turks.” The young turks and the gray beards aren’t usually adversaries, and shouldn’t be.

Thanks for stopping by the “pasture” and reading this post. We look forward to reading your feedback, comments and alternative viewpoints.
