Saturday, January 12, 2013

How to combine multiple input files into a single mapper and control the number of mappers: implement CombineFileInputFormat

Hadoop is not very good at dealing with tons of small files, so it is often desirable to combine a large number of small input files into a smaller number of bigger splits in order to reduce the number of mappers. This was one of the first things we did when the problem at hand was to improve the performance of one of our Hadoop jobs. We had some 10,000 files and were getting as many mappers, hence far too much scheduling/initialization/writing overhead! We reduced that to some 200 mappers and, voila, performance increased by 5x instantly.

Input to a Hadoop MapReduce job is abstracted by InputFormat; FileInputFormat is the default implementation that deals with files in HDFS. With FileInputFormat, each file is split into one or more InputSplits, each typically upper-bounded by the block size. This means the number of input splits is lower-bounded by the number of input files. This is not an ideal environment for a MapReduce job dealing with a large number of small files, because the overhead of coordinating the distributed processes is far greater than it would be for a relatively small number of large files.
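To put illustrative numbers on that: with an HDFS block size of, say, 64 MB and 10,000 input files of only a few MB each, every file fits inside a single block, so FileInputFormat produces one split per file: 10,000 splits, and therefore 10,000 mappers, regardless of how large the block size is.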

Now, how did we do it?

We extended CombineFileInputFormat (Hadoop version 0.20.205; you will find its counterpart in more recent versions).

The basic parameter which drives the split size is mapred.max.split.size. Using CombineFileInputFormat together with this parameter, we can control the number of mappers.
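As a rough, illustrative calculation: if those 10,000 small files add up to about 30 GB of input and mapred.max.split.size is set to 128 MB (134217728 bytes), CombineFileInputFormat will pack the files into roughly 30 GB / 128 MB, i.e. around 240 combined splits, so the job runs with a couple of hundred mappers instead of 10,000. The exact count also depends on how the files are distributed across nodes and racks, since files are grouped by locality.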

Here is the implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<Longwritable, Text> {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<Longwritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
}

public static class myCombineFileRecordReader implements RecordReader<Longwritable, Text>{
private final LineRecordReader linerecord;

public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}

@Override
public void close() throws IOException {
linerecord.close();

}

@Override
public LongWritable createKey() {
// TODO Auto-generated method stub
return linerecord.createKey();
}

@Override
public Text createValue() {
// TODO Auto-generated method stub
return linerecord.createValue();
}

@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return linerecord.getPos();
}

@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return linerecord.getProgress();
}

@Override
public boolean next(LongWritable key, Text value) throws IOException {

// TODO Auto-generated method stub
return linerecord.next(key, value);
}

}
}


In your job, first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into, and set CombinedInputFormat as the input format. Do something like the following in your run():

...
if (argument != null) {
    conf.set("mapred.max.split.size", argument);
} else {
    conf.set("mapred.max.split.size", "134217728"); // 128 MB
}
...

conf.setInputFormat(CombinedInputFormat.class);
...
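For completeness, here is a minimal sketch of what the whole run() could look like with the old mapred API, assuming a Tool-based driver; MyJob, MyMapper and MyReducer are placeholder names, not classes from this post:

public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), MyJob.class);  // MyJob: placeholder driver class
    conf.setJobName("combine-small-files");

    // Combine small files into splits of at most ~128 MB (value is in bytes)
    conf.set("mapred.max.split.size", "134217728");

    conf.setInputFormat(CombinedInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    conf.setMapperClass(MyMapper.class);                 // placeholder mapper
    conf.setReducerClass(MyReducer.class);               // placeholder reducer
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
    return 0;
}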
PS: This won't work if you have gzipped files as input. A file compressed with the GZIP codec cannot be split (and hence cannot be combined into splits of the desired size) because of the way this codec works. A single split in Hadoop is processed by a single mapper, so a single gzipped file can only ever be processed by one mapper.
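If you are not sure whether the input directory contains gzipped files, a small pre-flight check along the following lines can flag them before the job is submitted (GzipInputCheck and warnIfGzipped are just illustrative names; the check itself only uses the standard FileSystem and CompressionCodecFactory APIs):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.GzipCodec;

public class GzipInputCheck {

    // Prints a warning for every gzipped file under inputDir, since such files
    // cannot be split and will each end up in a mapper of their own.
    public static void warnIfGzipped(Configuration conf, Path inputDir) throws IOException {
        FileSystem fs = inputDir.getFileSystem(conf);
        CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
        for (FileStatus status : fs.listStatus(inputDir)) {
            CompressionCodec codec = codecs.getCodec(status.getPath());
            if (codec instanceof GzipCodec) {
                System.err.println("WARNING: " + status.getPath() + " is gzip-compressed and cannot be split");
            }
        }
    }
}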

Comments:

  1. Hello,

    First of all, I want to say that your blog is very helpful to me.

    Could you please upload a complete working version of the source code above? I am very new to this environment, so I want to test it and see how it works.

    KP

  2. Hi KP,
    Thanks. You may start from the WordCount v1.0 example shown in the MapReduce tutorial here: http://hadoop.apache.org/docs/stable/mapred_tutorial.html

    Then, instead of
    conf.setInputFormat(TextInputFormat.class);

    use the CombinedInputFormat explained in the article above, as follows:
    conf.setInputFormat(CombinedInputFormat.class);

    and also set the value of mapred.max.split.size as mentioned in the article above.

Any feedback, good or bad, is most welcome.
