Introduction
Using the MapReduce framework, Yahoo! Inc. was able to sort 1TB of data in about a minute (the 1TB sort benchmark). Unfortunately, not every company has the ability to build a thousand-node cluster to achieve pseudo real-time querying. Assuming you have a 10-node cluster, which is already a good start for playing with big data, sorting 1TB of data should take roughly 1 hour to complete. Given this constraint of limited nodes, it may become necessary to reorganize your data on HDFS (e.g., partition, sort) and/or to implement indexes in order to improve overall performance. In this report, I describe the implementation of custom indexes based on Hadoop InputSplits that I have created, the issues I faced, and finally what added value (if any) they brought compared to non-indexed MapReduce jobs.
Background
Readers should understand the concepts of Hadoop, HDFS, and MapReduce, and should have experience implementing basic Mapper and Reducer classes using the Hadoop New API. Should you need to refresh your knowledge, Tom White's famous definitive guide will be more than helpful.
Input Split
Once you submit a MapReduce job, the JobTracker computes the number of map tasks that need to be executed on your data set. This number depends on the number of files available in HDFS, the file sizes, and the block size. Any file is actually split into one or several InputSplits, each of which is basically a chunk of one or several blocks. From what I understand, each InputSplit gets its own dedicated mapper instance, so the number of mappers should be directly related to the number of InputSplits.
An InputSplit is based on 3 different values:
- The file name
- The offset (where the InputSplit starts)
- The length (where the InputSplit stops)
The InputSplit method toString() will return the following pattern:
dfs://server.domain:8020/path/to/my/file:0+100000
By hashing this value using MD5Hash, you can get a unique ID identifying any InputSplit:
911c058fbd1e60ee710dcc41fff16b27
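As a quick illustration, such an ID can be computed from an InputSplit's toString() value with Hadoop's MD5Hash utility. The snippet below is a minimal standalone sketch using the sample pattern shown above (not a real split):

import org.apache.hadoop.io.MD5Hash;

public class SplitIdDemo {
    public static void main(String[] args) {
        // Sample InputSplit.toString() value (same pattern as above)
        String split = "dfs://server.domain:8020/path/to/my/file:0+100000";
        // MD5Hash.digest() hashes the string; toString() renders it as hex
        String splitId = MD5Hash.digest(split).toString();
        System.out.println(splitId);
    }
}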
Main assumption: the JobTracker will always return the exact same InputSplits as long as the data set does not change.
Indexing
We can identify 3 different levels: the File, the InputSplit, and the Block. In theory, there should therefore be at least 3 different ways of creating indexes: an index based on the file URI, an index based on the InputSplit, or an index based on the block. I must confess I was quite afraid to manually deal with blocks, so I focused on File and InputSplit indexes only.
Let's take two different example data sets.
In this first example, the 2 files in your data set fit in 25 blocks and have been identified as 7 different InputSplits. The target you are looking for (highlighted in grey) is available in file #1 (blocks #2, #8 and #13) and in file #2 (block #17).
- With File based indexing, you will end up with 2 files (the full data set here), meaning that your indexed query will be equivalent to a full scan query.
- With InputSplit based indexing, you will end up with 4 InputSplits out of the 7 available. Performance should definitely be better than a full scan query.
Let's take a second example. This time, the same data set has been sorted by the column you want to index. The target you are looking for (highlighted in grey) is now available in file #1 (blocks #1, #2, #3 and #4).
- With File based indexing, you will end up with only 1 file from your data set.
- With InputSplit based indexing, you will end up with 1 InputSplit out of the 7 available.
For this specific study, I decided to use a custom InputSplit based index. I believe such an approach is quite a good balance between the effort it takes to implement, the added value it might bring in terms of performance optimization, and its expected applicability regardless of the data distribution.
Implementation
The process for implementing the index is quite simple and consists of the following 3 steps:
- Build the index from your full data set
- Query the index in order to get the InputSplit(s) for the value you are looking for
- Execute your actual MapReduce job on the indexed InputSplits only
The first step must be executed only once as long as the full data set does not change.
Building Index
Building a MapReduce index might take a really long time to execute, as you are outputting each value to index together with its actual InputSplit location. The key parameter to estimate here is the number of reducers to use. With a single reducer, all your indexes will be written into a single file, but the time required to copy all data from the mappers to that single reducer will definitely be too long. With thousands of reducers, you will output indexes to thousands of different files, but the job might be significantly faster to execute. The right value is, I believe, a balance between the number of available reduce slots in your cluster and the expected size of your final index. In order to estimate it properly, I did a Simple Random Sampling (SRS) of 10% of my data set. If you expect a 100Gb index, you could set the number of reducers to around 50 so that you end up with 50 files of 2Gb each.
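For instance, in the driver of the index-building job (a sketch is shown further below), this hypothetical sizing would simply translate into:

// Hypothetical sizing: ~100Gb expected index, ~2Gb per output file
job.setNumReduceTasks(50);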
Mapper Class
Using the Hadoop context, you can retrieve the current InputSplit from the running mapper instance. I extract the value I want to index and output it together with its InputSplit MD5 hash.
public class RebuildIndexMapper extends Mapper<Object, Text, Text, Text> {
    private String splitId;

    @Override
    public void setup(Context context) {
        // Identify the InputSplit this mapper instance is working on
        InputSplit is = context.getInputSplit();
        splitId = MD5Hash.digest(is.toString()).toString();
    }

    @Override
    public void map(Object object, Text line, Context context)
            throws IOException, InterruptedException {
        // Output the value to index together with its InputSplit MD5 hash
        String key = Utils.getValueToIndex(line.toString());
        context.write(new Text(key), new Text(splitId));
    }
}
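The Utils.getValueToIndex() helper is specific to your record format and is not detailed in this report. A minimal sketch, assuming (hypothetically) that the IP address to index is the first whitespace-separated field of each record, could be:

public static String getValueToIndex(String line) {
    // Hypothetical record format: the IP address is the first whitespace-separated field
    return line.split("\\s+")[0];
}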
Assuming you are indexing IP addresses, the intermediate key/value pairs will look like the following. From a first mapper:
192.168.0.1 60390c7e429e38e8449519011a24f79d
192.168.0.2 60390c7e429e38e8449519011a24f79d
192.168.0.3 60390c7e429e38e8449519011a24f79d
192.168.0.1 60390c7e429e38e8449519011a24f79d
And from a second one:
192.168.0.1 ccc6decd3d361c3d651807a0c1a665e4
192.168.0.5 ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6 ccc6decd3d361c3d651807a0c1a665e4
192.168.0.3 ccc6decd3d361c3d651807a0c1a665e4
Combiner Class
As you will output a key/value pair for each line of your data set, using a Combiner that removes duplicates might be really useful. Its implementation is quite straightforward; a minimal sketch is given below.
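Such a combiner can reuse the reducer signature and simply forward each distinct InputSplit hash once per key. This is only a sketch, not necessarily the exact class used in this study:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RebuildIndexCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Forward each distinct InputSplit hash only once for this key
        Set<String> seen = new HashSet<String>();
        for (Text value : values) {
            if (seen.add(value.toString())) {
                context.write(key, value);
            }
        }
    }
}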
Reducer Class
The goal of the reducer is simply to collect the distinct InputSplits for each indexed value and to output them on a single line.
public class RebuildIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct InputSplit hashes for this indexed value
        List<String> list = new ArrayList<String>();
        for (Text value : values) {
            String str = value.toString();
            if (!list.contains(str)) {
                list.add(str);
            }
        }
        // Output them as a single comma-separated line
        StringBuilder sb = new StringBuilder();
        for (String value : list) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(value);
        }
        context.write(key, new Text(sb.toString()));
    }
}
Given the same example as before, the final output will look like the following:
192.168.0.1 ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2 60390c7e429e38e8449519011a24f79d
192.168.0.3 ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5 ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6 ccc6decd3d361c3d651807a0c1a665e4
Because the index output is usually quite large, using a SequenceFileOutputFormat might be really helpful.
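For reference, a driver for this index-building job might look like the sketch below. The input and output paths are placeholders, RebuildIndexCombiner refers to the combiner sketched earlier, and the reducer count is the figure estimated in the previous section:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class RebuildIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "rebuild-index");
        job.setJarByClass(RebuildIndexDriver.class);
        job.setMapperClass(RebuildIndexMapper.class);
        job.setCombinerClass(RebuildIndexCombiner.class);
        job.setReducerClass(RebuildIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Store the index as a SequenceFile since it can get quite large
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(50); // estimated from the expected index size
        FileInputFormat.addInputPath(job, new Path(args[0]));           // full data set
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1])); // index location
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}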
Querying Index
Each time you need to execute a MapReduce job for a given indexed value (an IP address in my example), you first have to query the index you created in the previous step in order to retrieve the distinct InputSplits this value belongs to.
Mapper Class
The map task is quite simple: for each indexed value that matches the target you are looking for, output all of its indexed InputSplits.
public class FetchIndexMapper extends Mapper<Text, Text, Text, NullWritable> {
    private String indexLookup;

    @Override
    public void setup(Context context) {
        // Target value to look for, passed through the job configuration
        indexLookup = context.getConfiguration().get("target.lookup");
    }

    @Override
    public void map(Text indexKey, Text indexValue, Context context)
            throws IOException, InterruptedException {
        String strKey = indexKey.toString();
        if (!strKey.equals(indexLookup)) {
            return;
        }
        // Output every InputSplit hash referenced by the matching index entry
        for (String index : indexValue.toString().split(",")) {
            context.write(new Text(index), NullWritable.get());
        }
    }
}
Reducer Class
The purpose of the reduce task is simply to remove any duplicates; a minimal sketch is given after the example below. Given the same example as before:
192.168.0.1 ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2 60390c7e429e38e8449519011a24f79d
192.168.0.3 ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5 ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6 ccc6decd3d361c3d651807a0c1a665e4
an index query for IP address 192.168.0.1 will output the following:
ccc6decd3d361c3d651807a0c1a665e4
60390c7e429e38e8449519011a24f79d
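As mentioned above, a minimal sketch of this dedup reducer could look like the following; since the shuffle already groups identical split hashes under the same key, writing the key once per group is enough:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FetchIndexReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct InputSplit hash arrives as a single key: write it once
        context.write(key, NullWritable.get());
    }
}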
These InputSplit MD5 hashes should be written to a temporary folder on HDFS so that they can be read by your actual MapReduce job (next section). If the file is quite large, using a SequenceFileOutputFormat can be, once again, really helpful.
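Putting the querying step together, a driver for this index lookup job might look like the sketch below. The paths are placeholders, the target value is passed through the "target.lookup" configuration key read by the mapper, and the index is assumed to have been written as a SequenceFile of <Text, Text> pairs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FetchIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Target value to look up, e.g. "192.168.0.1"
        conf.set("target.lookup", args[0]);
        Job job = new Job(conf, "fetch-index");
        job.setJarByClass(FetchIndexDriver.class);
        job.setMapperClass(FetchIndexMapper.class);
        job.setReducerClass(FetchIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // The index built previously is a SequenceFile of <Text, Text> pairs
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job, new Path(args[1])); // index location
        FileOutputFormat.setOutputPath(job, new Path(args[2]));       // temp folder for split hashes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}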
Executing Your MapReduce Job
Now that we have built our index table and retrieved the actual InputSplit(s) for the target we are looking for, it is time to set up the actual MapReduce job.
Custom FileInputFormat
With the default configuration, Hadoop retrieves the number of InputSplits to be used through the FileInputFormat class. We will create our own FileInputFormat class extending the default one and overriding its getSplits() method. You have to read the file you created in the previous step, add all your indexed InputSplits into a list, and then compare this list with the one returned by the super class. Only the InputSplits found in your index will be returned to the JobTracker.
public class IndexFileInputFormat extends FileInputFormat<LongWritable, Text> {
    .../...
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Get the full list of InputSplits and keep only the indexed ones
        List<InputSplit> totalIs = super.getSplits(job);
        List<InputSplit> indexedIs = Utils.removeNonIndexedIS(totalIs);
        return indexedIs;
    }
}
With, somewhere in a Utils class:
public static List<InputSplit> removeNonIndexedIS(List<InputSplit> totalIs)
        throws IOException {
    // MD5 hashes of the indexed InputSplits, as written by the index query job
    List<String> md5Is = readInputSplitsFromFile();
    List<InputSplit> indexedIs = new ArrayList<InputSplit>();
    for (InputSplit split : totalIs) {
        String str = MD5Hash.digest(split.toString()).toString();
        if (md5Is.contains(str)) {
            indexedIs.add(split);
        }
    }
    return indexedIs;
}
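The readInputSplitsFromFile() helper is not detailed in this report. A minimal sketch, assuming the index query job wrote the split hashes as plain text to a known temporary HDFS location (the path below is a placeholder), could be:

public static List<String> readInputSplitsFromFile() throws IOException {
    // Placeholder location where the index query job wrote the split hashes
    Path hashFile = new Path("/tmp/indexed-splits/part-r-00000");
    FileSystem fs = FileSystem.get(new Configuration());
    List<String> md5Is = new ArrayList<String>();
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(hashFile)));
    try {
        // One MD5 hash per line
        String line;
        while ((line = reader.readLine()) != null) {
            md5Is.add(line.trim());
        }
    } finally {
        reader.close();
    }
    return md5Is;
}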
Driver Code
We now have to use this IndexFileInputFormat class instead of the default one (FileInputFormat) in our driver code. During job initialization, Hadoop will use only the InputSplits that match the ones we have specified, and you should end up with fewer map tasks than a "full scan query" would require.
public class TestIndex {
    public static void main(String[] args) throws Exception {
        .../...
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(MyCustomMapper.class);
        job.setMapperClass(MyCustomMapper.class);
        job.setReducerClass(MyCustomReducer.class);
        // Use the custom InputFormat so that only indexed InputSplits are processed
        job.setInputFormatClass(IndexFileInputFormat.class);
        .../...
        job.waitForCompletion(true);
    }
}
Testing
Test Environment
For this specific test, I set up a small cluster of 3 virtual nodes (VirtualBox) as described below. The Hadoop cluster has been installed from Cloudera Manager 4.5.2 (free edition).
Host
OS: Mac OS X 10.7.5
Processor: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
Memory: 16Gb
1 Namenode + JobTracker
OS: Ubuntu Server 12.04.2 LTS
Memory: 5Gb
Storage: 50Gb
JDK: Java SDK 1.6.0_31
Hadoop 2.0.0-cdh4.2.0
2 Datanodes + TaskTrackers
OS: Ubuntu Server 12.04.2 LTS
Memory: 2.5Gb
Storage: 50Gb
JDK: Java SDK 1.6.0_31
Hadoop 2.0.0-cdh4.2.0
Test Data
I have generated a test data set using a simple Perl script. Even though 30Gb of data is far from what a real big-data environment would be, I believe such a data set is large enough to see any potential improvement brought by indexing.
30Gb
100 bytes per record
1Gb per file
I have generated 5,000,000 different targets (IP addresses) that I have randomly distributed over these 300,000,000 records. The same IP address is found 60 times on average.
Results
I ran the exact same indexed and non-indexed jobs several times over subsets of 5, 10, 15, 20 and 25Gb of my data set. Because my indexed values were evenly distributed, I expected most of my figures to roughly follow a linear trend.
The graph below shows the actual index size vs. the data set size. As expected, the index size grows linearly, and so does its rebuild execution time. My index is around 7.5Gb for a 25Gb data set (30%), and has been fully rebuilt in around 30 minutes.
The graph below shows the execution time for both indexed and non-indexed jobs as a function of the data set size. This is obviously what everybody expects from indexing: the larger your data set is, the faster your indexed query will be compared to the non-indexed one. Even though my test environment was really small, I was able to see the great potential indexes bring to MapReduce performance.
In the previous graph, I did not take into account the time required to rebuild my index. Even though this is done only once (e.g., once a day), the process is really heavy and might take a lot of time to complete. It must be taken into account when forecasting indexing performance. Assuming you build your index only once a day and then execute 10 MapReduce jobs, the overall execution time for all your indexed queries will be:
Total time = rebuild_time + 10 x indexed_mapreduce_time
This is what I have represented in the figure below: the time required to execute X requests a day using both indexed and non-indexed queries, over a 25Gb data set.
For my specific test environment, I need to execute at least 5 MapReduce jobs a day to get any benefit from using custom indexes.
Conclusion
MapReduce index performance is strongly dependent on your data distribution, and indexing can be really powerful, especially with large data sets. Should you need to forecast how well this implementation might perform for your specific use case, I suggest benchmarking on your production environment using a subset of your production data set. For that purpose, you can perform several tests on a Simple Random Sample so that you can extrapolate the results to your entire data set.
History
- 3rd May, 2013: Initial version