Saturday, January 12, 2013

How to combine input files into a single mapper and control the number of mappers: implement CombineFileInputFormat

Hadoop is not really good at dealing with tons of small files, so it is often desirable to combine a large number of small input files into fewer, bigger ones and thereby reduce the number of mappers. This was one of the first things we did when the problem at hand was to improve performance for one of our Hadoop jobs. We had some 10,000 files and were getting as many mappers, hence far too much scheduling/initialization/writing overhead! We reduced it to some 200 mappers and voila, performance increased by 5x instantly.

Input to a Hadoop MapReduce job is abstracted by InputFormat. FileInputFormat is the default implementation that deals with files in HDFS. With FileInputFormat, each file is split into one or more InputSplits, typically upper bounded by the block size. This means the number of input splits is lower bounded by the number of input files. That is not an ideal situation for a MapReduce job dealing with a large number of small files, because the overhead of coordinating the distributed processes ends up far greater than the actual processing work.

Now, how did we do it?

We extended CombineFileInputFormat (Hadoop version 0.20.205; you will find its counterpart in more recent versions).

The basic parameter that drives the split size is mapred.max.split.size. Using CombineFileInputFormat together with this parameter, we can control the number of mappers.

Here is the implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
        // CombineFileRecordReader delegates to one MyCombineFileRecordReader per file in the combined split.
        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) MyCombineFileRecordReader.class);
    }

    public static class MyCombineFileRecordReader implements RecordReader<LongWritable, Text> {

        private final LineRecordReader lineRecordReader;

        public MyCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
            // Re-create a FileSplit for the index-th file of the combined split and read it line by line.
            FileSplit fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
            lineRecordReader = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public void close() throws IOException {
            lineRecordReader.close();
        }

        @Override
        public LongWritable createKey() {
            return lineRecordReader.createKey();
        }

        @Override
        public Text createValue() {
            return lineRecordReader.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return lineRecordReader.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return lineRecordReader.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return lineRecordReader.next(key, value);
        }
    }
}


In your job, first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like the following in your run():

...
if (argument != null) {
conf.set("mapred.max.split.size", argument);
} else {
conf.set("mapred.max.split.size", "134217728"); // 128 MB
}
...

conf.setInputFormat(CombinedInputFormat.class);
...
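Putting it together, here is a minimal driver sketch using the old mapred API. This is just an illustration under a few assumptions: the class name CombineSmallFilesDriver and the use of IdentityMapper (map-only, for brevity) are mine, not part of our original job.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CombineSmallFilesDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CombineSmallFilesDriver.class);
        conf.setJobName("combine-small-files");

        // 128 MB combined-split target, mirroring the snippet above; override as needed.
        conf.set("mapred.max.split.size", "134217728");

        // Pack many small files into each split via the CombinedInputFormat shown above.
        conf.setInputFormat(CombinedInputFormat.class);

        // IdentityMapper keeps the sketch self-contained; plug in your own mapper here.
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0); // map-only, for illustration

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CombineSmallFilesDriver(), args));
    }
}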
PS: This won't work if you have gzipped files as input, because a file compressed with the GZIP codec cannot be split (and hence cannot be combined into splits of the desired size) due to the way this codec works. A single split in Hadoop can only be processed by a single mapper, so a single GZIP file can only be processed by a single mapper.

Tuesday, January 8, 2013

How to pass on custom variables to Mapper and Reducer class?

One way is to use custom counters, but they exist for a specific purpose, i.e. to keep count of some specific state, for example "NUMBER_OF_RECORDS_DISCARDED". Also, I believe counters can only be incremented, not set to an arbitrary value (I may be wrong here). They could be abused as message passers, but there is a better way: use the job configuration to set a variable and read it seamlessly in the mapper and reducer.

Setting the message/variable using the old mapred API:
JobConf job = (JobConf) getConf();
job.set("messageToBePassed-OR-anyValue", "123-awesome-value :P");

Setting the message/variable using the new mapreduce API:
Configuration conf = new Configuration();
conf.set("messageToBePassed-OR-anyValue", "123-awesome-value :P");
Job job = new Job(conf);

Getting the message/variable using the old API in the Mapper and Reducer: configure() has to be implemented in the Mapper and Reducer class, and the value may then be assigned to a class member so it can be used inside map() or reduce().
...
private String awesomeMessage;
public void configure(JobConf job) {
awesomeMessage = job.get("messageToBePassed-OR-anyValue");
}
...

The variable awesomeMessage can then be used with the map and reduce functions.

Getting the message/variable using the new API in the Mapper and Reducer: a similar thing needs to be done here, in setup().
Configuration conf = context.getConfiguration();
String param = conf.get("messageToBePassed-OR-anyValue");
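
For completeness, here is a small sketch of a new-API mapper reading the value in setup(). The class name MessageAwareMapper and its key/value types are illustrative assumptions, not a prescribed layout.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MessageAwareMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String message;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the value that the driver placed into the job configuration.
        Configuration conf = context.getConfiguration();
        message = conf.get("messageToBePassed-OR-anyValue", "default-value");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The message is now available to every map() call.
        context.write(new Text(message), value);
    }
}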

Saturday, January 5, 2013

What are Combiners in Hadoop ?

Combiners are one of those things every Hadoop developer wants to use but seldom knows when and how to use them. Combiners are basically mini-reducers: they lessen the workload that is passed on to the reducers. Your mapper may emit more than one record per key, and those records are ultimately aggregated and handed to the reducer as a single call to the reduce method. So, if the records for a key can be combined on the map side before being shuffled across the network to the right reducer, the amount of data transferred shrinks, ultimately enhancing the job's performance. The sorting in the reduce phase will also be quicker. The data flow among mapper, combiner and reducer is shown by the super-simple diagram below:
[Diagram: mapper output → combiner → shuffle → reducer]
How to use? A combiner has exactly the same shape as a reducer implementation. If you are writing in Java, it will be a class that either extends org.apache.hadoop.mapreduce.Reducer (new API) or implements org.apache.hadoop.mapred.Reducer (old API) and overrides/implements the reduce() method. The standard convention is to use the reducer itself as the combiner, but that may not be desirable in every scenario; you may write another class that does some map-side aggregation different from what your reducers perform. A small sketch follows.
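
For instance, a minimal sketch using the new API: a word-count style reducer (here called SumReducer, an illustrative name) is reused as the combiner, since summing is commutative and associative.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up the partial counts for this key; correct whether the input
        // comes straight from mappers or from earlier combiner output.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

// In the driver:
// job.setReducerClass(SumReducer.class);
// job.setCombinerClass(SumReducer.class); // the same class reused as the combiner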

When to use? As the name itself suggests, combiners should only be used when there is something to combine. Generally, they should be applied to functions that are commutative (a·b = b·a) and associative (a·(b·c) = (a·b)·c); the real requirement is that the framework may run the combiner zero, one or many times, and doing so must not change the final result. Combiners may operate only on a subset of your keys and values, or may not execute at all. So if there are very few duplicate keys in your mapper output, using a combiner may backfire and instead become a useless burden. Use combiners only when there is enough scope for combining.
Quoting from Chuck Lam's 'Hadoop in Action': "A combiner doesn't necessarily improve performance. You should monitor the job's behavior to see if the number of records outputted by the combiner is meaningfully less than the number of records going in. The reduction must justify the extra execution time of running a combiner. "
So, go ahead and see whether you can optimize your Hadoop job using a combiner, and do share your thoughts/inputs below. Cheers.

Any feedback, good or bad, is most welcome.
