Tuesday, October 8, 2013

Interesting Big-Data news from around the world: October 2013 - Week 1

Big data is in fashion. It has become the in-thing! So it is bound to make some news.

Here are some of the interesting reads I have for you from around the world for the first week of October 2013:
  1. Work4 Exploits Facebook's Graph Search - Who's Next?
    Think the only person that can solicit you on Facebook is a friend or friend of a friend? Think again.
    Today, Work4, a Facebook recruiting solution, is unveiling Graph Search Recruiter, a service which gives companies the ability to search for and contact potential job candidates from across Facebook’s entire membership. Except those whose privacy settings prevent it, that is.
    From a recruiter’s perspective, this seems to be as sexy as it gets.
    Read more...

  2. Why big data has made your privacy a thing of the past?
    Despite the efforts of European regulators to protect citizens' personal data, predictive analytics has made it too easy to piece together information about individuals regardless of the law.
    Read more...

  3. Big data blocks gaming fraud
    The explosion of online games has resulted in the creation of a new industry: in-game currency, currently valued at $1 billion in the U.S. alone. But game developers, particularly startups that are rising and falling on a single game, are losing significant revenue as savvy social game players figure out how to “game” the system by stealing currency rather than buying it.
    Read more...

  4. How to Find Out What Big Data Knows About You?
    The world of Big Data is a world of pervasive data collection and aggressive analytics. Some see the future and cheer it on; others rebel. Behind it all lurks a question most of us are asking—does it really matter? I had a chance to find out recently, as I got to see what Acxiom, a large-scale commercial data aggregator, had collected about me.
    At least in theory large scale data collection matters quite a bit. Large data sets can be used to create social network maps and can form the seeds for link analysis of connections between individuals. Some see this as a good thing; others as a bad one—but whatever your viewpoint, we live in a world which sees increasing power and utility in Big Data’s large scale data sets.
    Read more...

  5. Deutsche Telekom speeds up big data with hosted HANA
    Enterprises have another option for accessing SAP’s in-memory database technology: Deutsche Telekom subsidiary T-Systems has been approved to offer HANA Enterprise Cloud.
    The in-memory database technology can process large data volumes from business applications more quickly than standard server implementations, and also supports new integrated analytical methods, according to T-Systems.
    Read more...

Debugging Hadoop MR Java code in local eclipse dev environment

I have been asked multiple times to blog about this. One of my esteemed colleagues has already blogged about it, so here I am, re-blogging it.

The basic thing to remember here is that debugging a Hadoop MR job is similar to debugging any remotely debugged application in Eclipse.

A debugger is a tool used to test and debug other programs (the "target" program). It is especially useful in a Hadoop environment, where there is little room for error and one small mistake can prove costly.

Debugging custom Java code for Hadoop in your local Eclipse environment is pretty straightforward and does not take much time to set up.

As you may know, Hadoop can be run in the local environment in 3 different modes:

1. Local Mode
2. Pseudo Distributed Mode
3. Fully Distributed Mode (Cluster)

Typically you will be running your local Hadoop setup in Pseudo-Distributed Mode to leverage HDFS and MapReduce (MR). However, you cannot debug MR programs in this mode, as each map/reduce task runs in a separate JVM process. So you need to switch back to Local Mode, where your MR program runs in a single JVM process.

Here are the quick and simple steps to debug this in your local environment:

1. Run Hadoop in Local Mode for debugging so that the mapper and reducer tasks run in a single JVM instead of separate JVMs. The steps below help you do that.

2. Configure HADOOP_OPTS to enable debugging, so that when you run your Hadoop job it will wait for the debugger to connect. Below is the command to enable debugging on port 8008.

export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8008"

3. Change the fs.default.name value in core-site.xml from hdfs:// to file:///. You won't be using HDFS in Local Mode.

4. Set the mapred.job.tracker value in mapred-site.xml to local. This will instruct Hadoop to run MR tasks in a single JVM (see the sketch after these steps).

5. Create a debug configuration for Eclipse with the port set to 8008. Go to the debug configurations, create a new configuration of type Remote Java Application, and set the port to 8008 in its settings.

6. Run your Hadoop job (it will wait for the debugger to connect) and then launch the above debug configuration in Eclipse. Do make sure to set a breakpoint first.
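
If you prefer to do steps 3 and 4 programmatically instead of editing the XML files, here is a minimal sketch (my addition, not part of the original steps) of setting the same two properties on the job's Configuration in your driver:

Configuration conf = new Configuration();
// use the local file system instead of HDFS (same effect as editing core-site.xml)
conf.set("fs.default.name", "file:///");
// run all map and reduce tasks in a single local JVM (same effect as editing mapred-site.xml)
conf.set("mapred.job.tracker", "local");
Job job = new Job(conf);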

That is all you need to do.

Thursday, September 26, 2013

Hadoop: Exit reduce function for the rest of input on some condition

Say we have a reduce function that we want to halt after processing some 'n' keys. We keep a counter that increments on each key, and once the condition is satisfied we return from the reduce function without bothering about the rest of the keys.

Fortunately, the new mapreduce API gives us the ability to do that.

You can achieve this by overriding the run() of the Reducer class.

Let's straight get to the code:

public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

  // number of keys after which we want to stop processing (set this to whatever limit you need)
  private static final int n = 10;

  // reduce method here

  // Override the run()
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    int count = 0;
    while (context.nextKey()) {
      if (count++ < n) {
        reduce(context.getCurrentKey(), context.getValues(), context);
      } else {
        break; // exit the loop; the remaining keys are skipped
      }
    }
    cleanup(context);
  }
}

Hadoop's speculative task execution

One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program.

Tasks may be slow for various reasons, including hardware degradation, or software mis-configuration, but the causes may be hard to detect since the tasks still complete successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent, task as a backup. This is termed speculative execution of tasks.

For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Speculative execution is enabled by default. You can disable it for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively, when using the old API; with the newer API, the corresponding properties are mapreduce.map.speculative and mapreduce.reduce.speculative.

If each mapper/reducer of your job takes very little time, it is better to disable speculative execution. But if each mapper/reducer takes a considerable amount of time, you are better off keeping speculative execution enabled: if, after running for say 45 minutes, a mapper/reducer is lost along with its node, another node has to restart its execution from scratch.
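
For reference, here is a minimal sketch (my addition, using only the property names mentioned above) of turning speculative execution off from the driver with the old JobConf API; MyJob is a placeholder for your driver class:

JobConf conf = new JobConf(MyJob.class);
// disable speculative execution for both map and reduce tasks (old API property names)
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);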

Reference: http://developer.yahoo.com/hadoop/tutorial/module4.html

Sunday, July 28, 2013

Custom Writable : SetWritable

Recently, I needed the mapper's output key to be a Set.

While newer versions of Hadoop may ship something like this out of the box, it isn't available yet, so I wrote my own.

In addition to readFields() and write(), the other methods that need to be overridden are:
  • compareTo()
  • equals()
  • hashCode()

Following is the SetWritable class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.WritableComparable;

/**
 * 
 * @author amar
 */
public class SetWritable implements WritableComparable<SetWritable> {

 private Set<Integer> itemSet;

 /**
  * Constructor.
  */
 public SetWritable() {

 }

 /**
  * Constructor.
  * 
  * @param itemSet
  */
 public SetWritable(Set<Integer> itemSet) {

  this.itemSet = itemSet;
 }

 @Override
 public String toString() {

  return itemSet.toString();
 }

 @Override
 public void readFields(DataInput in) throws IOException {

  // First clear the set. Otherwise we will just accumulate
  // entries every time this method is called.
  if (this.itemSet != null) {
   this.itemSet.clear();
  } else {
   this.itemSet = new HashSet<Integer>();
  }
  int count = in.readInt();
  while (count-- > 0) {
   itemSet.add(in.readInt());
  }
 }

 @Override
 public void write(DataOutput out) throws IOException {

  out.writeInt(itemSet.size());
  for (int item : itemSet) {
   out.writeInt(item);
  }
 }

 @Override
 public int compareTo(SetWritable o) {

  if (itemSet.equals(o.itemSet))
   return 0;
  else
   return 1;
 }

 @Override
 public boolean equals(Object other) {

  if (this == other)
   return true;

  if (other == null || (this.getClass() != other.getClass())) {
   return false;
  }

  SetWritable guest = (SetWritable) other;
  return (this.itemSet.equals(guest.itemSet));
 }

 @Override
 public int hashCode() {

  int result = 0;
  result = this.itemSet.hashCode();
  return result;
 }

 /**
  * Gets the itemSet.
  * 
  * @return itemSet.
  */
 public Set<Integer> getItemSet() {

  return itemSet;
 }

 public void setItemSet(Set<Integer> itemSet) {

  this.itemSet = itemSet;
 }
}

Now, in run(), MapOutputKeyClass needs to be set as follows:

job.setMapOutputKeyClass(SetWritable.class);
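
To round this off, here is a hedged sketch (not from the original write-up) of a mapper that emits SetWritable keys; the comma-separated parsing and the IntWritable value are just placeholders:

public static class SetEmittingMapper extends Mapper<LongWritable, Text, SetWritable, IntWritable> {

  private final IntWritable one = new IntWritable(1);

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // build a set of integers from a comma-separated input line (placeholder parsing logic)
    Set<Integer> items = new HashSet<Integer>();
    for (String token : value.toString().split(",")) {
      items.add(Integer.parseInt(token.trim()));
    }
    context.write(new SetWritable(items), one);
  }
}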

Saturday, June 1, 2013

Launch Amazon EMR cluster using AWS Java SDK

The Java SDK for AWS has utilities to launch and manage EMR clusters.

You can do the following :
  1. Define and create steps to be added to the EMR cluster.
  2. Define and create bootstrap actions.
  3. Define and launch cluster with above created bootstrap actions and steps
Certainly, there are many more features in the SDK that can be explored.

Let's delve into code without much ado.

Define and create steps:

The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.StepConfig
  • com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig
For example, if we need to add a step to install Hive, we can get a HadoopJarStepConfig object with the details for installing Hive (like the jar location, arguments to be passed, etc.) using StepFactory, which has a number of predefined Hadoop jar steps.
Code would look as below:
StepFactory stepFactory = new StepFactory();
StepConfig installHive = new StepConfig().withName("Install Hive")
.withActionOnFailure("TERMINATE_JOB_FLOW")
.withHadoopJarStep(stepFactory.newInstallHiveStep());

In order to add our custom jars, we create a HadoopJarStepConfig as follows and then use it to create the required StepConfig:
HadoopJarStepConfig customJarStep = new HadoopJarStepConfig("Path-to-jar-on-s3");
customJarStep.withArgs("argument-list-to-be-passed-to-jar");
StepConfig doSomething = new StepConfig().withName("Do Some Task")
.withHadoopJarStep(customJarStep);

Define and create bootstrap actions:

To quote amazon EMR's docs:
You can use bootstrap actions to install additional software and to change the configuration of applications on the cluster. Bootstrap actions are scripts that are run on the cluster nodes when Amazon EMR launches the cluster. They run before Hadoop starts and before the node begins processing data. You can write custom bootstrap actions, or use predefined bootstrap actions provided by Amazon EMR. A common use of bootstrap actions is to change Hadoop configuration settings. 
Let's see how we can create a BootstrapActionConfig.
The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig
  • com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig

The following code should be self-explanatory if you already know about using bootstrap actions to set custom Hadoop settings; if not, read more here.
String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";
ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
bootstrapScriptConfig.setPath(CONFIG_HADOOP_BOOTSTRAP_ACTION);
List<String> setMappersArgs = new ArrayList<String>();
setMappersArgs.add("-s");
setMappersArgs.add("textinputformat.record.delimiter=;");
bootstrapScriptConfig.setArgs(setMappersArgs);
BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
bootstrapConfig.setName("Set Hadoop Config");
bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

Define and launch cluster:

The important class which we shall know about here is :
  • com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest 
Here you may define the various aspects of a cluster, like:
  • Cluster size (Instance count) 
  • Master instance type 
  • Slave instance type 
  • Steps to be added 
  • Bootstrap actions 
  • Log location on S3 
Code is as follows:
RunJobFlowRequest request = new RunJobFlowRequest()
        .withBootstrapActions(mappersBootstrapConfig)
        .withName("Hive Interactive")
        .withSteps(enabledebugging, installHive)
        .withLogUri("s3://myawsbucket/")
        .withInstances(
                new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
                        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
                        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

In order to launch the cluster, execute the following:
RunJobFlowResult result = emr.runJobFlow(request);
That's it. The complete code showcasing the above-mentioned features is as follows:
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * 
 * @author amar
 * 
 */
public class RunEMRJobFlow {

 private static final String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";

 public static void main(String[] args) {

  String accessKey = "yourAccessKeyHere";
  String secretKey = "yourSecretKeyHere";
  AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
  AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

  StepFactory stepFactory = new StepFactory();

  StepConfig enabledebugging = new StepConfig().withName("Enable debugging")
    .withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());

  StepConfig installHive = new StepConfig().withName("Install Hive").withActionOnFailure("TERMINATE_JOB_FLOW")
    .withHadoopJarStep(stepFactory.newInstallHiveStep());
  
  List<String> setMappersArgs = new ArrayList<String>();
  setMappersArgs.add("-s");
  setMappersArgs.add("textinputformat.record.delimiter=;");

  BootstrapActionConfig mappersBootstrapConfig = createBootstrapAction("Set Hadoop Config",
    CONFIG_HADOOP_BOOTSTRAP_ACTION, setMappersArgs);

  RunJobFlowRequest request = new RunJobFlowRequest()
    .withBootstrapActions(mappersBootstrapConfig)
    .withName("Hive Interactive")
    .withSteps(enabledebugging, installHive)
    .withLogUri("s3://myawsbucket/")
    .withInstances(
      new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

  RunJobFlowResult result = emr.runJobFlow(request);
 }

 private static BootstrapActionConfig createBootstrapAction(String bootstrapName, String bootstrapPath,
   List<String> args) {

  ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
  bootstrapScriptConfig.setPath(bootstrapPath);

  if (args != null) {
   bootstrapScriptConfig.setArgs(args);
  }

  BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
  bootstrapConfig.setName(bootstrapName);
  bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

  return bootstrapConfig;
 }

}

Thursday, May 23, 2013

Amazon EMR : How to add More than 256 Steps to a Cluster?

If you have been using Amazon EMR for long and complex tasks, you might know that EMR currently limits the number of steps which can be added to 256.

But at times it becomes a tad difficult to stay within 256 steps. It might be because the problem at hand is complex, needs to be broken into several steps, and has to be run over varied sets of data. Or one might have long-running jobs taking care of multiple tasks for a Hive-based data warehouse. Whatever the reason may be, you shouldn't get depressed about it, as there is a simple workaround:

Manually connect to the master node and submit job steps, just like you run them on your local machine!
Yeah, that's it. Simple.

EMR's CLI already has options that facilitate this.
Assuming that you already have a cluster spawned and have its JobFlowID, follow the steps below to submit job steps directly to the master node:

  1. Move your executables to the master node
    In order to run your job step, you will need to move the jar and/or any other files required by your job to the master node. This can be done using the EMR CLI's --scp option:
    ruby elastic-mapreduce --jobflow JobFlowID --scp myJob.jar
  2. Execute the hadoop command, just like you do on your local machine.
    This can also be done using the EMR CLI's --ssh option:
    ruby elastic-mapreduce --jobflow JobFlowID --ssh 'hadoop jar myJob.jar inputPath outputPath otherArguments'
There are other ways also. Refer here for more.


Tuesday, May 14, 2013

How to read a file in HDFS using Hadoop

You may want to read programmatically from HDFS in your mapper/reducer. Following up on my post on How to write a file in HDFS using Hadoop, here is the simple code to read a file from HDFS:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFromHDFS {

 public static void main(String[] args) throws Exception {

  if (args.length < 1) {
   System.out.println("Usage: ReadFromHDFS ");
   System.out.println("Example: ReadFromHDFS 'hdfs:/localhost:9000/myFirstSelfWriteFile'");
   System.exit(-1);
  }

  try {
   Path path = new Path(args[0]);
   FileSystem fileSystem = FileSystem.get(new Configuration());
   BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileSystem.open(path)));
   String line = bufferedReader.readLine();
   while (line != null) {
    System.out.println(line);
    line = bufferedReader.readLine();
   }
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

Sunday, April 28, 2013

Reduce EMR costs : use spot instances and yes, without risk!

As AWS states, Spot Instances can significantly lower your computing costs for time-flexible, interruption-tolerant tasks. But Hadoop jobs run on EMR aren't generally interruption-tolerant. So how do we use spot instances and still not lose our cluster in the middle of a running task?

The answer lies in not having all the machines in a cluster be spot instances: use some on-demand and some spot instances.

Now, how to achieve this?

There are 3 essential instance groups into which the nodes launched as part of a cluster fall:
  1. master instance group : This is a must-have, with a single master node.
  2. core instance group : If there are any slave nodes then there must be at least one node in this instance group. Nodes in this group act as both Tasknode and Datanode.
  3. task instance group : If a cluster has a core instance group, it can also have a task instance group containing one or more Tasknodes. These nodes do not run Datanodes (they do not hold HDFS data).

You can choose to use either on-demand or spot instances for each of the above instance groups in your job flow. However, from the definitions above, if you lose a master or core machine then your job is bound to fail. Theoretically, you can have something like:

elastic-mapreduce --create --alive --plain-output
...
--instance-group master --instance-type m1.small --instance-count 1 --bid-price 0.091 \
--instance-group core --instance-type m1.small --instance-count 10 --bid-price 0.031 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021
...

But realistically, as you know, if you request spot instances, keep in mind that if the current spot price exceeds your max bid, the instances will either not be provisioned or will be removed from the running job flow. Thus, if at any time the spot price goes higher than your bid and you lose any of your CORE or MASTER nodes, the job will fail. Both CORE and TASK nodes run TaskTrackers, but only CORE nodes run DataNodes, so you need at least one CORE node.

To hedge against the complete loss of a job flow, multiple instance groups can be created where the `CORE` group is a smaller complement of traditional on-demand instances and the `TASK` group is the group of spot instances. In this configuration, the `TASK` group will mainly benefit the map phases of a job flow, as work from the `TASK` group is handed back to the `CORE` group for reduction.

So, say you have to run a job that would ideally need 40 slave machines; you can then have, say, 10 machines (the CORE group) as traditional on-demand instances while the other 30 are spot instances (the TASK group). The syntax for creating the multiple instance groups is below:

elastic-mapreduce --create --alive --plain-output
...
--instance-group master --instance-type m1.small --instance-count 1 \
--instance-group core --instance-type m1.small --instance-count 10 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021

This will help you save cost by running spot instances as part of your cluster while at the same time making sure that the job does not fail.
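
If you launch the cluster through the AWS Java SDK (as in an earlier post), the same mix of on-demand and spot groups can be expressed with InstanceGroupConfig. The sketch below is my own illustration of the idea; the instance types, counts and bid price are placeholders:

InstanceGroupConfig masterGroup = new InstanceGroupConfig()
        .withInstanceRole("MASTER").withMarket("ON_DEMAND")
        .withInstanceType("m1.small").withInstanceCount(1);

InstanceGroupConfig coreGroup = new InstanceGroupConfig()
        .withInstanceRole("CORE").withMarket("ON_DEMAND")
        .withInstanceType("m1.small").withInstanceCount(10);

// only the TASK group bids for spot capacity, so losing it cannot fail the job
InstanceGroupConfig taskGroup = new InstanceGroupConfig()
        .withInstanceRole("TASK").withMarket("SPOT").withBidPrice("0.021")
        .withInstanceType("m1.small").withInstanceCount(30);

RunJobFlowRequest request = new RunJobFlowRequest()
        .withName("Mixed on-demand and spot cluster")
        .withLogUri("s3://myawsbucket/")
        .withInstances(new JobFlowInstancesConfig()
                .withEc2KeyName("keypair")
                .withHadoopVersion("0.20")
                .withInstanceGroups(masterGroup, coreGroup, taskGroup));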

PS:
However, keep in mind that, depending upon your bid price and the time taken to complete the job, the spot instances may come and go; in the worst case you might end up incurring the same cost and taking longer to complete the job. It all depends on your bid price, so choose it wisely. We have also been successful in running short tasks (around 20 minutes) with all the machines as spot instances!

Saturday, April 13, 2013

Creating custom InputFormat : reading paragraph as a record

Say, instead of processing a single line, you need to process a complete paragraph at once as a single record. How will you achieve this?

In order to do this, we need to customize the default behaviour of TextInputFormat, i.e. reading each line, into reading a complete paragraph.

But first you will need to create a custom record reader; this can be done by implementing the RecordReader interface. The main method where you tell the record reader to fetch a paragraph instead of a line is next(). See the following implementation; it is mostly self-explanatory:

public class ParagraphRecordReader implements RecordReader<LongWritable, Text>
{
private LineRecordReader lineRecord;
private LongWritable lineKey;
private Text lineValue;
public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
lineRecord = new LineRecordReader(conf, split);
lineKey = lineRecord.createKey();
lineValue = lineRecord.createValue();
}

@Override
public void close() throws IOException {
lineRecord.close();
}

@Override
public LongWritable createKey() {
return new LongWritable();

}

@Override
public Text createValue() {
return new Text("");

}

@Override
public float getProgress() throws IOException {
return lineRecord.getProgress();

}

@Override
public synchronized boolean next(LongWritable key, Text value) throws IOException {
boolean appended, isNextLineAvailable;
boolean retval;
byte space[] = {' '};
value.clear();
isNextLineAvailable = false;
do {
appended = false;
retval = lineRecord.next(lineKey, lineValue);
if (retval) {
if (lineValue.toString().length() > 0) {
byte[] rawline = lineValue.getBytes();
int rawlinelen = lineValue.getLength();
value.append(rawline, 0, rawlinelen);
value.append(space, 0, 1);
appended = true;
}
isNextLineAvailable = true;
}
} while (appended);

return isNextLineAvailable;
}

@Override
public long getPos() throws IOException {
return lineRecord.getPos();
}
}

You will then need to extend TextInputFormat to create a custom InputFormat; just overriding the getRecordReader() method and returning the ParagraphRecordReader created above will be sufficient.

Your ParagraphInputFormat will look as follows:

public class ParagraphInputFormat extends TextInputFormat
{
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException {
reporter.setStatus(split.toString());
return new ParagraphRecordReader(conf, (FileSplit)split);
}
}

Now, one final thing is to set the input format, set it as follows:

conf.setInputFormat(ParagraphInputFormat.class);

That's it, you are done, now in your mapper you will receive one paragraph at a time.


To have a clear perspective of what we achieved above, let's assume that the input file is as follows:
This is awesome.
WTF is this.

This is just a test.

And the mapper code looks like:

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {

System.out.println(key+" : "+value);
}


Then the console output from the mapper's System.out will be as follows:
0 : This is awesome. WTF is this. 
0 : This is just a test.

Tuesday, April 9, 2013

HDFS split versus Mapper input split ?

Following 2 types of splitting are completely separate:
  1. Splitting files into HDFS blocks
  2. Splitting files to be distributed to the mappers

Now, by default, if you are using FileInputFormat, these two types of splitting kind of overlap (and hence the splits are nearly identical).

But you can always have a custom way of splitting for the second point above (or even have no splitting at all, i.e. have one complete file go to a single mapper).

Also, you can change the HDFS block size independently of the way your InputFormat splits the input data.
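
For example, with the new mapreduce API the split size used for the mappers can be tuned from the driver without touching the HDFS block size; this is a small illustrative sketch and the sizes are arbitrary examples:

// example: force splits smaller than the HDFS block size by lowering the maximum split size to 32 MB
FileInputFormat.setMaxInputSplitSize(job, 32L * 1024 * 1024);

// example: alternatively, force splits larger than a block by raising the minimum split size to 256 MB
// FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);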

Another important point to note is that while files are physically broken into blocks when stored in HDFS, the splits used to distribute work to the mappers are not physical splits of the files; they are only logical splits.

Let's discuss it with an example from here:

Suppose we want to load a 110 MB text file into HDFS, with the HDFS block size and the input split size both set to 64 MB.
  1. The number of mappers is based on the number of input splits, not the number of HDFS blocks.
  2. When we set the HDFS block size to 64 MB, it is exactly 67108864 (64*1024*1024) bytes; it doesn't matter if the file gets cut in the middle of a line.
  3. Now we have 2 input splits (so two maps). The last line of the first block and the first line of the second block are not meaningful on their own. TextInputFormat is responsible for reading meaningful lines and giving them to the map tasks. What TextInputFormat does is:
    • For the second split it seeks to the second line, which is a complete line, reads from there, and gives those lines to the second mapper.
    • The first mapper reads until the end of the first block and also processes the (last incomplete line of the first block + first incomplete line of the second block).
Read more here.

Saturday, April 6, 2013

How to write a file in HDFS using Hadoop

At times you may need to write to HDFS on your own rather than relying on the Hadoop framework's default way of writing outputs, i.e. when you want to create a file in some specific custom format and need to write that file yourself from inside the Java code of your mapper or reducer, just as you would write a simple file on your local drive.
You know how to write a file in the local file system, but what about writing to HDFS?
That is simple too!

Just follow the sample code below; it is pretty much self-explanatory.

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteToHDFS {

public static void main(String[] args) throws Exception {

if (args.length < 2) {
System.out.println("Usage: WriteToHDFS <hdfs-file-path-to-write-into> <text-to-write-in-file>");
System.out
.println("Example: WriteToHDFS 'hdfs:/localhost:9000/myFirstSelfWriteFile' 'Hello HDFS world'");
System.exit(-1);
}
try {
Path path = new Path(args[0]);
FileSystem fileSystem = FileSystem.get(new Configuration());
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.create(path, true)));
bufferedWriter.write(args[1]);
bufferedWriter.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

If you want to append to an already existing file in HDFS, then instead of using:
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.create(path, true)));

use, the following:
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.append(path)));

Sunday, March 31, 2013

Hadoop : How to have nested directories as input path ?


At times our input directories have sub-directories in them.
And when we add such a directory as our input path, we are bound to get an exception like the following:
java.io.IOException: Not a file
Say we have the following directory structure:
mainInputDirectory
                inputSubDirectory1
                                1234
                                abcdefg
                inputSubDirectory2
                                1234
                                hikjlmno
                inputDirectoryNotNeededAsInput
                                somefolderName
Hadoop supports input paths given as glob patterns. I haven't experimented with a lot of complex patterns, but the simple placeholders ? and * do work.
Now, say we only want the input to come from directories named like abcdefg; this can be achieved by making the input path something like the following:
[s3-bucket-path-or-hdfs-path]/mainInputDirectory/inputSubDirectory*/?????*/*
To explain this:
In order to exclude the sub-directory inputDirectoryNotNeededAsInput and include all the directories with names starting with inputSubDirectory, we have the following construct in the input path:
/inputSubDirectory*/
In order to exclude the sub-directories named 1234 (note that these directories have names with just 4 characters) and include all the directories with names longer than 4 characters, e.g. abcdefg, we have the following construct in the input path:
/?????*/
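
Putting it together in the driver, the glob can be passed straight to FileInputFormat; this is a small sketch assuming the new mapreduce API and a hypothetical bucket name:

// the glob is expanded by Hadoop when the splits are computed (bucket/path names are placeholders)
FileInputFormat.addInputPath(job, new Path("s3://my-bucket/mainInputDirectory/inputSubDirectory*/?????*/*"));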

How to ensure that one file is completely processed by the same mapper ?

There are times when we want one particular file to be read/processed entirely by the same mapper. This requirement may arise when you have sequential data in each file and you want to process all the records of a file in the exact sequence in which they appear in the input file.

So, basically, what we are asking here is: please don't split our input files and distribute them among different mappers. Simple.

It is even simpler to achieve this:

You have to create your own version of FileInputFormat (for example, by extending TextInputFormat) and override isSplitable(), like this:

public class NonSplittableFileInputFormat extends TextInputFormat {

@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}

And then pass the above class to setInputFormatClass().

Monday, March 25, 2013

Amazon EMR: Can we resize a cluster having a running job?


Can we increase or decrease the number of nodes in a running job flow using Amazon’s EMR?
Hell yeah!
To quote Amazon:
You can modify the size of a running job flow using either the API or the CLI. The AWS Management Console allows you to monitor job flows that you resized, but it does not provide the option to resize job flows.
But I will cover only the CLI part here.
There are 3 essential instance groups into which the nodes launched as part of a cluster fall:
  1. master instance group : This is a must-have, with a single master node.
  2. core instance group : If there are any slave nodes then there must be at least one node in this instance group. Nodes in this group act as both Tasknode and Datanode.
  3. task instance group : If a cluster has a core instance group, it can also have a task instance group containing one or more Tasknodes. These nodes do not run Datanodes (they do not hold HDFS data).
While trying to resize your cluster, you must bear in mind the following:
  • The number of core nodes can only be increased, not decreased. The reason is that if you remove a core node you may lose data that its HDFS holds.
  • The number of task nodes can be both increased and decreased.
  • The number of master nodes is always exactly one, so there is no fiddling with its count.
Now, the EMR CLI provides the following parameters to handle resizing of clusters:
--modify-instance-group INSTANCE_GROUP_ID : Modify an existing instance group.
--instance-count INSTANCE_COUNT : Set the count of nodes for an instance group.
Say, you have a cluster with 5 core nodes and 2 task nodes.
Now, in order to increase the number of core nodes to 10, you first need to find out the INSTANCE_GROUP_ID of the cluster's core instance group. To do that, you can use the CLI's --describe:
ruby elastic-mapreduce --jobflow JobFlowID --describe
This would return a JSON which should have the “InstanceGroupId” of the “Core Instance Group”.
Now, execute the following :
ruby elastic-mapreduce --modify-instance-group InstanceGroupID --instance-count COUNT
In our case the COUNT will be 10.
NOTE: The count isn’t by-how-many-to-increase, but the new total count of nodes in that instance group. Hence in our case while increasing the number of nodes from 5 to 10, the count has to be 10 and not 5.
Similarly, if we want to decrease the number of task nodes to 2, after finding out the ”InstanceGroupId” of the “Task Instance Group”, we shall execute the following:
ruby elastic-mapreduce --modify-instance-group TaskInstanceGroupID --instance-count 2
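
If you are managing the cluster from the AWS Java SDK instead of the CLI, the same resize can be expressed roughly as in the sketch below; this is my own illustration and the instance group id is a placeholder:

ModifyInstanceGroupsRequest resizeRequest = new ModifyInstanceGroupsRequest()
        .withInstanceGroups(new InstanceGroupModifyConfig()
                .withInstanceGroupId("ig-XXXXXXXXX") // placeholder: the core/task instance group id
                .withInstanceCount(10));             // the new total count, not the delta
emr.modifyInstanceGroups(resizeRequest);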

Hadoop: How to get file path of the input record being read in mapper?

There might be situations when you need to know exactly which file the current <key, value> pair being processed in the mapper has come from.

You might have to process records from a certain bucket/path differently from the ones in another location. For example, if you are dealing with an online-advertising Hadoop job, one location might contain the impression logs while the other contains the click logs, and you need to process them differently in your mapper.

So, how to go about doing that?

You can get the file path from where the input record currently being processed in the mapper has come from.

First you need to get the input split; using the newer mapreduce API it is done as follows:

context.getInputSplit();

But in order to get the file path and the file name, you will need to first typecast the result to FileSplit.

So, in order to get the input file path you may do the following:

Path filePath = ((FileSplit) context.getInputSplit()).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();

Similarly, to get the file name, you may just call upon getName(), like this:

String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
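
Tying this back to the impressions-versus-clicks example above, a mapper could branch on the path as in the following sketch; the directory names are hypothetical:

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();
    if (filePathString.contains("/impressions/")) {       // hypothetical impressions log location
        // process an impression record
    } else if (filePathString.contains("/clicks/")) {     // hypothetical clicks log location
        // process a click record
    }
}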

Tuesday, February 26, 2013

Hadoop - How to do a secondary sort on values ?

The problem at hand here is that you need to work on a sorted set of values in your reducer.

Inputs were of the following form:

udid,datetime,<other-details>

Now, say we have following inputs:

udid1,1970-01-11 23:00:00,<...>

udid1,1970-01-01 23:00:00,<...>

udid1,1970-01-21 23:00:00,<...>

So, when udids are made the key in the mappers, you would expect the following input group in the reducer (ignoring all input details other than datetime, for clarity):

<udid1, { 1970-01-11 23:00:00, 1970-01-01 23:00:00,1970-01-21 23:00:00 } >

But we needed the records sorted by datetime before we could process them. Hence, instead of the above, the following is what was required (note that the dates are sorted now):

<udid1, { 1970-01-01 23:00:00, 1970-01-11 23:00:00,1970-01-21 23:00:00 } >

We had a very tough time sorting all the values by datetime in the reducer, as this had to be done in memory and was a great performance killer. Sometimes, due to skewed data, i.e. a lot of records for a particular udid, we used to run into out-of-memory issues.

But, not anymore. :)

What we did?

We made Hadoop do this job for us. It isn't very simple and straightforward, but I will try to make as much sense as possible.

Hadoop doesn't sort on values. Period. So we will have to trick it. For this, we make our secondary sort column part of the mapper's key while also leaving it in the value. Hence, a sample output from the mapper would be something like:

< [udid1,1970-01-11 23:00:00] , [1970-01-11 23:00:00,<other-details>] >

Following four things need to be created/customized:
  1. Map output key class (I have named it CompositeKey)
  2. Partitioner class (ActualKeyPartitioner)
  3. Grouping comparator class (ActualKeyGroupingComparator)
  4. Sort comparator class (CompositeKeyComparator)

The CompositeKey class is more or less self-explanatory, it would be something like following class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
* This key is a composite key. The "actual"
* key is the UDID. The secondary sort will be performed against the datetime.
*/
public class CompositeKey implements WritableComparable<CompositeKey> {

private String udid;
private String datetime;

public CompositeKey() {
}

public CompositeKey(String udid, String datetime) {

this.udid = udid;
this.datetime = datetime;
}

@Override
public String toString() {

return (new StringBuilder()).append(udid).append(',').append(datetime).toString();
}

@Override
public void readFields(DataInput in) throws IOException {

udid = WritableUtils.readString(in);
datetime = WritableUtils.readString(in);
}

@Override
public void write(DataOutput out) throws IOException {

WritableUtils.writeString(out, udid);
WritableUtils.writeString(out, datetime);
}

@Override
public int compareTo(CompositeKey o) {

int result = udid.compareTo(o.udid);
if (0 == result) {
result = datetime.compareTo(o.datetime);
}
return result;
}

/**
* Gets the udid.
*
* @return UDID.
*/
public String getUDID() {

return udid;
}

public void setUDID(String udid) {

this.udid = udid;
}

/**
* Gets the datetime.
*
* @return Datetime
*/
public String getDatetime() {

return datetime;
}

public void setDatetime(String datetime) {

this.datetime = datetime;
}

}


Secondly, we need to implement our own partitioner. The reason is that we are now emitting both udid and datetime as the key, and the default partitioner (HashPartitioner) would then not be able to ensure that all the records related to a certain udid go to the same reducer (partition). Hence, we need to make the partitioner consider only the actual key part (udid) while deciding on the partition for a record. This can be done as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ActualKeyPartitioner extends Partitioner<CompositeKey, Text> {

HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
Text newKey = new Text();

@Override
public int getPartition(CompositeKey key, Text value, int numReduceTasks) {

try {
// Execute the default partitioner over the first part of the key
newKey.set(key.getUDID());
return hashPartitioner.getPartition(newKey, value, numReduceTasks);
} catch (Exception e) {
e.printStackTrace();
return (int) (Math.random() * numReduceTasks); // this would return a random value in the range
// [0,numReduceTasks)
}
}
}

Now, the partitioner only makes sure that all records related to the same udid go to a particular reducer, but it doesn't guarantee that all of them will arrive in the same input group (i.e. in a single reduce() call as the list of values). In order to ensure that, we need to implement our own grouping comparator. We do a similar thing as we did for the partitioner, i.e. only look at the actual key (udid) while grouping the reducer inputs. This can be done as follows:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ActualKeyGroupingComparator extends WritableComparator {

protected ActualKeyGroupingComparator() {

super(CompositeKey.class, true);
}

@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {

CompositeKey key1 = (CompositeKey) w1;
CompositeKey key2 = (CompositeKey) w2;

// (check on udid)
return key1.getUDID().compareTo(key2.getUDID());
}
}


The final thing left is the secondary sort over the datetime field. To achieve this, we create our own comparator, which first checks for equality of udids and, only if they are equal, goes on to compare the datetime field. It is implemented as follows and is pretty much self-explanatory:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {
protected CompositeKeyComparator() {
super(CompositeKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {

CompositeKey key1 = (CompositeKey) w1;
CompositeKey key2 = (CompositeKey) w2;

// (first check on udid)
int compare = key1.getUDID().compareTo(key2.getUDID());

if (compare == 0) {
// only if we are in the same input group should we try and sort by value (datetime)
return key1.getDatetime().compareTo(key2.getDatetime());
}

return compare;
}
}

After creating all these classes, you just need to set this up in your run() as follows:

job.setMapOutputKeyClass(CompositeKey.class);
job.setPartitionerClass(ActualKeyPartitioner.class);
job.setGroupingComparatorClass(ActualKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);

You are done. You will get pre-sorted values (sorted by datetime) for each udid in your reduce method.
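
For completeness, here is a hedged sketch (not part of the original write-up) of what the corresponding mapper might look like; the parsing is a placeholder that assumes the udid,datetime,<other-details> input format described above:

public static class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // input line: udid,datetime,<other-details>
        String[] parts = value.toString().split(",", 3);
        String udid = parts[0];
        String datetime = parts[1];
        // the datetime goes into the key (for sorting) and also stays in the value (for the reducer)
        context.write(new CompositeKey(udid, datetime), value);
    }
}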


How to perform a global aggregation or count using Hadoop?

Override the run() of the mapper and the reducer using the new org.apache.hadoop.mapreduce API. In these methods you can emit the accumulated sum/count from each mapper or reducer.
Also, you need to limit the reducer count to 1 so as to get a global sum of all the partial sums generated by the mappers.
See the below code for more clarity:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

/**
* This is Mapper.
* 
*/
public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

private Text outputKey = new Text();
private Text outputValue = new Text();
private double sum;

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

try {
// say that you need to sum up the value part
sum += Double.valueOf(value.toString());
} catch (NumberFormatException e) {
// ignore records that cannot be parsed as numbers
}
}

@Override
public void run(Context context) throws IOException, InterruptedException {

setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}

// emit out the sum per mapper
outputKey.set(String.valueOf(sum));
context.write(outputKey, outputValue);// Notice that the outputValue is empty
cleanup(context);

}
}

/**
* This is Reducer.
* 
*/
public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

private Text outputKey = new Text();
private Text outputValue = new Text();
private double sum;

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {


// summation of values from each mapper
sum += Double.valueOf(key.toString());

}

@Override
public void run(Context context) throws IOException, InterruptedException {

setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}

// emit out the global sums
outputKey.set(String.valueOf(sum));
context.write(outputKey, outputValue);
cleanup(context);
}
}

@Override
public int run(String[] args) throws Exception {

try {
Configuration conf = getConf();

// output key and value separator is empty as in final output only
// key is emitted and value is empty
conf.set("mapred.textoutputformat.separator", "");

// Configuring mapred to have just one reducer as we need to find
// single sum values from all the inputs
conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
conf.setInt("mapred.reduce.tasks", 1);

Job job = new Job(conf);

job.setJarByClass(AggregationExample.class);
job.setJobName("Aggregation Example");

job.setMapperClass(MapJob.class);
job.setReducerClass(ReduceJob.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
} catch (Exception e) {
e.printStackTrace();
return 1;
}

}

public static void main(String[] args) throws Exception {

if (args.length < 2) {
System.out
.println("Usage: AggregationExample <comma sparated list of input directories> <output dir>");
System.exit(-1);
}

int result = ToolRunner.run(new AggregationExample(), args);
System.exit(result);
}

}

Saturday, January 12, 2013

How to combine input files to get to a single mapper and control number of mappers : Implement CombineFileInputFormat

Hadoop is not really good at dealing with tons of small files, hence it is often desirable to combine a large number of small input files into a smaller number of bigger splits so as to reduce the number of mappers. This was one of the first things we did when the problem at hand was to enhance the performance of one of our Hadoop jobs. We had some 10,000 files and we were getting as many mappers, hence too much scheduling/initialization/writing overhead! We reduced it to some 200 mappers and voilà, performance increased by 5x instantly.

Input to the Hadoop MapReduce process is abstracted by InputFormat. FileInputFormat is the default implementation that deals with files in HDFS. With FileInputFormat, each file is split into one or more InputSplits, typically upper-bounded by the block size. This means the number of input splits is lower-bounded by the number of input files. This is not an ideal environment for the MapReduce process when it is dealing with a large number of small files, because the overhead of coordinating the distributed tasks becomes far greater than the actual work done per file.

Now, how did we do it?

We implemented CombineFileInputFormat (Hadoop version 0.20.205; you may find its counterpart in more recent versions).

The basic parameter that drives the split size is mapred.max.split.size. Using CombineFileInputFormat together with this parameter, we can control the number of mappers.

Here is the implementation :

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
}

public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text>{
private final LineRecordReader linerecord;

public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}

@Override
public void close() throws IOException {
linerecord.close();

}

@Override
public LongWritable createKey() {
// TODO Auto-generated method stub
return linerecord.createKey();
}

@Override
public Text createValue() {
// TODO Auto-generated method stub
return linerecord.createValue();
}

@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return linerecord.getPos();
}

@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return linerecord.getProgress();
}

@Override
public boolean next(LongWritable key, Text value) throws IOException {

// TODO Auto-generated method stub
return linerecord.next(key, value);
}

}
}


In your job, first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like the following in your run():

...
if (argument != null) {
conf.set("mapred.max.split.size", argument);
} else {
conf.set("mapred.max.split.size", "134217728"); // 128 MB
}
...

conf.setInputFormat(CombinedInputFormat.class);
...
PS: This won't work if you have gzipped files as input, as a file compressed with the GZIP codec cannot be split (and hence cannot be merged to the desired size) because of the way this codec works. A single split in Hadoop can only be processed by a single mapper, so a single GZIP file can only be processed by a single mapper.

Tuesday, January 8, 2013

How to pass on custom variables to Mapper and Reducer class?

One way is to use custom counters, but they exist for a specific reason, i.e. to keep count of some specific state, for example "NUMBER_OF_RECORDS_DISCARDED". And I believe one can only increment these counters and not set them to an arbitrary value (I may be wrong here). Sure, they can be used as message passers, but there is a better way, and that is to use the job configuration to set a variable and read it seamlessly in your mappers and reducers.

Setting the message/variable using the old mapred API
JobConf job = (JobConf) getConf();
job.set("messageToBePassed-OR-anyValue", "123-awesome-value :P");

Setting the message/variable using the new mapreduce API:
Configuration conf = new Configuration();
conf.set("messageToBePassed-OR-anyValue", "123-awesome-value :P");
Job job = new Job(conf);

Getting the message/variable using the old API in the Mapper and Reducer: configure() has to be implemented in the Mapper and Reducer classes, and the values can then be assigned to a class member so as to be used inside map() or reduce().
...
private String awesomeMessage;
public void configure(JobConf job) {
awesomeMessage = job.get("messageToBePassed-OR-anyValue");
}
...

The variable awesomeMessage can then be used with the map and reduce functions.

Getting the message/variable using the new API in the Mapper and Reducer: A similar thing needs to be done here, in setup():
Configuration conf = context.getConfiguration();
String param = conf.get("messageToBePassed-OR-anyValue");

Saturday, January 5, 2013

What are Combiners in Hadoop ?

Combiners are one of those things every Hadoop developer wants to use but seldom knows when and how to use. Combiners are basically mini-reducers: they lessen the workload that is passed on to the reducers. Your mapper may emit more than one record per key, and those records would ultimately be aggregated and passed as a single call to the reduce method. If these records can be combined per key even before being passed to the reducers, then the amount of data shuffled across the network to reach the right reducer is reduced, ultimately enhancing the job's performance. The sorting in the reduce phase will also be quicker. Data flows from the mapper, through the combiner, to the reducer.
How to use? A combiner is implemented exactly like a reducer. If you are writing in Java, it will be a class that either extends org.apache.hadoop.mapreduce.Reducer (when using the new API) or implements org.apache.hadoop.mapred.Reducer (when using the older API) and overrides/implements the reduce() method. The standard convention is to use the reducer itself as the combiner, but that may not be desirable in every scenario; you may write another class that does some map-side aggregation that is different from what you perform in your reducers.
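
For example, with the new API, wiring in the combiner from the driver is a one-liner (here reusing the reducer class itself as the combiner; MyReducer is a placeholder for your reducer class):

job.setCombinerClass(MyReducer.class);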

When to use? As the name itself suggests, combiners should only be used when there is something to combine. Generally, they should be applied to functions that are commutative (a.b = b.a) and associative (a.(b.c) = (a.b).c). But this is just a word of caution; there is no hard and fast rule that the function has to be commutative and associative. Combiners may operate only on a subset of your keys and values, or may not execute at all. So if there are very few duplicate keys in your mapper output, then using a combiner may at times backfire and become a useless burden. Use combiners only when there is enough scope for combining.
Quoting from Chuck Lam's 'Hadoop in Action': "A combiner doesn't necessarily improve performance. You should monitor the job's behavior to see if the number of records outputted by the combiner is meaningfully less than the number of records going in. The reduction must justify the extra execution time of running a combiner. "
So, go ahead and see if you can optimize your hadoop job using a combiner or not and do share your thoughts/inputs below. Cheers.

Any feedback, good or bad, is most welcome.
