Tuesday, February 26, 2013

Hadoop - How to do a secondary sort on values?

The problem at hand is that you need to work on a sorted set of values in your reducer.

Inputs were of the following form:

udid,datetime,<other-details>

Now, say we have following inputs:

udid1,1970-01-11 23:00:00,<...>

udid1,1970-01-01 23:00:00,<...>

udid1,1970-01-21 23:00:00,<...>

So, when udids are made the key in the mappers, you would expect the following input group in the reducer (ignoring all input details other than datetime, for clarity):

<udid1, { 1970-01-11 23:00:00, 1970-01-01 23:00:00, 1970-01-21 23:00:00 } >

But we needed to sort the records by datetime before we could process them. Hence, instead of the above, the following is what was required (note that the dates are now sorted):

<udid1, { 1970-01-01 23:00:00, 1970-01-11 23:00:00, 1970-01-21 23:00:00 } >

We had a very tough time sorting all the values by datetime in the reducer, as this had to be done in memory and was a big performance killer. Sometimes, due to skewed data, i.e. a lot of records for a particular udid, we even ran into Out-Of-Memory issues.

But, not anymore. :)

What did we do?

We made Hadoop do this job for us. It isn't very simple and straightforward, but I will try to make it as clear as possible.

Hadoop doesn't sort on values. Period. So, we will have to trick it. For this, we make the secondary sort column a part of the mapper's output key, while also leaving it in the value. Hence, a sample output from the mapper would look something like:

< [udid1,1970-01-11 23:00:00] , [1970-01-11 23:00:00,<other-details>] >

The following four things need to be created/customized:
  1. Map output key class (I have named it CompositeKey)
  2. Partitioner class (ActualKeyPartitioner)
  3. Grouping comparator class (ActualKeyGroupingComparator)
  4. Sort comparator class (CompositeKeyComparator)

The CompositeKey class is more or less self-explanatory; it would look something like the following:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * This key is a composite key. The "actual" key is the UDID.
 * The secondary sort will be performed against the datetime.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {

    private String udid;
    private String datetime;

    public CompositeKey() {
    }

    public CompositeKey(String udid, String datetime) {
        this.udid = udid;
        this.datetime = datetime;
    }

    @Override
    public String toString() {
        return (new StringBuilder()).append(udid).append(',').append(datetime).toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        udid = WritableUtils.readString(in);
        datetime = WritableUtils.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, udid);
        WritableUtils.writeString(out, datetime);
    }

    @Override
    public int compareTo(CompositeKey o) {
        int result = udid.compareTo(o.udid);
        if (0 == result) {
            result = datetime.compareTo(o.datetime);
        }
        return result;
    }

    /**
     * Gets the udid.
     *
     * @return UDID.
     */
    public String getUDID() {
        return udid;
    }

    public void setUDID(String udid) {
        this.udid = udid;
    }

    /**
     * Gets the datetime.
     *
     * @return Datetime.
     */
    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }

}
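
To make the sample mapper output above concrete, here is a minimal mapper sketch that builds such a composite key. The class name, the comma splitting and the field positions are my assumptions based on the sample input format above, not the exact production code:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * A minimal sketch, assuming input lines of the form "udid,datetime,<other-details>".
 */
public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

    private final CompositeKey outputKey = new CompositeKey();
    private final Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // split only on the first two commas so the rest of the record stays intact
        String[] parts = value.toString().split(",", 3);
        if (parts.length < 3) {
            return; // skip malformed records
        }

        // udid + datetime together form the composite key; datetime also stays in the value
        outputKey.setUDID(parts[0]);
        outputKey.setDatetime(parts[1]);
        outputValue.set(parts[1] + "," + parts[2]);
        context.write(outputKey, outputValue);
    }
}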


Secondly, we need to implement our own partitioner. The reason is that we are now emitting both udid and datetime as the key, so the default partitioner (HashPartitioner) would no longer ensure that all records for a given udid go to the same reducer (partition). Hence, we need the partitioner to consider only the actual key part (udid) when deciding the partition for a record. This can be done as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

    private final HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    private final Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {

        try {
            // Execute the default partitioner over the first part of the key
            newKey.set(key.getUDID());
            return hashPartitioner.getPartition(newKey, value, numReduceTasks);
        } catch (Exception e) {
            e.printStackTrace();
            // fall back to a random partition in the range [0, numReduceTasks)
            return (int) (Math.random() * numReduceTasks);
        }
    }
}

Now, the partitioner only makes sure that all records for the same udid reach a particular reducer, but it doesn't guarantee that all of them arrive in the same input group (i.e. in a single reduce() call as the list of values). To ensure this, we need to implement our own grouping comparator. We do a similar thing as in the partitioner, i.e. look only at the actual key (udid) when grouping reducer inputs. This can be done as follows:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ActualKeyGroupingComparator extends WritableComparator {

    protected ActualKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // group only on udid
        return key1.getUDID().compareTo(key2.getUDID());
    }
}


The final thing left is the secondary sort over the datetime field. To achieve this, we create our own sort comparator, which first checks the udids for equality and, only if they are equal, goes on to compare the datetime field. It is implemented as follows and is pretty much self-explanatory:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // first compare on udid
        int compare = key1.getUDID().compareTo(key2.getUDID());

        if (compare == 0) {
            // only within the same input group do we sort by value (datetime)
            return key1.getDatetime().compareTo(key2.getDatetime());
        }

        return compare;
    }
}

After creating all these classes, you just need to set this up in your run() as follows:

job.setMapOutputKeyClass(CompositeKey.class);
job.setPartitionerClass(ActualKeyPartitioner.class);
job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);

You are done. You will get pre-sorted values (sorted by datetime) for each udid in your reduce() method.
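
For completeness, here is what a reducer might now look like. It simply streams through the values, which already arrive in datetime order, so nothing has to be buffered or sorted in memory. This is only a sketch; the class name and the pass-through output are placeholders for whatever processing you actually need:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * A minimal sketch: values for each udid arrive already sorted by datetime,
 * so they can be processed in a single pass without any in-memory sorting.
 */
public class SecondarySortReducer extends Reducer<CompositeKey, Text, Text, Text> {

    private final Text outputKey = new Text();

    @Override
    protected void reduce(CompositeKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        outputKey.set(key.getUDID());
        for (Text value : values) {
            // value is "datetime,<other-details>", in ascending datetime order
            context.write(outputKey, value);
        }
    }
}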


How to perform a global aggregation or count using Hadoop?

Override the run() of the mapper and the reducer using the new org.apache.hadoop.mapreduce API. In these methods you can emit the accumulated sum/count from each mapper or reducer.
You also need to limit the reducer count to 1 so as to get a global sum of all the sums generated by the individual mappers.
See the code below for more clarity:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

    /**
     * Mapper: accumulates a running sum over its input split and emits the
     * per-mapper sum exactly once, at the end of run().
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {
                // say that you need to sum up the value part
                sum += Double.valueOf(value.toString());
            } catch (NumberFormatException e) {
                // skip records that are not numeric
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }

            // emit the sum accumulated by this mapper
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue); // notice that the outputValue is empty
            cleanup(context);
        }
    }

    /**
     * Reducer: adds up the per-mapper sums and emits the global sum exactly
     * once, at the end of run(). The job must be configured with one reducer.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {

            // each key is a per-mapper sum; add it once per value so that
            // identical sums from different mappers are not collapsed
            for (Text ignored : values) {
                sum += Double.valueOf(key.toString());
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }

            // emit the global sum
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();

            // output key and value separator is empty, as in the final output
            // only the key is emitted and the value is empty
            conf.set("mapred.textoutputformat.separator", "");

            // configure mapred to have just one reducer, as we need a single
            // global sum over all the inputs
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            conf.setInt("mapred.reduce.tasks", 1);

            Job job = new Job(conf);

            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out.println("Usage: AggregationExample <comma separated list of input directories> <output dir>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }

}

Any feedback, good or bad, is most welcome.
