Tuesday, February 26, 2013

Hadoop - How to do a secondary sort on values?

The problem at hand: you need to work on a sorted set of values in your reducer.

Inputs were of the following form:

udid,datetime,<other-details>

Now, say we have the following inputs:

udid1,1970-01-11 23:00:00,<...>
udid1,1970-01-01 23:00:00,<...>
udid1,1970-01-21 23:00:00,<...>

So, when udids are made the key in the mappers, you would expect the following input group in the reducer (ignoring all input details other than datetime, for clarity):

<udid1, { 1970-01-11 23:00:00, 1970-01-01 23:00:00, 1970-01-21 23:00:00 } >

But we needed the records sorted by datetime before we could process them. Hence, instead of the above, the following is what was required (note that the dates are now sorted):

<udid1, { 1970-01-01 23:00:00, 1970-01-11 23:00:00, 1970-01-21 23:00:00 } >

We had a very tough time sorting values by datetime in the reducer, as this had to be done in memory and was a great performance killer. With skewed data, i.e. a lot of records for a particular udid, we sometimes ran into out-of-memory errors.

But not anymore. :)

What we did

We made Hadoop do this job for us. It isn't very simple and straightforward, but I will try to make as much sense of it as possible.

Hadoop doesn't sort on values. Period. So we will have to trick it: make the secondary sort column part of the mapper's output key, while also leaving it in the value. Hence, a sample output from the mapper would look something like:

< [udid1,1970-01-11 23:00:00] , [1970-01-11 23:00:00,<other-details>] >
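
For illustration, a mapper emitting such a composite key could look like the following sketch. It uses the CompositeKey class defined below; the class name and the line parsing are my assumptions, not part of the original job:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: parses "udid,datetime,<other-details>" lines and
// emits <[udid, datetime], [datetime,<other-details>]> pairs.
public class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

    private final CompositeKey outKey = new CompositeKey();
    private final Text outValue = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {

        // split into udid, datetime and the rest of the record
        String[] parts = line.toString().split(",", 3);
        if (parts.length < 3) {
            return; // skip malformed lines
        }

        outKey.setUDID(parts[0]);
        outKey.setDatetime(parts[1]);
        // keep the datetime in the value too, since the reducer groups on udid only
        outValue.set(parts[1] + "," + parts[2]);
        context.write(outKey, outValue);
    }
}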

The following four things need to be created/customized:
  1. Map output key class (I have named it CompositeKey)
  2. Partitioner class (ActualKeyPartitioner)
  3. Grouping comparator class (ActualKeyGroupingComparator)
  4. Sort comparator class (CompositeKeyComparator)

The CompositeKey class is more or less self-explanatory; it would be something like the following:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * A composite key. The "actual" key is the UDID; the secondary sort
 * is performed on the datetime.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {

    private String udid;
    private String datetime;

    public CompositeKey() {
    }

    public CompositeKey(String udid, String datetime) {
        this.udid = udid;
        this.datetime = datetime;
    }

    @Override
    public String toString() {
        return udid + "," + datetime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        udid = WritableUtils.readString(in);
        datetime = WritableUtils.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, udid);
        WritableUtils.writeString(out, datetime);
    }

    @Override
    public int compareTo(CompositeKey o) {
        // compare by udid first, then by datetime
        int result = udid.compareTo(o.udid);
        if (result == 0) {
            result = datetime.compareTo(o.datetime);
        }
        return result;
    }

    public String getUDID() {
        return udid;
    }

    public void setUDID(String udid) {
        this.udid = udid;
    }

    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }
}


Secondly, we need to implement our own partitioner. Now that we are emitting both udid and datetime as the key, the default partitioner (HashPartitioner) can no longer ensure that all records for a given udid go to the same reducer (partition). Hence, we need the partitioner to consider only the actual key part (the udid) when deciding the partition for a record. This can be done as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

    private final HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    private final Text newKey = new Text();

    @Override
    public int getPartition(CompositeKey key, Text value, int numReduceTasks) {
        try {
            // run the default partitioner over the actual key (udid) only
            newKey.set(key.getUDID());
            return hashPartitioner.getPartition(newKey, value, numReduceTasks);
        } catch (Exception e) {
            e.printStackTrace();
            // fall back to a random partition in the range [0, numReduceTasks)
            return (int) (Math.random() * numReduceTasks);
        }
    }
}

Now, the partitioner only makes sure that all records for the same udid reach a particular reducer, but it doesn't guarantee that all of them arrive in the same input group (i.e. in a single reduce() call as the list of values). To ensure this, we need to implement our own grouping comparator. We do a similar thing as in the partitioner, i.e. only look at the actual key (udid) when grouping reducer inputs. This can be done as follows:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ActualKeyGroupingComparator extends WritableComparator {

    protected ActualKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // group on udid only
        return key1.getUDID().compareTo(key2.getUDID());
    }
}


The final thing left is the secondary sort on the datetime field. To achieve this, we create our own sort comparator, which first compares udids and, only if they are equal, goes on to compare the datetime field. It is implemented as follows and is pretty much self-explanatory:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;

        // first compare on udid
        int compare = key1.getUDID().compareTo(key2.getUDID());

        if (compare == 0) {
            // only within the same input group do we sort by value (datetime)
            return key1.getDatetime().compareTo(key2.getDatetime());
        }

        return compare;
    }
}

After creating all these classes, you just need to wire them up in your job's run() method as follows:

job.setMapOutputKeyClass(CompositeKey.class);
job.setPartitionerClass(ActualKeyPartitioner.class);
job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
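
For completeness, here is a minimal driver sketch showing where those calls live, assuming the Hadoop 2.x API; the SecondarySortJob, SecondarySortMapper and SecondarySortReducer class names are illustrative (the reducer is sketched below):

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "secondary sort on datetime");
        job.setJarByClass(SecondarySortJob.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // plug in the custom partitioner and comparators
        job.setPartitionerClass(ActualKeyPartitioner.class);
        job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
        job.setSortComparatorClass(CompositeKeyComparator.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SecondarySortJob(), args));
    }
}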

You are done. You will get pre-sorted values (sorted by datetime) for each udid in your reduce method.
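
On the reducer side, a minimal sketch (again, the class name and pass-through output are my assumptions) would be:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: values for each udid arrive already sorted by
// datetime, so they can be streamed without buffering them in memory.
public class SecondarySortReducer extends Reducer<CompositeKey, Text, Text, Text> {

    private final Text outKey = new Text();

    @Override
    protected void reduce(CompositeKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        outKey.set(key.getUDID());
        for (Text value : values) {
            // values iterate in datetime order; process them as needed
            context.write(outKey, value);
        }
    }
}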

 

Comments:

  1. Hi, neat explanation, thank you.
    Could you please explain briefly how the compareTo method is used?

    When I searched the net for examples with respect to Hadoop, I found two styles: in one, the compareTo body recursively calls compareTo along with basic comparison operations and ensures an int is returned. I understood that compareTo is used for sorting keys in Hadoop; why does the recursive call need to be made, is it to sort all keys in order?

    My understanding is that compareTo just sorts and doesn't perform grouping of keys, right?

    Could you please throw some light on these details?
  2. Thanks Ram. If you look closely, it is not that compareTo() is called recursively; the compareTo of a different object is called.
    So, say you have two properties in your custom key class, one a String and the other an Integer, and you need to sort by the String property first and then by the Integer property. Your compareTo() would first call the String's compareTo, and only when that returns 0, i.e. the Strings from both objects are equal, would the Integers' compareTo be called.
  3. Hi, this is a pretty good doc. However, if you have already overridden the compareTo function in your CompositeKey class, you don't need to add the sort comparator class at all; the compareTo function in the CompositeKey class will do this job.
  4. Hi Hao,

    I tried the above program and it worked fine without explicitly implementing CompositeKeyComparator, since the CompositeKey class already implements the compareTo method.

    But I want to send the reducer only the largest date value for each udid instead of all the values. That is, the reducer output should show only the highest date's details.

  5. Hi Amar,

    Nice article. Could you kindly let me know the solution to the problem of how to find the last URL accessed by a user in map reduce?

    Regards,
    Basha
  6. Hi Amar,

    This is a great help.

    I'm running the above program on 2.2.0 and getting:

    java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: MovieLens$CompositeKey.<init>()

    Any pointers around this would be appreciated.

    Thanks,
    -Nishith

    Replies
    1. It was resolved by making all the comparator inner classes static.

      Thanks,
      -Nishith
  7. Hi,

    I am a beginner learning Java map reduce. I have seen M-R programs structured as driver, mapper and reducer code. My understanding is that the control flow is roughly: 1 input file -> 2 input split -> 3 record reader -> 4 mapper -> 5 partitioning -> 6 sorting -> 7 grouping via comparator -> 8 reducer, where stages 5-7 can happen either through user-defined or pre-existing code. Kindly correct me if my understanding is wrong. I do understand the role of each stage; however, I am not able to infer the absence of a dedicated mapper class (public class MyMapper extends Mapper) and a dedicated reducer class in the above code. I am confused :( and it would be great if someone could provide some pointers.
  8. I am wondering what the difference is between the custom comparator and the compareTo() method in the derived WritableComparable class. I am a beginner trying to understand things in the Hadoop world.
  9. Why is the compareTo of CompositeKey not enough for sorting? Why does CompositeKeyComparator's compare() have to be defined? Both seem to be doing the same thing.

    Since CompositeKeyComparator's compare() is used for sorting, when is the compareTo() of CompositeKey actually used?
  10. In which class do I use the run() method?
    job.setMapOutputKeyClass(CompositeKey.class);
    job.setPartitionerClass(ActualKeyPartitioner.class);
    job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);
    Does anyone have the compiled source code?
  11. Hi Amarkanth,

    Very good blog. I need some clarification on how the composite key gets split into the original key + value at the reducer.

    composite key = udid + timestamp

    Will this get split into (udid, {timestamp1, timestamp2, timestamp3, ...}) after the grouping comparator?

    Thanks,
    Yash
  12. Very well written Amar, keep it up. Thanks for such a nice post.
  13. Hi, I need your help. Is there any other way to contact you?

Any feedback, good or bad, is most welcome.
