In this article, I will explain how to implement joins during the Map-phase in Hadoop Map-Reduce applications using MapFiles.
Background
In my previous article, Implementing Joins in Hadoop Map-Reduce, I introduced the concept of joining data from different sources in Hadoop and presented techniques for performing joins during both the Map-phase and the Reduce-phase.
This time round, I will discuss an alternate technique of joining during the Map-phase: joining using MapFiles. Joining during the Map-phase is faster, and using MapFiles overcomes the limitations of joining with cache files during the Map-phase of a Map-Reduce application.
Sequence Files and MapFiles
MapFiles are a type of Sequence File in Hadoop that supports random access to the data stored in the file. Before moving further, I will explain Sequence Files.
Sequence Files in Hadoop are written and read directly as objects (instead of having to read line by line into Text and then populate class instances in the Mapper class). Sequence Files are read using the SequenceFileInputFormat and written using the SequenceFileOutputFormat. They are stored in binary format, and sync markers are placed every few records, which also makes Sequence Files splittable. In addition, Sequence Files support record-level compression and block-level compression; the latter places a sync marker between each compressed block. Please take a look at this link for more details on Sequence Files.
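To make this concrete, here is a minimal sketch of writing a Sequence File and reading it back with the SequenceFile API. It is not part of this article's job; the path and the key and value types are placeholders chosen for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class SequenceFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.seq"); // placeholder location
        // Write key-value pairs directly as objects
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class));
        writer.append(new IntWritable(1), new Text("first record"));
        writer.append(new IntWritable(2), new Text("second record"));
        writer.close();
        // Read the records back directly into Writable instances
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path));
        IntWritable key = new IntWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key + " -> " + value);
        }
        reader.close();
    }
}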
Now coming back to MapFiles. There are two parts of a MapFile:
1. Data
The Data file is the actual sequence file containing the data as key-value pairs, sorted by key. The sort order is enforced by the MapFile.Writer, which throws an IOException when the MapFile.Writer.append method is called with an out-of-order key.
2. Index
The Index file is a smaller sequence file that contains some (or all) of the keys of the Data file, in sorted order, along with the byte offset of each key's record within the Data file. This Index file is used to look up the data.
When a lookup is performed, the Index file is searched for the key. If the key is found, the data is read from the Data file at the stored byte offset; if the exact key is not found, the offset of the previous key (keys are sorted) is used to seek to that position in the Data file, which is then traversed sequentially until the key is found and its value returned.
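To illustrate the lookup behaviour just described, here is a minimal sketch against a placeholder MapFile directory. It uses MapFile.Reader: get() returns the value for an exact key (or null if the key is absent), while getClosest() returns the entry whose key is the closest match at or after the requested key.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
public class MapFileLookupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/ExampleMapFile", conf);
        // Exact lookup: the Index file is consulted, then the Data file is read
        IntWritable key = new IntWritable(42);
        Text value = new Text();
        if (reader.get(key, value) != null) {
            System.out.println("Found: " + value);
        }
        // Closest-match lookup: returns the key actually found at or after the probe
        IntWritable probe = new IntWritable(40);
        Text closestValue = new Text();
        IntWritable foundKey = (IntWritable) reader.getClosest(probe, closestValue);
        if (foundKey != null) {
            System.out.println("Closest: " + foundKey + " -> " + closestValue);
        }
        reader.close();
    }
}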
The number of keys in the Index file is configurable and can be set using the setIndexInterval() method, which specifies how many keys are skipped between successive entries in the Index file. However, care should be taken, since a very small value can result in a large Index file, which is not ideal for Map-phase joins.
For using MapFiles, the key class should inherit from WritableComparable to enable sorting of the keys.
MapFiles can also be used as input to Map-Reduce programs, using the SequenceFileInputFormat, which ignores the Index file and sequentially reads the Data file. Please visit this page to learn more about MapFiles.
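As a rough sketch of what that driver configuration might look like, the hypothetical class below reads a MapFile directory directly as job input. The class name and paths are placeholders, and the key and value classes assume an IntWritable/Text MapFile like the one built later in this article.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MapFileAsInputSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MapFileAsInputSketch.class);
        // The MapFile directory is treated as ordinary Sequence File input:
        // the Index file is ignored and the Data file is read sequentially.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        // No mapper class is set, so the identity Mapper passes the
        // MapFile's keys and values straight through to the output.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}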
Advantages of Joining during Map-phase
At this stage, it is important to discuss the advantages and disadvantages of joining during the Map-phase instead of the Reduce-phase of Map-Reduce applications. The main disadvantage is that the files to be joined should be small enough to fit in memory, but using MapFiles overcomes this limitation, since MapFiles are splittable, can be compressed, and, depending on the IndexInterval value, their Index file is relatively small.
Joining during the Map-phase is faster since it skips the costly sort and shuffle phases as well as the reduce phase. Jobs that do nothing more than join the data supplied to them are therefore faster when they are Map-only.
In addition, MapFiles, being a type of Sequence File, can be compressed as previously mentioned, which further reduces their size and makes them a suitable alternative to cache files when the data to be joined is comparatively large. This makes the advantages of Map-phase joins available even for larger data files. Moreover, since MapFiles are sorted, key lookups are very fast, which makes MapFiles ideal for joining data quickly.
Scenario Implemented in This Example
I shall use the same data files that were used in the previous article, i.e., Microsoft's Adventure Works data. The .csv files for creating the Adventure Works Database along with the script file can be downloaded from this link.
Download the Adventure Works 2012 OLTP Script. In this example, the .csv files, SalesOrderDetail.csv and Products.csv are used.
The join is between the SalesOrderDetail data and the Products data; for each record in the SalesOrderDetail.csv file, the Product name is output. It can be expressed as the following SQL code:
SELECT SalesOrderId, OrderQty, Products.Name, LineTotal
FROM SalesOrderDetail JOIN Products ON SalesOrderDetail.ProductId = Products.ProductId
Since the query involves no grouping or aggregation, the job in this example can be a Map-only job.
Initial Project Setup
I will be using Eclipse with Maven to create the Hadoop Map-Reduce project. The process of setting up the Maven plug-in in Eclipse is outside the scope of this article.
- Open Eclipse, select New Project, and select Maven Project. Click Next.
- Select the Create a simple project check box.
- Name the project. I am using the following:
Group Id: com.example.mapfileexample
Artifact Id: MapFileExample
Name: MapFileExample
- Add the Hadoop dependencies. In Eclipse, in the project structure (usually on the left), open the pom.xml file, and in the Dependencies tab, click the Add button and enter the following values:
Group Id: org.apache.hadoop
Artifact id: hadoop-client
Version: 2.5.1
- Create a class named Driver, which is the main class of this application. Paste the following code into the class:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Driver extends Configured implements Tool {
public int run(String[] allArgs) throws Exception {
return 0;
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
int output = ToolRunner.run(new Driver(), args);
}
}
Code to Create and Populate a MapFile
A MapFile can be created using the MapFile.Writer. In this example, I will read the Products.csv file line by line using a FileInputStream and write it to a MapFile with ProductId (the first column) as the key and Name (the second column) as the value. Note that this step of creating the MapFile happens outside Map-Reduce, although a MapFile can also be produced as the output of a Map-Reduce job.
The function that does so is:
private static void CreateMapFile(Configuration conf, String inputFilePath,
String outputFilePath, int keyIndex, int valueIndex) throws IOException {
Path outputLocation = new Path(outputFilePath);
Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
MapFile.Writer.setIndexInterval(conf, 1);
MapFile.Writer writer = new MapFile.Writer(conf, outputLocation, keyClass, valueClass);
File file = new File(inputFilePath);
FileInputStream fis = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line;
br.readLine();
int i = 0;
while ((line = br.readLine()) != null){
String[] lineItems = line.split("\\t");
IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
Text value = new Text(lineItems[valueIndex]);
writer.append(key, value);
i++;
}
br.close();
writer.close();
}
As can be seen from the code, the function takes the input file path and reads the Products.csv file, in which the values are tab-separated. We set the IndexInterval to 1 because it is a small file, so we can afford a large number of keys in the Index file.
The MapFile.Writer constructor takes the Configuration instance, the location where the file is to be created, and the key and value classes wrapped in instances of org.apache.hadoop.io.SequenceFile.Writer.Option. Note that in the statement:
Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
the Option class is org.apache.hadoop.io.MapFile.Writer.Option, which derives from the org.apache.hadoop.io.SequenceFile.Writer.Option class.
Here, the keys of the MapFile are instances of IntWritable, and the values are instances of Text. Next, a BufferedReader is created and the file is read line by line. Each line is split on the tab (\t) character; the first column is used as the key and the second (the Name column) as the value. The last step is to call the MapFile.Writer.append method to append the keys and values to the MapFile.
The CreateMapFile function can be called from main as:
CreateMapFile(conf, "/home/hduser/Desktop/sales_data/Product.csv",
"/home/hduser/Desktop/sales_data/ProductMapFile/", 0, 1);
Note that this function is not part of the Map-phase and is not used during the execution of the job. It should be called separately, just to create the MapFile, and although I have kept it in the Driver class, keeping it there is not necessary.
The last step before writing the Mapper class is to run the program with the above call in the main method, after commenting out the call to the ToolRunner.run method. Remember to replace the source and destination file paths with the ones on your computer.
After running the program, open the destination folder to see the Index and Data files (the parts of the MapFile) that were created.
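If you also want to check the contents, and not only that the two files exist, a small sketch like the following can dump the first few entries. It assumes the same destination folder that was passed to CreateMapFile above; MapFile.Reader.next() walks the Data file sequentially in key order.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
public class DumpMapFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Same destination folder that was passed to CreateMapFile
        MapFile.Reader reader = new MapFile.Reader(fs,
                "/home/hduser/Desktop/sales_data/ProductMapFile", conf);
        IntWritable key = new IntWritable();
        Text value = new Text();
        int printed = 0;
        // next() iterates the Data file sequentially, in key order
        while (reader.next(key, value) && printed < 10) {
            System.out.println(key + "\t" + value);
            printed++;
        }
        reader.close();
    }
}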
Mapper
In the Mapper, the MapFile created above is read and used to look up Product names by Product Id. These Product names are then output from the map function. The code of the Mapper class, called MapFileExampleMapper, is:
public static class MapFileExampleMapper extends Mapper<LongWritable, Text, NullWritable, Text>
{
private MapFile.Reader reader = null;
public void setup(Context context) throws IOException{
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path dir = new Path(context.getCacheFiles()[0]);
this.reader = new MapFile.Reader(fs, dir.toString(), conf);
}
private Text findKey(IntWritable key) throws IOException{
Text value = new Text();
this.reader.get(key, value);
return value;
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
String[] values = value.toString().split("\\t");
IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
Text productName = findKey(productId);
context.write(NullWritable.get(), new Text(values[0] + "," +
values[3] + ", " + productName.toString() + " , " + values[8]));
}
public void cleanup(Context context) throws IOException{
reader.close();
}
}
The MapFile location, passed to the job as a cache file, is retrieved in the setup method of the Mapper class via context.getCacheFiles(). The setup method is called once for each Map task. Here, the MapFile is opened using a MapFile.Reader instance, which provides the get method to retrieve the value for a key.
Next, during each call of the map method of the Mapper class, the Product Id (the fifth column, index 4, of the SalesOrderDetail.csv file) is passed to the findKey method, which uses the MapFile.Reader.get method to look up the value for that key. The retrieved Product names become part of the output of the Mapper class.
run Method
The next step is the run method, which sets the number of reducers to zero, since this is a map-only job. Also, don't forget to remove the call to the CreateMapFile method from main if it was previously run from there.
The code for the run method is:
public int run(String[] allArgs) throws Exception {
Job job = Job.getInstance(getConf());
job.setJarByClass(Driver.class);
job.setMapperClass(MapFileExampleMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
URI[] cacheFileURI = { new URI(args[1]) };
job.setCacheFiles(cacheFileURI);
job.setNumReduceTasks(0);
boolean result = job.waitForCompletion(true);
if (result) {
return 1;
} else {
return 0;
}
}
It sets the path of the passed MapFile as a cache file, which is used in the Mapper class. Finally, it's time to compile and run the job.
Running the Job in Local Mode
For running the job in local mode from within Eclipse, create a Run Configuration for the Driver class and pass three program arguments: the local path of the SalesOrderDetail.csv file, the local path of the ProductMapFile directory, and a new output directory.
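For example, assuming the same local paths used earlier in this article (the output directory below is just a placeholder and must not already exist), the three program arguments could be:
/home/hduser/Desktop/sales_data/SalesOrderDetail.csv
/home/hduser/Desktop/sales_data/ProductMapFile
/home/hduser/Desktop/sales_data/output_local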
Running the Job on the Cluster
First, create a folder in the Hadoop file system for the items related to this example. I will call the folder mapfileexample:
hadoop fs -mkdir /hduser/mapfileexample/
Next, copy the SalesOrderDetail.csv file to it using the copyFromLocal command:
hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/SalesOrderDetail.csv
/hduser/mapfileexample/
Next, copy the MapFile:
hadoop fs -mkdir /hduser/mapfileexample/ProductMapFile
hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/ProductMapFile/*
/hduser/mapfileexample/ProductMapFile/
And then, run the job:
hadoop jar /home/hduser/Desktop/sales_data/MapFileExample-0.0.1-SNAPSHOT.jar Driver
/hduser/mapfileexample/SalesOrderDetail.csv /hduser/mapfileexample/ProductMapFile
/hduser/mapfileexample/output1
Finally, copy the output to the local file system and examine it. Run the following:
hadoop fs -copyToLocal /hduser/mapfileexample/output1/*
/home/hduser/Desktop/sales_data/output/fromCluster/
Open the part-m-00000 file and see that the name of each Product is present in each row of the resulting output.
Final Code
This is what the code (Driver.java file) looks like in the end:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Driver extends Configured implements Tool {
public static class MapFileExampleMapper
extends Mapper<LongWritable, Text, NullWritable, Text>
{
private MapFile.Reader reader = null;
public void setup(Context context) throws IOException{
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path dir = new Path(context.getCacheFiles()[0]);
this.reader = new MapFile.Reader(fs, dir.toString(), conf);
}
private Text findKey(IntWritable key) throws IOException{
Text value = new Text();
this.reader.get(key, value);
return value;
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
String[] values = value.toString().split("\\t");
IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
Text productName = findKey(productId);
context.write(NullWritable.get(), new Text(values[0] + "," +
values[3] + ", " + productName.toString() + " , " + values[8]));
}
public void cleanup(Context context) throws IOException{
reader.close();
}
}
public int run(String[] allArgs) throws Exception {
Job job = Job.getInstance(getConf());
job.setJarByClass(Driver.class);
job.setMapperClass(MapFileExampleMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
URI[] cacheFileURI = { new URI(args[1]) };
job.setCacheFiles(cacheFileURI);
job.setNumReduceTasks(0);
boolean result = job.waitForCompletion(true);
if (result)
{
return 1;
}
else
{
return 0;
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
int output = ToolRunner.run(new Driver(), args);
}
private static void CreateMapFile(Configuration conf,
String inputFilePath, String outputFilePath,
int keyIndex, int valueIndex) throws IOException
{
Path outputLocation = new Path(outputFilePath);
Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
MapFile.Writer.setIndexInterval(conf, 1);
MapFile.Writer writer =
new MapFile.Writer(conf, outputLocation, keyClass, valueClass);
File file = new File(inputFilePath);
FileInputStream fis = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line;
br.readLine();
int i = 0;
while ((line = br.readLine()) != null){
String[] lineItems = line.split("\\t");
IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
Text value = new Text(lineItems[valueIndex]);
writer.append(key, value);
i++;
}
br.close();
writer.close();
}
}
Conclusion
This article explains how to perform Map-phase joins using MapFiles. It is often useful to perform joins during the Map-phase instead of the Reduce-phase to make jobs faster. Joining using MapFiles has the advantage that MapFiles are splittable, they can be compressed, and the Index file is small. All these factors make MapFiles suitable for joining larger amounts of data during the Map-phase.
This article is a continuation of my previous article on performing joins in Hadoop Map-Reduce jobs; as an exercise, this technique can be applied to the example from that article.
Feedback, suggestions, and corrections are welcome.
History
- 16th March, 2015: Initial version