Introduction
The MapReduce algorithm was made famous by Google when
Larry Page and Sergey Brin used it to rank all Web pages by
popularity, in effect by counting the total number of hyperlinks that pointed to each page. But it
is well known that the roots of the Map
and
Reduce
functions can be found in Haskell and other functional
programming languages, and that their respective functionalities as
anamorphisms and catamorphisms in category theory were recognized an even
longer time ago. The US Patent Office even granted Google a
patent for the algorithm,
following four rejections. Two database experts,
David DeWitt and Michael Stonebraker, teamed up in
2008 to
proclaim that they were
“amazed at the hype that the MapReduce
proponents have spread about how it represents a paradigm shift in the
development of scalable, data-intensive applications”. And even that
“(MapReduce) represents a specific implementation of well known techniques
developed nearly 25 years ago; missing most of the features that are routinely
included in current DBMS”.
Background
On the one
hand, the number of data mining applications based on MapReduce (as evidenced
by the popularity of Apache’s Hadoop framework, an open source
implementation of MapReduce) is undeniable. On the other hand, there is very
little on the Web to support the claim that such techniques are well known in
the database community. Bolstered by my students at Boston University’s
Metropolitan College, where I regularly lecture (torture them) on Cloud Computing,
and by recent NewSQL (SQL + NoSQL) database startups’ claims that they can do
MapReduce natively, I set out to investigate DeWitt and Stonebraker’s claim
that MapReduce was a well-known technique in the database community. If that is
the case, I said to myself, surely I could implement MapReduce in SQL (in T-SQL
or PL/SQL) and apply it to the typical word count exemplar of ranking the most
frequently occurring words in a large body of text such as the Iliad. Jean-Pierre
Dijcks of Oracle wrote a blog on implementing MapReduce in Oracle PL/SQL by
using pipelined table functions, which are functions that can appear in a FROM
clause and thus act as tables returning a stream of rows. Table
functions are special functions in PL/SQL because they return rows of data,
which is not typical of PL/SQL stored procedures and user-defined functions.
Table functions are embedded in the data flow and, when pipelined, allow data
to be streamed to a SQL statement avoiding intermediate materialization in most
cases. In SQL Server’s T-SQL, however, returning rows of data is standard for
stored procedures and table-valued functions, and so a MapReduce T-SQL implementation should
theoretically be even more straightforward. This article describes this
implementation. After I introduce the
Map and Reduce functions as user-defined functions, I run the typical MapReduce
word count exemplar on a few sentences and then on the Iliad, proceeding in
typical BigData fashion (testing with a small amount of data before proceeding
to BigData amounts). I implement a functional approach and compare it to a more
imperative approach that uses fast forward cursors on the data. I conclude with
some benchmarks and a discussion about parallelization where SQL MapReduce is
decomposed into individual, independent workloads and spread across many
servers connected as a cluster so that they can be performed in parallel.
SQL MapReduce
We start our T-SQL script by defining the basic word_t
data type that we’re going to use, equivalent to Hadoop’s Text
data type, along with several user-defined table types that we will use to pass
tables of rows around in T-SQL.
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_user_defined = 1 AND name = 'word_t')
CREATE TYPE word_t FROM nvarchar(32) NOT NULL;
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_user_defined = 1 AND name = 'words_t')
CREATE TYPE words_t FROM nvarchar(max) NOT NULL;
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'wordcnts_t')
CREATE TYPE wordcnts_t AS TABLE ( words nvarchar(4000) NOT NULL, countofwords int );
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsTable')
CREATE TYPE DocumentsTable AS TABLE (Content nvarchar(max) NOT NULL)
GO
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsWithIndexTable')
CREATE TYPE DocumentsWithIndexTable AS TABLE (Indx int, Content nvarchar(max) NOT NULL)
GO
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsWithAutoIncrementIndexTable')
CREATE TYPE DocumentsWithAutoIncrementIndexTable AS TABLE (
P_Id int PRIMARY KEY IDENTITY, Content nvarchar(max) NOT NULL)
GO
Note that we define the word type word_t
as an nvarchar(32); however, it can easily be generalized to an
nvarchar(max). Since
our sentence data type is an nvarchar(max), we picked a smaller length for the word type. The last data type, DocumentsWithAutoIncrementIndexTable,
will help us add
content to a table that has an identity primary key without our having to specify the monotonically increasing indexes ourselves. We then define the SQL
mapper as a user-defined table-valued function that takes a table of documents as its first parameter and returns a table of indexed words:
create function dbo.mapper(@documents DocumentsWithAutoIncrementIndexTable READONLY, @sep nvarchar)
returns @t TABLE ( wordindex int, word word_t )
WITH EXECUTE AS CALLER
This is the body of the map function we will use to
output rows of words from rows of sentences comprising many words. All the
T-SQL variables below, identified by the leading “@” sign, are integer
variables, except for @document, which is a space-separated concatenation of
words, and @word, which is of the single-word data type word_t.
While EXISTS(SELECT * From @documents WHERE @lastidprocessed < P_Id)
Begin
Select Top 1 @id = P_Id, @document = Content From @documents WHERE @lastidprocessed < P_Id Order By P_Id
set @istart = 1;
set @len = LEN(@document);
While (@istart <= @len)
Begin
set @pos = CHARINDEX ( @sep ,@document, @istart )
if (@pos = 0)
begin
set @word = SUBSTRING(@document, @istart, @len - @istart + 1);
insert into @t values ( @wordcounter, @word )
set @istart = @len + 1;
set @wordcounter = @wordcounter + 1;
end
else
begin
set @word = SUBSTRING(@document, @istart, @pos - @istart);
insert into @t values ( @wordcounter, @word )
set @istart = @pos + 1;
set @wordcounter = @wordcounter + 1;
end
End
set @lastidprocessed = @id
End
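For completeness, the listing above omits the variable declarations; a minimal sketch of what the full function body would declare and initialize (assuming, as the cursor-based version later in the article does, that the word counter starts at 1) is:
-- assumed declarations; a sketch, not the author's exact code
DECLARE @id int, @lastidprocessed int = 0, @wordcounter int = 1;
DECLARE @istart int, @len int, @pos int;
DECLARE @document nvarchar(max), @word word_t;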
We select all rows of the table of type DocumentsWithAutoIncrementIndexTable
that is the first parameter of our mapper function, where the table’s index,
P_Id, is larger than the value of the last processed index, and then we
select the first row of that set (ordering by P_Id so that we always pick the
lowest unprocessed row). This construct allows us to process batches
of rows while remaining “functional” with SELECT
statements. The other option
would be to use cursors, which are more of an imperative construct.
Conventional wisdom is to remain as functional as possible, because set-based
operations perform faster than imperative constructs such as cursors and make for
simpler code. However, we will see later on that T-SQL fast forward cursors,
which fetch the results of a query one at a time (or in batches of N at a time)
and optimize the query plan for exactly that access pattern, are actually faster than fetching
batches of rows and then selecting the topmost row functionally with SELECT
statements.
The body of the mapper function is pretty
straightforward: it inserts into the return table @t
each word in the sentence,
together with a monotonically increasing counter. It segments the sentence into
words by looking for the word separator character @sep
, which is the second
parameter of our mapper function.
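As a quick sanity check, here is a minimal sketch of calling the mapper on a single sentence; it assumes the types and function defined above, and @oneDoc is just an illustrative name:
-- illustrative sketch of invoking the mapper on one sentence
DECLARE @oneDoc DocumentsWithAutoIncrementIndexTable;
INSERT INTO @oneDoc(Content) VALUES (N'the quick brown fox');
SELECT wordindex, word FROM dbo.mapper(@oneDoc, N' ') ORDER BY wordindex;
-- expected output: (1, 'the'), (2, 'quick'), (3, 'brown'), (4, 'fox')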
Now for the reduce function, which essentially emits
words and the number of times they are seen. Its signature is:
create function dbo.reducer(@words DocumentsWithAutoIncrementIndexTable READONLY)
returns @t TABLE ( word word_t, wordcount int )
WITH EXECUTE AS CALLER
And its body is, with @word
and @previousword
being the only
word_t
variables (the other “@” variables are all integer typed):
While EXISTS(SELECT * From @words WHERE @lastidprocessed < P_Id)
Begin
Select Top 1 @id = P_Id, @word = Content From @words WHERE @lastidprocessed < P_Id Order By P_Id
if (@word <> @previousword)
begin
If (@previousword <> '')
begin
insert into @t values ( @previousword, @wordoccurrences );
end
set @wordoccurrences = 1;
end
else
begin
set @wordoccurrences = @wordoccurrences + 1;
end
set @previousword = @word;
set @lastidprocessed = @id
End
insert into @t values ( @previousword, @wordoccurrences );
Return
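As with the mapper, the listing omits the declarations; a sketch of the assumed declarations and initial values (the cursor-based version later in the article uses the same initializations) is:
-- assumed declarations; a sketch, not the author's exact code
DECLARE @id int, @lastidprocessed int = 0, @wordoccurrences int = 1;
DECLARE @word word_t, @previousword word_t = '';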
Essentially, the reduce function iterates over words; if a word is the same as the previous word, it increments a counter and keeps
incrementing until the next word is different, at which point it inserts the
previous word and its occurrence count into the @t
return table. Now, this logic
will only work if the sequence of words is alphabetically sorted, so that
identical words occur next to each other (actually, the alphabetic sort is
overkill; we just need identical words to be listed successively). And this is
the magic ingredient of the MapReduce recipe: there needs to be a sort that
happens in the flow between the mapper and the reducer. Hadoop sorts all key/value
pairs by key as they’re output by a pool of mappers, and then feeds them, sorted,
into a pool of reducers. Similarly, we are going to use a SQL built-in construct
to do the sort: the hallowed ORDER BY clause.
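To make this concrete, here is a minimal sketch (assuming the reducer above) showing why the intermediate sort matters: the reducer only merges adjacent duplicates, so identical words must arrive in one contiguous run.
-- illustrative sketch: sorted input produces correct counts
DECLARE @sortedWords DocumentsWithAutoIncrementIndexTable;
INSERT INTO @sortedWords(Content)
SELECT w FROM (VALUES (N'dog'), (N'cat'), (N'dog'), (N'fox')) AS t(w) ORDER BY w ASC;
select * from dbo.reducer( @sortedWords );
-- expected: cat 1, dog 2, fox 1; without the ORDER BY, 'dog' could show up twice with a count of 1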
And so this is what our MapReduce word count
implementation looks like in SQL, using the user-defined mapper and reducer
functions we defined above and three short sentences:
DECLARE @myDocuments DocumentsWithAutoIncrementIndexTable
INSERT INTO @myDocuments VALUES(
INSERT INTO @myDocuments VALUES(
INSERT INTO @myDocuments VALUES(
DECLARE @myWords DocumentsWithAutoIncrementIndexTable
INSERT INTO @myWords(Content) SELECT word FROM dbo.mapper(@myDocuments, ' ') order by word asc
select * from dbo.reducer( @myWords ) order by wordcount desc;
Go
The first four lines declare the @myDocuments table variable and insert the three sentences into it. We then feed this table
into our mapper and specify the space character as the word separator. The
resulting table is sorted in alphabetically increasing order by the order by word
asc
clause. Finally, that table is passed to the reducer, which counts the
words and orders them from the most frequent to the least frequent. The result
is
the 4
brown 3
dog 2
red 2
fox 2
her 1
hood 1
in 1
jumps 1
lazy 1
like 1
Little 1
looks 1
over 1
quick 1
riding 1
through 1
walks 1
dress 1
forest 1
a 1
This is exactly what we would expect with a typical word
count application. It would be straightforward to generalize our mapper and
reducer user-defined functions so that relational data can be rearranged
meaningfully in the user-defined mapper, output in table format, sorted by one
or more table fields with the SQL ORDER BY clause, and meaningfully
aggregated in the reducer to return a result set. That is exactly what Hadoop
MapReduce does, except that instead of operating on tables it operates on
key/value pairs. The SQL MapReduce operation we just performed is a typical
“pivot” operation that takes data sorted by one dimension such as sales by
geographic areas and aggregates the results by another dimension such as
quarterly sales by products. This is a standard database data mining technique,
and so we can reasonably conclude that the concept of MapReduce, two user
defined functions that respectively create and then aggregate data, with an
intermediating sort, is indeed a borrowed concept from the database community. Does
Google deserve a patent for their MapReduce? Probably not if one bases it on
the functionality, but it is a different story if one bases it on the data
environment: Google had its data (Web pages and their hyperlinks) stored on a
cluster of machines and a massively distributed file system (the Google File System). To
mine the hyperlinks, Google sorely needed a sort operation, and the distributed
sort that happens in the flow between the mapper and reducer on a distributed
file system is the magic ingredient of Google’s MapReduce and Apache’s Hadoop.
So, as DeWitt and Stonebraker assert, MapReduce does use the same well-known
technique developed many years ago in the database community. Our SQL MapReduce
word count prototype proves this without a doubt. But the real innovation by
Google is that the MapReduce intervening sort (as well as the Map and Reduce
user-defined operations) happens over datasets physically spread across a
network. Databases, up until recently, were never physically distributed across
a network (certainly not 25 years ago) and SQL operations such as sorts and
joins would typically occur with data contained in memory. Google’s MapReduce
and Apache’s Hadoop in contrast allow users to mine data that is massively
distributed and whose size may not necessarily fit in a single computer’s
memory but can be spilled onto disk and effectively computed upon. That was and
continues to be an achievement probably worth a patent.
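As an aside that underlines how familiar this territory is to the database world, the entire word count can be expressed as a single set-based statement on newer versions of SQL Server; the sketch below assumes the STRING_SPLIT function (SQL Server 2016 and later, so not available on the SQL Server 2008 R2 instance benchmarked later in this article):
-- illustrative sketch; requires SQL Server 2016+ for STRING_SPLIT
DECLARE @docs TABLE ( Content nvarchar(max) NOT NULL );
INSERT INTO @docs VALUES (N'the quick brown fox jumps over the lazy dog');
SELECT value AS word, COUNT(*) AS wordcount
FROM @docs
CROSS APPLY STRING_SPLIT(Content, ' ')
GROUP BY value
ORDER BY wordcount DESC;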
SQL MapReducing the Iliad
I promised you, dear reader, that I would MapReduce the
Iliad. Moreover, MapReducing the Iliad will give us a good benchmark to compare
SQL and Hadoop MapReduce, and to segue into parallelization and performance
implications. Our mapper and reducer functions should work for any body of
text; all we need to do is import the Iliad into a database table. We use the
following C# program to import the Iliad from a text file into a database table:
try
{
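// args[0] = path to the Iliad text file, args[1] = database (Initial Catalog) name, args[2] = destination table name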
DataSet dataSet = new DataSet();
DataTable dataTable = new DataTable("table1");
dataTable.Columns.Add("line", typeof(string));
dataSet.Tables.Add(dataTable);
int counter = 0;
string line;
string cleanline;
string xmlData;
System.IO.StreamReader file =
new System.IO.StreamReader(args[0]);
while((line = file.ReadLine()) != null)
{
try
{
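// Replace characters that upset the XML check and the bulk copy, then parse the line as XML
// purely as a well-formedness check; lines that fail are skipped by the catch below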
cleanline = line.Replace("\f", "/").Replace("&", "[ampersand]");
xmlData = "<line>" + cleanline + "</line>";
System.IO.StringReader xmlSR = new System.IO.StringReader(xmlData);
dataSet.ReadXml(xmlSR, XmlReadMode.IgnoreSchema);
dataTable.Rows.Add(cleanline);
counter++;
}
catch (System.Xml.XmlException ex)
{
Console.WriteLine(ex.Message);
}
}
file.Close();
SqlConnection conn = new SqlConnection("Data Source=MM176499-PC;Initial Catalog =" +
args[1] + "; Integrated Security = SSPI;");
conn.Open();
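// Bulk-copy the accumulated in-memory rows into the destination table named by args[2]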
SqlBulkCopy sbc = new SqlBulkCopy(conn);
if (0 < dataTable.Rows.Count)
{
sbc.DestinationTableName = args[2];
sbc.WriteToServer(dataSet.Tables[0]);
}
sbc.Close();
conn.Close();
}
catch (Exception exp)
{
Console.WriteLine("oops: " + exp.Message);
}
I clean up the Iliad by replacing characters that the SqlBulkCopy
API has problems with, and voilà: the Iliad is loaded into rows of a database
table called Lines, with a single field called Line of nvarchar(max) data type. All
we need to do to run SQL MapReduce on the Iliad is:
DECLARE @myDocuments DocumentsWithAutoIncrementIndexTable
INSERT INTO @myDocuments(Content) SELECT line From Lines;
DECLARE @myWords DocumentsWithAutoIncrementIndexTable
INSERT INTO @myWords(Content) SELECT word FROM dbo.mapper(@myDocuments, ' ') order by word asc
--reducer
select * from dbo.reducer( @myWords ) order by wordcount desc;
Go
Using Fast Forward Cursors
Our mapper and reducer implementations were purely
functional, heeding common database community wisdom that cursors would slow
down performance. However, our mapper and reducer work with single rows of data,
while SELECT statements are optimized for retrieving and consuming batches of rows
all at once. Cursors are built for positioned updates and scrolling,
and fast forward cursors are optimized to give the user a read-only, forward-only view of the
next row of data; in other words, the query plan is optimal for exactly such an
operation. So I decided to try those out as well. Using fast forward cursors,
our mapper and reducer main loops are re-written as follows.
For the mapper:
set @wordcounter = 1;
DECLARE SH CURSOR FAST_FORWARD FOR
SELECT P_Id, Content FROM @documents
OPEN SH
FETCH NEXT FROM SH INTO @id, @document
WHILE (@@FETCH_STATUS = 0)
BEGIN
set @istart = 1;
set @len = LEN(@document);
While (@istart <= @len)
Begin
set @pos = CHARINDEX ( @sep ,@document, @istart )
if (@pos = 0)
begin
set @word = SUBSTRING(@document, @istart, @len - @istart + 1);
insert into @t values ( @wordcounter, @word )
set @istart = @len + 1;
set @wordcounter = @wordcounter + 1;
end
else
begin
set @word = SUBSTRING(@document, @istart, @pos - @istart);
insert into @t values ( @wordcounter, @word )
set @istart = @pos + 1;
set @wordcounter = @wordcounter + 1;
end
End
FETCH NEXT FROM SH INTO @id, @document
END
CLOSE SH
DEALLOCATE SH
Return
For the reducer:
set @wordoccurrences = 1;
set @previousword = '';
DECLARE SH CURSOR FAST_FORWARD FOR
SELECT P_Id, Content FROM @words
OPEN SH
FETCH NEXT FROM SH INTO @id, @word
WHILE (@@FETCH_STATUS = 0)
BEGIN
if (@word <> @previousword)
begin
If (@previousword <> '')
begin
insert into @t values ( @previousword, @wordoccurrences );
end
set @wordoccurrences = 1;
end
else
begin
set @wordoccurrences = @wordoccurrences + 1;
end
set @previousword = @word;
FETCH NEXT FROM SH INTO @id, @word
END
insert into @t values ( @previousword, @wordoccurrences );
Return
The sharding, map, sort, and reduce data flow remains the
same as the functional version (the one that does not use cursors).
Benchmarks
We now benchmark our T-SQL MapReduce using both
functional and cursor-based T-SQL variants against a typical Hadoop MapReduce.
We use the Iliad dataset in all cases, and run the following typical Java
MapReduce word count implementation on the Hadoop side.
package METCS751.iliad;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text,
IntWritable> output, Reporter reporter) throws IOException {
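// Tokenize the input line and emit (word, 1) for each token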
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
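// Sum the counts emitted for this word by the mappers (and the combiner)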
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Hadoop version 1.1.1 in standalone (not parallel) mode
takes 10 seconds to count all the words of the Iliad (a little over 1 MB of
ASCII text); our functional T-SQL implementation takes 1 minute 11 seconds, while
our fast forward cursor-based implementation takes 32 seconds, both on a SQL Server
2008 R2 instance running on a Dell 2.5 GHz Core Duo Windows 7 laptop with 4
GB of memory and a 3.9 Windows Experience Index. Fast forward cursors are
indeed considerably faster than our SELECT-based functional implementation, and
Hadoop is faster than SQL MapReduce in general. SQL is a fantastic language,
and probably the biggest “bang for the buck” of all functional languages, but
one has to pay a price for the overhead of running the database engine (nevertheless,
deconstructing SQL queries into MapReduce operations in order to afford users
the luxury of using SQL as their data mining language is something that is
available today with open source frameworks such as Hive/HiveQL and Pig).
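For readers who want to reproduce the T-SQL timings, here is a rough sketch of the measurement; it assumes the Lines table, mapper, and reducer above, SET STATISTICS TIME simply reports CPU and elapsed time per statement, and the exact numbers will of course vary with hardware.
-- rough timing sketch; assumes the Lines table and the functions defined above
SET STATISTICS TIME ON;
DECLARE @myDocuments DocumentsWithAutoIncrementIndexTable;
INSERT INTO @myDocuments(Content) SELECT line FROM Lines;
DECLARE @myWords DocumentsWithAutoIncrementIndexTable;
INSERT INTO @myWords(Content) SELECT word FROM dbo.mapper(@myDocuments, ' ') order by word asc;
SELECT * FROM dbo.reducer(@myWords) order by wordcount desc;
SET STATISTICS TIME OFF;
GO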
Hadoop MapReduce is faster on its HDFS filesystem than
database table-based SQL implementations of the same functionality. Parallel
implementations of SQL MapReduce on network-segmented databases would probably
remain slower than running Hadoop in fully distributed mode on a
distributed instance of HDFS on multiple networked nodes. So DeWitt and Stonebraker
can evangelize to their heart’s content about how the technique was invented in
the database community, and Google would probably not deny this. However, in
terms of performance, parallelization potential on massively distributed file
systems, and thus the ability to data mine “oogles” of data, the contribution
of Google’s MapReduce and Apache’s Hadoop, specifically for how the data is
stored, sorted, and accessed through the network, is undeniable and probably
worth the patent they were granted.
Conclusion
In conclusion, both parties are right: yes, MapReduce is
in fact a technique that is easily, and thus was probably routinely, implemented in
database mining applications. But Larry Page and Sergey Brin applied it in the
new context of a massively distributed file system, and the amount of resulting
parallelization was unprecedented, allowing MapReduce to shine all the more as an
effective, high-performance, large-scale data processing methodology. There
will probably be database companies that continue to claim they can run MapReduce
in their version of SQL, but the highest-performance MapReduce will probably
consist of streaming data to a form of Apache’s Hadoop rather than running a native
functional SQL version. In fact, there are various articles about using the Oracle
JVM to run a Hadoop instance in order to MapReduce Oracle table data. In the end, the feud boils down to two
distinct data storage philosophies: whether data is better stored hierarchically
as with most modern file systems, or relationally as with most modern databases.
And in reality, it’s about how data storage APIs are presented to producers and
consumers, because we know very well that all data, whether in a database or not,
eventually gets saved on an OS file system, which is itself a combination of
hierarchical and relational structures: folders of sub-folders, and hard disk
space linked relationally, sector by sector.
History
First release: In order to upload the Iliad to the database and to MapReduce it with
Hadoop, you need to download the Iliad in plain text format from here, add it to the C# executable's
Bin/Debug folder, put it in an input directory on Hadoop 1.1.1, and run MapReduce with the following
Cygwin command lines:
javac -classpath hadoop-core-1.1.1.jar -d wordcount_classes WordCount.java
jar cvf wordcount.jar -C wordcount_classes/ .
bin/hadoop jar wordcount.jar METCS751.iliad.WordCount input output3
I've added the Iliad as a download
for convenience.