Sunday, March 31, 2013

Hadoop : How to have nested directories as input path ?

At times our input directories have sub-directories in them.
When we add such a directory as the input path, the job fails with an exception such as: Not a file
Say the input directory contains sub-directories whose names start with inputSubDirectory, plus one named inputDirectoryNotNeededAsInput, and that these in turn contain directories named either 1234 or abcdefg.
Hadoop lets the input path be a glob pattern (a wildcard pattern, not a full regular expression). I haven't experimented with complex patterns, but the simple placeholders ? (exactly one character) and * (zero or more characters) do work.
Now, say we only want the input to come from the directories named abcdefg. This can be achieved by building the input path from the following two pieces.
In order to exclude the sub-directory inputDirectoryNotNeededAsInput and include all the directories whose names start with inputSubDirectory, that component of the input path is written as: inputSubDirectory*
In order to exclude the sub-directories named 1234 (note that these names have just 4 characters) and include all the directories whose names have more than 4 characters, e.g. abcdefg, that component of the input path is written as: ?????*
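
The directory listing and the full input path are not reproduced in this post, so the following is only a sketch under assumed names: the root directory inputDirectory, the file name part-00000 and the class GlobDemo are all hypothetical. It uses the JDK's own glob matcher as a stand-in for Hadoop's (both treat ? as exactly one character and * as any run of characters within a single path component) to check which files a pattern of this shape would pick up.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobDemo {
    // Hypothetical pattern:
    // <root>/<dirs starting with inputSubDirectory>/<dirs with 5 or more characters>/<any file>
    static final String PATTERN = "inputDirectory/inputSubDirectory*/?????*/*";

    // Returns true if the given relative path is matched by the glob pattern.
    static boolean matches(String pattern, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        String[] candidates = {
            "inputDirectory/inputSubDirectory1/abcdefg/part-00000",
            "inputDirectory/inputSubDirectory1/1234/part-00000",
            "inputDirectory/inputDirectoryNotNeededAsInput/abcdefg/part-00000"
        };
        for (String candidate : candidates) {
            System.out.println(matches(PATTERN, candidate) + "  " + candidate);
        }
    }
}
```

In an actual job the same pattern string would be handed to Hadoop as the input path, for example through FileInputFormat.addInputPath(job, new Path(pattern)).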

How to ensure that one file is completely processed by the same mapper ?

There are times when we want one particular file to be read/processed entirely by the same mapper. This requirement may arise when each file holds sequential data and you want to process all the records of a file in exactly the order in which they appear in the input file.

So, basically what we are asking here is: please don't split our input files and distribute the pieces among different mappers. Simple.

It is even simpler to achieve this :

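You have to create your own subclass of FileInputFormat and override isSplitable(), like this (extending TextInputFormat, a concrete FileInputFormat for line-based text, so that no record reader has to be written):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableFileInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // never split: each file goes to exactly one mapper
    }
}

And then pass the above class to job.setInputFormatClass().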
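
To see why returning false has this effect, here is a simplified plain-Java model of how a file-based input format cuts one file into splits. It is a sketch, not Hadoop's actual code: the class name SplitModel and the method splitLengths are hypothetical, block locations are ignored, and the 1.1 slack factor is an assumption about the default behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitModel {
    // Assumed slack factor: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // Returns the lengths (in bytes) of the splits one file would be cut into.
    // Assumes fileLength > 0 and splitSize > 0.
    static List<Long> splitLengths(long fileLength, long splitSize, boolean splitable) {
        List<Long> lengths = new ArrayList<>();
        if (!splitable) {
            // A non-splittable file becomes one split, hence one mapper.
            lengths.add(fileLength);
            return lengths;
        }
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println("splittable:     " + splitLengths(300 * mb, 128 * mb, true).size() + " splits");
        System.out.println("non-splittable: " + splitLengths(300 * mb, 128 * mb, false).size() + " split");
    }
}
```

Each split is handed to its own mapper, so a file that yields a single split is read from start to finish by one mapper, at the cost of losing parallelism for large files.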

Monday, March 25, 2013

Amazon EMR: Can we resize a cluster having a running job?

Can we increase or decrease the number of nodes in a running job flow using Amazon’s EMR?
Hell yeah!
To quote Amazon:
You can modify the size of a running job flow using either the API or the CLI. The AWS Management Console allows you to monitor job flows that you resized, but it does not provide the option to resize job flows.
But I will cover only the CLI part here.
The nodes launched as part of a cluster fall into three instance groups:
  1. master instance group : This is mandatory and contains a single master node.
  2. core instance group : If there are any slave nodes, there must be at least one node in this instance group. Nodes in this group both run tasks (TaskTracker) and store HDFS data (DataNode).
  3. task instance group : If a cluster has a core instance group, it can also have a task instance group containing one or more task nodes. These nodes only run tasks; they do not run a DataNode, so they hold no HDFS data.
While trying to resize your cluster, you must bear in mind the following:
  • The number of core nodes can only be increased, not decreased. The reason is that removing a core node could lose the HDFS data stored on it.
  • The number of task nodes can be both increased and decreased.
  • There must always be exactly one master node, so its count cannot be changed.
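
The three rules above can be written down as a small check. This is only an illustrative sketch in plain Java: the class ResizeRules, the enum Group and the method isAllowed are hypothetical names and are not part of any EMR API.

```java
public class ResizeRules {
    enum Group { MASTER, CORE, TASK }

    // Returns true if changing a group from currentCount to requestedCount
    // nodes is consistent with the resizing rules listed above.
    static boolean isAllowed(Group group, int currentCount, int requestedCount) {
        if (requestedCount < 1) {
            return false; // every group described above holds at least one node
        }
        switch (group) {
            case MASTER:
                return requestedCount == 1;            // always exactly one master
            case CORE:
                return requestedCount >= currentCount; // core nodes can only grow
            case TASK:
                return true;                           // task nodes can grow or shrink
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("core 5 -> 10: " + isAllowed(Group.CORE, 5, 10));
        System.out.println("core 5 -> 3:  " + isAllowed(Group.CORE, 5, 3));
        System.out.println("task 4 -> 2:  " + isAllowed(Group.TASK, 4, 2));
    }
}
```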
Now, the EMR CLI provides the following parameters to handle the resizing of clusters:
--modify-instance-group INSTANCE_GROUP_ID : Modify an existing instance group.
--instance-count INSTANCE_COUNT : Set the count of nodes for an instance group.
Say, you have a cluster with 5 core nodes and 2 task nodes.
Now, in order to increase the number of core nodes to 10, you first need to find the INSTANCE_GROUP_ID of the cluster's core instance group. To do that, you can use the CLI's --describe option:
ruby elastic-mapreduce --jobflow JobFlowID --describe
This returns a JSON document that should contain the “InstanceGroupId” of the “Core Instance Group”.
Now, execute the following :
ruby elastic-mapreduce --modify-instance-group InstanceGroupID --instance-count COUNT
In our case the COUNT will be 10.
NOTE: The count isn’t by-how-many-to-increase, but the new total count of nodes in that instance group. Hence in our case while increasing the number of nodes from 5 to 10, the count has to be 10 and not 5.
Similarly, if we want to decrease the number of task nodes to 2, then after finding the “InstanceGroupId” of the “Task Instance Group” we execute the following:
ruby elastic-mapreduce --modify-instance-group TaskInstanceGroupID --instance-count 2
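
Because --instance-count takes the new total rather than the number of nodes to add, it is easy to pass the wrong value. The sketch below, in plain Java, computes the total and assembles the command line shown above. The class ResizeCommand and its methods are hypothetical helpers, and the instance group ID is a placeholder.

```java
public class ResizeCommand {
    // --instance-count expects the new total, so add the delta to the current size.
    static int targetCount(int currentCount, int nodesToAdd) {
        return currentCount + nodesToAdd;
    }

    // Builds the CLI invocation for resizing one instance group.
    static String modifyCommand(String instanceGroupId, int newTotalCount) {
        return "ruby elastic-mapreduce --modify-instance-group " + instanceGroupId
                + " --instance-count " + newTotalCount;
    }

    public static void main(String[] args) {
        // Growing the core group from 5 nodes by 5 more gives a count of 10.
        int total = targetCount(5, 5);
        System.out.println(modifyCommand("InstanceGroupID", total));
    }
}
```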

Hadoop: How to get file path of the input record being read in mapper?

There might be situations when you need to know exactly which file the <key,value> pair currently being processed in the mapper has come from.

You might have to process records from a certain bucket/path differently from the ones from another location. If you are dealing with a Hadoop job related to online advertising, then one of the locations might hold the impression logs while the other holds the click logs, and you need to process them differently in your mapper.

So, how to go about doing that?

You can get the path of the file that the input record currently being processed in the mapper came from.

First you need to get the input split. Using the newer mapreduce API, this is done by calling getInputSplit() on the mapper's context:

InputSplit inputSplit = context.getInputSplit();

But in order to get the file path and the file name, you first need to cast the result to FileSplit.

So, in order to get the input file path you may do the following:

Path filePath = ((FileSplit) context.getInputSplit()).getPath();
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();

Similarly, to get the file name, you may just call upon getName(), like this:

String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
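
Once the path is available as a string, the mapper can branch on it. The following plain-Java sketch shows one possible way to do that for the impressions/clicks case mentioned earlier. The directory names /impressions/ and /clicks/, the bucket my-bucket and the class RecordSource are assumptions made up for illustration.

```java
public class RecordSource {
    enum Kind { IMPRESSION, CLICK, UNKNOWN }

    // Decides how a record should be treated, based on the path of the file
    // it was read from. The directory names are hypothetical.
    static Kind classify(String filePathString) {
        if (filePathString.contains("/impressions/")) {
            return Kind.IMPRESSION;
        }
        if (filePathString.contains("/clicks/")) {
            return Kind.CLICK;
        }
        return Kind.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(classify("s3://my-bucket/impressions/part-00000"));
        System.out.println(classify("s3://my-bucket/clicks/part-00000"));
    }
}
```

In a mapper, the argument would be the filePathString obtained from the FileSplit as shown above. Since the split does not change between records of the same mapper, the result could be computed once in setup() rather than for every record.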
