T-SQL MapReduce

28 Jul 2013
Running MapReduce in a SQL Server database.

Introduction

The MapReduce algorithm was made famous by Google, the company founded by Larry Page and Sergey Brin, which used it to rank Web pages by popularity by counting the total number of hyperlinks that point to them. But it is well known that the roots of the Map and Reduce functions can be found in Haskell and other functional programming languages, and that their respective roles as anamorphisms and catamorphisms in category theory were recognized even longer ago. The US Patent Office nevertheless granted Google a patent for the algorithm, following four rejections. Two database experts, David DeWitt and Michael Stonebraker, teamed up in 2008 to proclaim that they were “amazed at the hype that the MapReduce proponents have spread about how it represents a paradigm shift in the development of scalable, data-intensive applications”. They even added that “(MapReduce) represents a specific implementation of well known techniques developed nearly 25 years ago; missing most of the features that are routinely included in current DBMS”.

Background

On the one hand, the number of data mining applications based on MapReduce (as evidenced by the popularity of Apache’s Hadoop framework, which is an Open Source implementation of MapReduce) is undeniable. On the other hand, there is very little on the Web to support the claim that such techniques are well known in the database community. Bolstered by my students at Boston University’s Metropolitan College, where I regularly lecture (torture them) on Cloud Computing, and by recent NewSQL (SQL + NoSQL) database startups’ claims that they can do MapReduce natively, I set out to investigate DeWitt’s and Stonebraker’s claim that MapReduce was a well-known technique in the database community. If that is the case, I said to myself, surely I could implement MapReduce in SQL (in T-SQL or PL/SQL) and apply it to the typical word count exemplar of ranking the most frequently occurring words in a large body of text such as the Iliad.

Jean-Pierre Dijcks of Oracle wrote a blog post on implementing MapReduce in Oracle PL/SQL by using pipelined table functions, which are functions that can appear in a FROM clause and thus act as a table returning a stream of rows. Table functions are special in PL/SQL because they return rows of data, which is not typical of PL/SQL stored procedures and user-defined functions. Table functions are embedded in the data flow and, when pipelined, allow data to be streamed to a SQL statement, avoiding intermediate materialization in most cases. In SQL Server’s T-SQL, however, returning rows of data is the standard return type for stored procedures, and so a MapReduce T-SQL implementation should theoretically be even more straightforward. This article describes this implementation.
After I introduce the Map and Reduce functions as user-defined functions, I run the typical MapReduce word count exemplar on a few sentences and then on the Iliad, proceeding in typical BigData fashion (testing with a small amount of data before proceeding to BigData amounts). I benchmark a functional approach and compare it to a more imperative approach that uses fast forward cursors on the data. I conclude with some benchmarks and a discussion about parallelization where SQL MapReduce is decomposed into individual, independent workloads and spread across many servers connected as a cluster so that they can be performed in parallel.

SQL MapReduce

We start our T-SQL script by defining the basic word_t data type that we’re going to use, equivalent to Hadoop’s Text data type, and the table types we use to pass rows of data to our functions.

SQL
IF NOT EXISTS (SELECT * FROM sys.types WHERE is_user_defined = 1 AND name = 'word_t')
CREATE TYPE word_t FROM nvarchar(32) NOT NULL;

IF NOT EXISTS (SELECT * FROM sys.types WHERE is_user_defined = 1 AND name = 'words_t')
CREATE TYPE words_t FROM nvarchar(max) NOT NULL;

IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'wordcnts_t')
CREATE TYPE wordcnts_t AS TABLE ( words nvarchar(4000) NOT NULL, countofwords int );

IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsTable')
CREATE TYPE DocumentsTable AS TABLE (Content nvarchar(max) NOT NULL)
GO

IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsWithIndexTable')
CREATE TYPE DocumentsWithIndexTable AS TABLE (Indx int, Content nvarchar(max) NOT NULL)
GO

IF NOT EXISTS (SELECT * FROM sys.types WHERE is_table_type = 1 AND name = 'DocumentsWithAutoIncrementIndexTable')
CREATE TYPE DocumentsWithAutoIncrementIndexTable AS TABLE (
       P_Id int PRIMARY KEY IDENTITY, Content nvarchar(max) NOT NULL)
GO

Note that we define the word type word_t as an nvarchar(32); however, it can easily be generalized to an nvarchar(MAX). Since our sentence data type is an nvarchar(MAX), we picked a smaller length for the word type. The last data type, DocumentsWithAutoIncrementIndexTable, will help us add content to a table that has an identity primary key without worrying about specifying the monotonically increasing index values ourselves. We then define the SQL mapper as a user-defined function that takes a table of this type as its first parameter and returns a table of indexed words.

SQL
create function dbo.mapper(@documents DocumentsWithAutoIncrementIndexTable READONLY, @sep nvarchar)

returns @t TABLE ( wordindex int, word word_t )

WITH EXECUTE AS CALLER 

This is the body of the map function we will use to output rows of words from rows of sentences comprising many words. All the T-SQL variables below, identified by the preceding “@” sign, are integer variables, except for @document, which is a separator-delimited concatenation of words, and @word, which holds a single word of type word_t.

SQL
-- Initial values (assumed here: the IDENTITY column P_Id starts at 1)
set @lastidprocessed = 0;
set @wordcounter = 1;

-- For every document not yet processed
While EXISTS(SELECT * From @documents WHERE @lastidprocessed < P_Id)
Begin
    -- Order By makes "Top 1" deterministically the next unprocessed row
    Select Top 1 @id = P_Id, @document = Content
    From @documents WHERE @lastidprocessed < P_Id
    Order By P_Id;
    set @istart = 1;
    set @len = LEN(@document);

    -- For every word within a document
    While (@istart <= @len)
    Begin
        -- Position of the next separator, or 0 if none is left
        set @pos = CHARINDEX(@sep, @document, @istart)
        if (@pos = 0)
        begin
            -- Last word: take the rest of the document
            set @word = SUBSTRING(@document, @istart, @len);
            insert into @t values ( @wordcounter, @word )
            set @istart = @len + 1;
            set @wordcounter = @wordcounter + 1;
        end
        else
        begin
            -- Take the characters up to (not including) the separator
            set @word = SUBSTRING(@document, @istart, @pos - @istart);
            insert into @t values ( @wordcounter, @word )
            set @istart = @pos + 1;
            set @wordcounter = @wordcounter + 1;
        end
    End

    set @lastidprocessed = @id
End
Return

On each pass, we select all rows of the DocumentsWithAutoIncrementIndexTable table passed as the first parameter of our mapper function whose index P_Id is larger than the value of the last processed index, and then we select the first row of that set. This construct allows us to work through the rows while remaining “functional”, using only SELECT statements. The other option would be to use cursors, which are more of an imperative construct. Conventional wisdom is to remain as functional as possible, because set-based operations perform faster than imperative constructs such as cursors and use simpler code. However, we will see later on that T-SQL fast forward cursors, which fetch the results of a query one at a time (or in batches of N at a time) and optimize the query plan for that access pattern, are actually faster than repeatedly selecting the topmost unprocessed row with SELECT statements.

The body of the mapper function is pretty straightforward: it inserts into return table @t each word in the sentence, together with a monotonically increasing counter. It segments the sentence into words by looking for the word separator character @sep, which is the second parameter of our mapper function.
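
For readers who do not have a SQL Server instance at hand, the splitting loop can be sketched in Python. This is only an illustration, not part of the T-SQL script: the function name mapper and the list of (wordindex, word) tuples are assumptions chosen to mirror dbo.mapper and its @t return table, with str.find playing the role of CHARINDEX and slicing the role of SUBSTRING.

```python
def mapper(documents, sep=" "):
    """Split each document into words, numbering the words across all documents."""
    rows = []            # plays the role of the return table @t
    wordcounter = 1      # monotonically increasing word index
    for document in documents:
        istart = 0       # Python strings are 0-based; T-SQL strings are 1-based
        length = len(document)
        while istart < length:
            pos = document.find(sep, istart)   # -1 when no separator is left
            if pos == -1:
                word = document[istart:]       # last word: rest of the document
                istart = length
            else:
                word = document[istart:pos]    # up to, not including, the separator
                istart = pos + 1
            rows.append((wordcounter, word))
            wordcounter += 1
    return rows


rows = mapper(["The quick brown fox", "jumps over"])
for wordindex, word in rows:
    print(wordindex, word)
```

Like the loop it imitates, this sketch emits an empty word whenever two separators are adjacent; a production tokenizer would probably want to skip those.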

Now for the reduce function, which essentially emits each word together with the number of times it was seen. Its signature is:

SQL
create function dbo.reducer(@words DocumentsWithAutoIncrementIndexTable READONLY)
returns @t TABLE ( word word_t, wordcount int )
WITH EXECUTE AS CALLER

Its body follows; @word and @previousword are the only word_t variables (the other “@” variables are all integer typed):

SQL
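-- Initial values (assumed here: the IDENTITY column P_Id starts at 1)
set @lastidprocessed = 0;
set @wordoccurrences = 1;
set @previousword = '';

While EXISTS(SELECT * From @words WHERE @lastidprocessed < P_Id)
Begin
    -- Order By guarantees we read the words in their sorted (P_Id) order
    Select Top 1 @id = P_Id, @word = Content
    From @words WHERE @lastidprocessed < P_Id
    Order By P_Id;

    -- For every word, see if it's the same as the previous one
    if (@word <> @previousword)
    begin
        -- A new run starts: emit the finished run, if there is one
        If (@previousword <> '')
        begin
            insert into @t values ( @previousword, @wordoccurrences );
        end
        set @wordoccurrences = 1;
    end
    else
    begin
        set @wordoccurrences = @wordoccurrences + 1;
    end

    set @previousword = @word;
    set @lastidprocessed = @id
End
-- Emit the final run
insert into @t values ( @previousword, @wordoccurrences );
Return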

Essentially, the reduce function iterates over the words. If a word is the same as the previous word, it increments a counter, and keeps incrementing until the next word is different, at which point it inserts the previous word and its occurrence count into the @t return table. Now, this logic only works if the sequence of words is alphabetically sorted so that identical words occur next to each other (actually the alphabetic sort is overkill; we just need identical words to be listed successively). And this is the magic ingredient of the MapReduce recipe: there needs to be a sort in the flow between the mapper and the reducer. Hadoop sorts all key/value pairs by key as they’re output by a pool of mappers, and then feeds them sorted into a pool of reducers. Similarly, we are going to use a SQL built-in to do the sort: the hallowed ORDER BY clause.
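
To see why the intervening sort matters, here is a small Python sketch of the same run-counting logic. It is an illustration under assumptions, not a translation of the script: the function name reducer is hypothetical, and it uses None rather than an empty string as the "no previous word" marker, so it emits nothing for empty input.

```python
def reducer(words):
    """Count runs of identical adjacent words."""
    counts = []                 # plays the role of the return table @t
    previous = None             # no previous word yet
    occurrences = 0
    for word in words:
        if word != previous:
            if previous is not None:
                counts.append((previous, occurrences))   # emit the finished run
            occurrences = 1
        else:
            occurrences += 1
        previous = word
    if previous is not None:    # flush the final run
        counts.append((previous, occurrences))
    return counts


words = ["the", "dog", "the", "fox", "dog", "the"]
unsorted_counts = reducer(words)          # every run has length 1
sorted_counts = reducer(sorted(words))    # identical words are now adjacent
print(unsorted_counts)
print(sorted_counts)
```

Fed unsorted input, the reducer reports "the" three separate times with a count of 1 each; only after sorting does it report a single row with a count of 3.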

And so this is what our MapReduce word count implementation looks like in SQL, using the user-defined mapper and reducer functions we defined above and three short sentences:

SQL
DECLARE @myDocuments DocumentsWithAutoIncrementIndexTable
INSERT INTO @myDocuments VALUES('The quick brown fox jumps over the lazy dog');
INSERT INTO @myDocuments VALUES('The brown dog walks through the brown forest'); 
INSERT INTO @myDocuments VALUES('Little red riding hood looks like a fox in her red dress');

DECLARE @myWords DocumentsWithAutoIncrementIndexTable
INSERT INTO @myWords(Content) SELECT word FROM dbo.mapper(@myDocuments, ' ') order by word asc

select * from dbo.reducer( @myWords ) order by wordcount desc; 
Go 

The first four lines declare the @myDocuments table and insert three sentences into it. We then feed this table into our mapper and specify the space character as the word separator. The resulting table is sorted in alphabetically increasing order by the order by word asc clause. Finally, the sorted table is passed to the reducer, which counts the words, and the output is ordered from the most frequent to the least frequent. The result is:

word wordcount
the 4
brown 3
dog 2
red 2
fox 2
her 1
hood 1
in 1
jumps 1
lazy 1
like 1
Little 1
looks 1
over 1
quick 1
riding 1
through 1
walks 1
dress 1
forest 1
a 1
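
The same three sentences can be pushed through a map, sort, reduce pipeline in a few lines of Python as a cross-check of these counts. This is a sketch under two assumptions: words are lower-cased to imitate the case-insensitive comparison that the result above suggests, and ties are broken alphabetically, which the T-SQL query does not promise. The function name word_count is hypothetical.

```python
from itertools import groupby

sentences = [
    "The quick brown fox jumps over the lazy dog",
    "The brown dog walks through the brown forest",
    "Little red riding hood looks like a fox in her red dress",
]


def word_count(documents, sep=" "):
    # Map: emit one lower-cased word per row.
    mapped = [word.lower() for doc in documents for word in doc.split(sep)]
    # Shuffle: sort so that identical words become adjacent.
    mapped.sort()
    # Reduce: count each run of identical words.
    reduced = [(word, sum(1 for _ in run)) for word, run in groupby(mapped)]
    # Final ordering by descending count (ties broken alphabetically here).
    return sorted(reduced, key=lambda row: (-row[1], row[0]))


result = word_count(sentences)
for word, count in result[:5]:
    print(word, count)
```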

This is exactly what we would expect from a typical word count application. It would be straightforward to generalize our mapper and reducer user-defined functions so that relational data is rearranged meaningfully in the mapper, output in table format, sorted by one or more table fields with the built-in ORDER BY clause, and meaningfully aggregated in the reducer to return a result set. That is exactly what Hadoop MapReduce does, except that it operates on key/value pairs instead of tables. The SQL MapReduce operation we just performed is a typical “pivot” operation that takes data sorted by one dimension, such as sales by geographic area, and aggregates the results by another dimension, such as quarterly sales by product. This is a standard database data mining technique, and so we can reasonably conclude that the concept of MapReduce (two user-defined functions that respectively create and then aggregate data, with an intermediating sort) is indeed a concept borrowed from the database community.

Does Google deserve a patent for its MapReduce? Probably not if one bases it on the functionality, but it is a different story if one bases it on the data environment: Google had its data (Web pages and their hyperlinks) stored on a cluster of machines and a massively distributed file system (reference here). To mine the hyperlinks, Google sorely needed a sort operation, and the distributed sort that happens in the flow between the mapper and reducer on a distributed file system is the magic ingredient of Google’s MapReduce and Apache’s Hadoop. So, as DeWitt and Stonebraker assert, MapReduce does use the same well-known technique developed many years ago in the database community, and our SQL MapReduce word count prototype demonstrates this. But the real innovation by Google is that the MapReduce intervening sort (as well as the Map and Reduce user-defined operations) happens over datasets physically spread across a network.

Databases, up until recently, were seldom physically distributed across a network (certainly not 25 years ago), and SQL operations such as sorts and joins would typically occur with data contained in memory. Google’s MapReduce and Apache’s Hadoop, in contrast, allow users to mine data that is massively distributed and whose size may not fit in a single computer’s memory but can be spilled onto disk and effectively computed upon. That was and continues to be an achievement probably worth a patent.
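
The idea of a sort spread over several machines can be sketched in Python by routing every word to one of several reducer "buckets" by a hash of the word, so that each reducer only has to sort and count its own share. This is a toy, single-process illustration and makes no claim about how Google or Hadoop actually implement the shuffle; the names partition_of and distributed_word_count and the choice of crc32 as the hash are all assumptions.

```python
import zlib
from itertools import groupby


def partition_of(word, num_reducers):
    # A stable hash, so every mapper routes a given word to the same reducer
    # (Python's built-in hash() of a string varies between runs).
    return zlib.crc32(word.encode("utf-8")) % num_reducers


def distributed_word_count(shards, num_reducers=3):
    # Map phase: each shard is processed independently (here, one after another).
    buckets = [[] for _ in range(num_reducers)]
    for shard in shards:
        for line in shard:
            for word in line.lower().split():
                buckets[partition_of(word, num_reducers)].append(word)
    # Shuffle and reduce phase: each reducer sorts and counts only its bucket.
    totals = {}
    for bucket in buckets:
        bucket.sort()
        for word, run in groupby(bucket):
            totals[word] = sum(1 for _ in run)
    return totals


shards = [
    ["the quick brown fox", "the lazy dog"],   # lines held by "node" 1
    ["the brown dog", "a red fox"],            # lines held by "node" 2
]
counts = distributed_word_count(shards)
print(counts["the"], counts["brown"], counts["fox"])
```

Because a given word always lands in the same bucket, no reducer ever needs to see another reducer's data, which is what lets the sort scale beyond one machine's memory.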

SQL MapReducing the Iliad

I promised you, dear reader, that I would MapReduce the Iliad. Moreover, MapReducing the Iliad will give us a good benchmark to compare SQL and Hadoop MapReduce, and to segue into parallelization and performance implications. Our mapper and reducer functions should work for any body of text; all we need to do is import the Iliad in a database table. We use the following C# program to import the Iliad from a text file into a database table:

C#
try
{
 // create dataset and datatable   
 DataSet dataSet = new DataSet();
 DataTable dataTable = new DataTable("table1");
 dataTable.Columns.Add("line", typeof(string));
 dataSet.Tables.Add(dataTable);
 // Read a text file line by line.
 int counter = 0;
 string line;
 string cleanline;
 string xmlData;
 System.IO.StreamReader file = 
    new System.IO.StreamReader(args[0]);
 while((line = file.ReadLine()) != null)
 {
  try
  {
   cleanline = line.Replace("\f", "/").Replace("&", "[ampersand]");
   xmlData = "<line>" + cleanline + "</line>";
   System.IO.StringReader xmlSR = new System.IO.StringReader(xmlData);
   dataSet.ReadXml(xmlSR, XmlReadMode.IgnoreSchema);
   dataTable.Rows.Add(cleanline);
   counter++;
  }
  catch (System.Xml.XmlException ex)
  {
   Console.WriteLine(ex.Message);
  }
 }
 file.Close();
 // connect to db and bulk import
 SqlConnection conn = new SqlConnection("Data Source=MM176499-PC;Initial Catalog =" + 
   args[1] + "; Integrated Security = SSPI;");
 conn.Open();
 SqlBulkCopy sbc = new SqlBulkCopy(conn);
 if (0 < dataTable.Rows.Count)
 {
  sbc.DestinationTableName = args[2];
  //sbc.WriteToServer(dataTable);
  sbc.WriteToServer(dataSet.Tables[0]);
 }
 sbc.Close();
 conn.Close();
}
catch (Exception exp)
{
 Console.WriteLine("oops: " + exp.Message); 
}

I clean the Iliad of characters that the import has problems with, and voilà, the Iliad is loaded into the rows of a database table called Lines with a single field called Line of nvarchar(MAX) data type. All we need to do to run SQL MapReduce on the Iliad is:

SQL
--sharding
DECLARE @myDocuments DocumentsWithAutoIncrementIndexTable
INSERT INTO @myDocuments SELECT line From Lines;
--mapper & shuffler
DECLARE @myWords DocumentsWithAutoIncrementIndexTable
INSERT INTO @myWords(Content) SELECT word FROM dbo.mapper(@myDocuments, ' ') order by word asc
--reducer
select * from dbo.reducer( @myWords ) order by wordcount desc; 
Go

Using Fast Forward Cursors

Our mapper and reducer implementations were purely functional, heeding common database community wisdom that cursors would slow down performance. However, our mapper and reducer work with single rows of data. Select statements are optimized for batches of rows to be retrieved and consumed all at once. Cursors are built for positioned updates and scrolling, and fast forward cursors are optimized to give the user a read-only view of the next row of data. In other words, the query plan is optimal for such an operation. So I decided to try those out as well. Using fast forward cursors, our mapper and reducer main loop is re-written as follows.

For the mapper:

SQL
set @wordcounter = 1;

-- cursor based loop (LOCAL keeps the cursor name private to this function)
DECLARE SH CURSOR LOCAL FAST_FORWARD FOR
    SELECT P_Id, Content FROM @documents ORDER BY P_Id
OPEN SH
FETCH NEXT FROM SH INTO @id, @document
WHILE (@@FETCH_STATUS = 0)
BEGIN
    set @istart = 1;
    set @len = LEN(@document);

    -- For every word within a document
    While (@istart <= @len)
    Begin
        -- Position of the next separator, or 0 if none is left
        set @pos = CHARINDEX(@sep, @document, @istart)
        if (@pos = 0)
        begin
            -- Last word: take the rest of the document
            set @word = SUBSTRING(@document, @istart, @len);
            insert into @t values ( @wordcounter, @word )
            set @istart = @len + 1;
            set @wordcounter = @wordcounter + 1;
        end
        else
        begin
            set @word = SUBSTRING(@document, @istart, @pos - @istart);
            insert into @t values ( @wordcounter, @word )
            set @istart = @pos + 1;
            set @wordcounter = @wordcounter + 1;
        end
    End
    FETCH NEXT FROM SH INTO @id, @document
END
CLOSE SH
DEALLOCATE SH

Return

For the reducer:

SQL
set @wordoccurrences = 1;
set @previousword = '';

-- cursor based loop; ORDER BY P_Id reads the words in their sorted order
DECLARE SH CURSOR LOCAL FAST_FORWARD FOR
    SELECT P_Id, Content FROM @words ORDER BY P_Id
OPEN SH
FETCH NEXT FROM SH INTO @id, @word
WHILE (@@FETCH_STATUS = 0)
BEGIN
    -- For every word, see if it's the same as the previous one
    if (@word <> @previousword)
    begin
        -- A new run starts: emit the finished run, if there is one
        If (@previousword <> '')
        begin
            insert into @t values ( @previousword, @wordoccurrences );
        end
        set @wordoccurrences = 1;
    end
    else
    begin
        set @wordoccurrences = @wordoccurrences + 1;
    end

    set @previousword = @word;

    FETCH NEXT FROM SH INTO @id, @word
END
-- Release the cursor, as the mapper does
CLOSE SH
DEALLOCATE SH

-- Emit the final run
insert into @t values ( @previousword, @wordoccurrences );

Return

The sharding, map, sort, and reduce data flow remains the same as in the functional version (the one that does not use cursors).
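
The two loop styles visit the same rows in the same order; what differs is the access pattern. The Python sketch below imitates both, with a binary search standing in for the per-row lookup of the SELECT-based loop and a plain iterator standing in for the cursor. It is only an analogy under those assumptions (the function names are hypothetical) and says nothing about what SQL Server does internally or how much either approach costs.

```python
from bisect import bisect_right


def select_based(rows):
    """WHILE EXISTS ... SELECT TOP 1 style: one fresh lookup per row."""
    ids = [p_id for p_id, _ in rows]        # sorted, like a primary key index
    seen, last_id = [], 0
    while True:
        i = bisect_right(ids, last_id)      # first row with P_Id > last_id
        if i == len(rows):                  # EXISTS(...) would be false
            break
        last_id, content = rows[i]
        seen.append(content)
    return seen


def cursor_based(rows):
    """Cursor style: open once, then only ever fetch the next row."""
    seen = []
    cursor = iter(rows)                     # OPEN
    for _p_id, content in cursor:           # FETCH NEXT until no rows remain
        seen.append(content)
    return seen


rows = [(1, "The quick"), (2, "brown fox"), (3, "jumps")]   # (P_Id, Content)
print(select_based(rows) == cursor_based(rows))
```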

Benchmarks

We now benchmark our T-SQL MapReduce, in both its functional and cursor-based variants, against a typical Hadoop MapReduce. We use the Iliad dataset in all cases, and run the following typical Java MapReduce word count implementation on Hadoop.

Java
package METCS751.iliad;  
  
 import java.io.IOException;  
 import java.util.*;  
  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.conf.*;  
 import org.apache.hadoop.io.*;  
 import org.apache.hadoop.mapred.*;  
 import org.apache.hadoop.util.*;  
  
 public class WordCount {  
  
    public static class Map extends MapReduceBase 
                 implements Mapper<LongWritable, Text, Text, IntWritable> {  
      private final static IntWritable one = new IntWritable(1);  
      private Text word = new Text();  
  
      public void map(LongWritable key, Text value, OutputCollector<Text, 
              IntWritable> output, Reporter reporter) throws IOException {  
        String line = value.toString();  
        StringTokenizer tokenizer = new StringTokenizer(line);  
        while (tokenizer.hasMoreTokens()) {  
          word.set(tokenizer.nextToken());  
          output.collect(word, one);  
        }  
      }  
    }  
  
    public static class Reduce extends MapReduceBase implements 
             Reducer<Text, IntWritable, Text, IntWritable> {  
      public void reduce(Text key, Iterator<IntWritable> values, 
              OutputCollector<Text, IntWritable> output, 
              Reporter reporter) throws IOException {  
        int sum = 0;  
        while (values.hasNext()) {  
          sum += values.next().get();  
        }  
        output.collect(key, new IntWritable(sum));  
      }  
    }  
  
    public static void main(String[] args) throws Exception {  
      JobConf conf = new JobConf(WordCount.class);  
      conf.setJobName("wordcount");  
  
      conf.setOutputKeyClass(Text.class);  
      conf.setOutputValueClass(IntWritable.class);  
  
      conf.setMapperClass(Map.class);  
      conf.setCombinerClass(Reduce.class);  
      conf.setReducerClass(Reduce.class);  
  
      conf.setInputFormat(TextInputFormat.class);  
      conf.setOutputFormat(TextOutputFormat.class);  
  
      FileInputFormat.setInputPaths(conf, new Path(args[0]));  
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  
      JobClient.runJob(conf);  
    }  
}

Hadoop version 1.1.1 in standalone (not parallel) mode takes 10 seconds to count all the words of the Iliad (a little over 1 MB of ASCII text). Our functional T-SQL implementation takes 1 min 11 seconds, while our fast forward cursor-based implementation takes 32 seconds. The T-SQL timings are from a SQL Server 2008 R2 instance running on a Dell 2.5 GHz Core Duo Windows 7 laptop with 4 GB of memory and a 3.9 Windows Experience Index. Fast forward cursors are indeed considerably faster than our SELECT-based functional implementation, and Hadoop is faster than SQL MapReduce in general. SQL is a fantastic language, and probably the biggest “bang for the buck” of all functional languages, but one has to pay a price for the overhead of running that engine. Nevertheless, deconstructing SQL queries into MapReduce operations, in order to afford users the luxury of using SQL as their data mining language, is something that is available today with open source frameworks such as Hive/HiveQL and Pig.
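
To put the three timings side by side, the ratios can be worked out with a few lines of Python. The numbers are the ones quoted above; the dictionary keys are just labels chosen for this sketch.

```python
timings_seconds = {
    "Hadoop 1.1.1 (standalone)": 10,
    "T-SQL, SELECT-based": 71,          # 1 min 11 s
    "T-SQL, fast forward cursor": 32,
}

baseline = timings_seconds["Hadoop 1.1.1 (standalone)"]
# How many times longer each run took than the Hadoop run
ratios = {name: secs / baseline for name, secs in timings_seconds.items()}
for name, ratio in ratios.items():
    print(f"{name}: {ratio:.1f}x the Hadoop time")

# How much faster the cursor variant is than the SELECT-based variant
cursor_speedup = (timings_seconds["T-SQL, SELECT-based"]
                  / timings_seconds["T-SQL, fast forward cursor"])
print(f"cursor vs SELECT-based: {cursor_speedup:.2f}x faster")
```

These are single measurements on one laptop, so the ratios are best read as rough orders of magnitude rather than precise figures.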

Hadoop MapReduce is faster on its HDFS filesystem than database table-based SQL implementations of the same functionality. Parallel implementations of SQL MapReduce on network-segmented databases would probably be slower, by a similar margin, than Hadoop running in fully distributed mode on a distributed instance of HDFS across multiple networked nodes. So DeWitt and Stonebraker can evangelize to their hearts’ content about how the technique was invented in the database community, and Google would probably not deny this. However, in terms of performance, parallelization potential on massively distributed file systems, and thus the ability to data mine “oodles” of data, the contribution of Google’s MapReduce and Apache’s Hadoop, specifically in how the data is stored, sorted, and accessed through the network, is undeniable and probably worth the patent they were granted.

Conclusion

In conclusion, both parties are right. Yes, MapReduce is in fact a technique that is easy to implement, and thus was probably routinely implemented, in database mining applications. But at Google, Larry Page and Sergey Brin’s company, it was applied in the new context of a massively distributed file system; the resulting amount of parallelization was unprecedented and allowed MapReduce to shine even more as an effective, high-performance, large-scale data processing methodology. There will probably be database companies that continue to claim they can run MapReduce in their version of SQL, but the highest-performing MapReduce will probably consist of streaming data to some form of Apache’s Hadoop rather than running a native functional version. In fact, there are various articles about using the Oracle JVM to run a Hadoop instance in order to MapReduce Oracle table data. In the end, the feud boils down to two distinct data storage philosophies: whether data is better stored hierarchically, as with most modern file systems, or relationally, as with most modern databases. And in reality, it’s about how data storage APIs are presented to producers and consumers, because we all know that all data, whether in databases or not, eventually gets saved on OS file systems, which are a combination of hierarchical and relational structures: folders contain sub-folders, and hard disk space is relationally linked sector by sector.

History

First release: In order to upload the Iliad to the database and to MapReduce it with Hadoop, you need to download the Iliad in plain text format from here, add it to the C# executable's Bin/Debug folder, add it to an input directory on Hadoop 1.1.1, and run MapReduce with the following Cygwin command lines:

javac -classpath hadoop-core-1.1.1.jar -d wordcount_classes WordCount.java
jar cvf wordcount.jar -C wordcount_classes/ . 
bin/hadoop jar wordcount.jar METCS751.iliad.WordCount input output3

I've added Iliad as a download for convenience.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)