Thursday, September 26, 2013

Hadoop: Exit reduce function for the rest of input on some condition

Say we have a reduce function where in we want to halt the reduce function after processing some 'n' keys. We have set a counter to increment on each key, and on condition being satisfied return from the reduce function without bothering about rest of the keys.

Fortunately with the new mapreduce API, we have the ability to do that now.

You can achieve this by overriding the run() of the Reducer class.

Let's straight get to the code:

public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

  //reduce method here

  // Override the run()
  @override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    int count = 0;
    while (context.nextKey()) {
        if (count++ < n) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        } else {
            // exit or do whatever you want
        }
    }
    cleanup(context);
  }
}

Hadoop's speculative task execution

One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program.

Tasks may be slow for various reasons, including hardware degradation, or software mis-configuration, but the causes may be hard to detect since the tasks still complete successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent, task as a backup. This is termed speculative execution of tasks.

For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively using old API, while with newer API you may consider changing mapreduce.map.speculative and mapreduce.reduce.speculative.

If your jobs' mapper/redcuer takes very less time then it is better to disable the speculative execution. But in cases where your jobs' each mapper/reducer takes a considerable amount of time, you would be better off having speculative execution enabled as if after running for say 45 minutes, if a mapper/reducer fails due to loss of that particular node then another node has to restart it's execution from scratch.

Reference: http://developer.yahoo.com/hadoop/tutorial/module4.html

Any feedback, good or bad is most welcome.

Name

Email *

Message *