Sunday, April 28, 2013

Reduce EMR costs: use spot instances, and yes, without risk!

As AWS states, Spot Instances can significantly lower your computing costs for time-flexible, interruption-tolerant tasks. But Hadoop jobs run on EMR are generally not interruption-tolerant. So how do we use spot instances and still not lose our cluster in the middle of a running task?

The answer lies in not making all of the machines in a cluster spot instances: keep some on-demand and some spot.

Now, how to achieve this?

There are 3 essential instance groups into which the nodes launched as part of a cluster fall:
  1. master instance group : A must-have, with a single master node.
  2. core instance group : If there are any slave nodes, there must be at least one node in this instance group. Nodes in this group act as both Tasknode and Datanode.
  3. task instance group : If a cluster has a core instance group, it can also have a task instance group containing one or more Tasknodes. These nodes do not run Datanodes (they do not hold HDFS data).

You can choose either on-demand or spot instances for each instance group in your job flows. However, from the definitions above, if you lose a master or core machine then your job is bound to fail. Theoretically, you could have something like:

elastic-mapreduce --create --alive --plain-output \
--instance-group master --instance-type m1.small --instance-count 1 --bid-price 0.091 \
--instance-group core --instance-type m1.small --instance-count 10 --bid-price 0.031 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021

But realistically, keep in mind that if you request spot instances and the current spot price exceeds your max bid, instances either will not be provisioned or will be removed from the running job flow. Thus, if at any time the spot price goes above your bid and you lose a MASTER or CORE node, the job will fail. Both CORE and TASK nodes run TaskTrackers, but only CORE nodes run DataNodes, so you need at least one CORE node.

To hedge against the complete loss of a job flow, multiple instance groups can be created where the CORE group is a smaller complement of traditional on-demand instances and the TASK group consists of spot instances. In this configuration, the TASK group mainly benefits the map phase of a job flow, as work from the TASK group is handed back up to the CORE group for reduction.

So, say you have a job that would ideally need 40 slave machines. You can then have, say, 10 on-demand machines (the CORE group) and the other 30 as spot instances (the TASK group). The syntax for creating the multiple instance groups is below:

elastic-mapreduce --create --alive --plain-output \
--instance-group master --instance-type m1.small --instance-count 1 \
--instance-group core --instance-type m1.small --instance-count 10 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021

This will help you save cost by running spot instances as your task nodes while at the same time making sure the job does not fail.

However, keep in mind that depending on your bid price and the time taken to complete the job, spot instances may come and go, so in the worst case you might end up incurring the same cost and taking longer to complete the job. It all depends on your bid price, so choose it wisely. We have also been successful in running short tasks (around 20 minutes) with all of the machines as spot instances!
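To get a feel for the savings, here is a quick back-of-the-envelope calculation for the 40-slave cluster above. The $0.06/hr on-demand price for m1.small is an assumption for illustration only; plug in current EC2 pricing:

```shell
# Hypothetical hourly prices (USD) -- adjust to current EC2 pricing.
ON_DEMAND=0.06   # assumed on-demand price for m1.small
SPOT_BID=0.021   # the bid price used in the example above
CORE_NODES=10
TASK_NODES=30

# Cost of all 40 slaves on-demand vs. 10 on-demand + 30 spot (master node ignored).
all_ondemand=$(awk "BEGIN { printf \"%.2f\", ($CORE_NODES + $TASK_NODES) * $ON_DEMAND }")
mixed=$(awk "BEGIN { printf \"%.2f\", $CORE_NODES * $ON_DEMAND + $TASK_NODES * $SPOT_BID }")
echo "all on-demand: \$$all_ondemand/hr, mixed: \$$mixed/hr"
```

Under these assumed prices, the slave cost drops from $2.40/hr to $1.23/hr while the spot instances survive, roughly half.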

Saturday, April 13, 2013

Creating custom InputFormat : reading paragraph as a record

Say, instead of processing a single line, you need to process a complete paragraph at once as a single record. How will you achieve this?

In order to do this, we need to customize the default behaviour of TextInputFormat, which is to read each line, into reading a complete paragraph.

But first you will need to create a custom record reader, which can be done by implementing the RecordReader interface. The main method where you tell the record reader to fetch a paragraph instead of a line is next(). The following implementation is mostly self-explanatory:

public class ParagraphRecordReader implements RecordReader<LongWritable, Text> {

    private LineRecordReader lineRecord;
    private LongWritable lineKey;
    private Text lineValue;

    public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
        lineRecord = new LineRecordReader(conf, split);
        lineKey = lineRecord.createKey();
        lineValue = lineRecord.createValue();
    }

    public void close() throws IOException {
        lineRecord.close();
    }

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text("");
    }

    public float getProgress() throws IOException {
        return lineRecord.getProgress();
    }

    public long getPos() throws IOException {
        return lineRecord.getPos();
    }

    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        boolean appended, isNextLineAvailable;
        boolean retval;
        byte space[] = {' '};
        // The framework reuses the value object, so clear any previous paragraph.
        value.clear();
        isNextLineAvailable = false;
        do {
            appended = false;
            retval = lineRecord.next(lineKey, lineValue);
            if (retval) {
                // A non-empty line belongs to the current paragraph; an empty
                // line marks the end of the paragraph.
                if (lineValue.toString().length() > 0) {
                    byte[] rawline = lineValue.getBytes();
                    int rawlinelen = lineValue.getLength();
                    value.append(rawline, 0, rawlinelen);
                    value.append(space, 0, 1);
                    appended = true;
                }
                isNextLineAvailable = true;
            }
        } while (appended);
        return isNextLineAvailable;
    }
}

You will then need to extend TextInputFormat to create a custom InputFormat; just overriding the getRecordReader method and returning the ParagraphRecordReader created above is sufficient.

Your ParagraphInputFormat will look as follows:

public class ParagraphInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
        return new ParagraphRecordReader(conf, (FileSplit) split);
    }
}
Now, one final thing is to set the input format on your job configuration (the old mapred API's JobConf), like this:

conf.setInputFormat(ParagraphInputFormat.class);

That's it, you are done; now in your mapper you will receive one paragraph at a time.

To get a clear perspective of what we achieved above, let's assume the input file is as follows:
This is awesome.
WTF is this.

This is just a test.

And the mapper code looks like:

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    System.out.println(key + " : " + value);
}

Then the output to the console from the mapper's System.out will be as follows:
0 : This is awesome. WTF is this. 
0 : This is just a test.
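The grouping logic inside next() can be illustrated outside Hadoop with plain Java. This is a minimal sketch using BufferedReader; the class and method names here are made up for illustration:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class ParagraphDemo {

    // Groups consecutive non-empty lines into paragraphs, mirroring what
    // ParagraphRecordReader.next() does on top of LineRecordReader.
    static List<String> readParagraphs(BufferedReader reader) throws IOException {
        List<String> paragraphs = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.length() > 0) {
                current.append(line).append(' ');   // same trailing space as the record reader
            } else if (current.length() > 0) {
                paragraphs.add(current.toString()); // a blank line ends the paragraph
                current.setLength(0);
            }
        }
        if (current.length() > 0) {
            paragraphs.add(current.toString());     // flush the last paragraph
        }
        return paragraphs;
    }

    public static void main(String[] args) throws IOException {
        String input = "This is awesome.\nWTF is this.\n\nThis is just a test.\n";
        List<String> paragraphs = readParagraphs(new BufferedReader(new StringReader(input)));
        for (String p : paragraphs) {
            System.out.println(p);
        }
    }
}
```

Running it on the sample input above yields the same two records the mapper printed: "This is awesome. WTF is this. " and "This is just a test. ".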

Tuesday, April 9, 2013

HDFS split versus Mapper input split?

The following 2 types of splitting are completely separate:
  1. Splitting files into HDFS blocks
  2. Splitting files to be distributed to the mappers

Now, by default, if you are using FileInputFormat, these two types of splitting kind of overlap (and hence are identical).

But you can always have a custom way of splitting for the second point above (or even have no splitting at all, i.e. have one complete file go to a single mapper).

Also, you can change the HDFS block size independent of the way your InputFormat splits input data.

Another important point to note is that while files are actually broken up physically when stored in HDFS, the split used to distribute data to mappers involves no physical splitting of files; it is only a logical split.

Let's discuss it with an example from here :

Suppose we want to load a 110MB text file to HDFS, with both the HDFS block size and the input split size set to 64MB.
  1. The number of mappers is based on the number of input splits, not the number of HDFS blocks.
  2. When we set the HDFS block size to 64MB, it is exactly 67108864 (64*1024*1024) bytes; it doesn't matter that the file may be cut in the middle of a line.
  3. Now we have 2 input splits (so two map tasks). The last line of the first block and the first line of the second block are not meaningful on their own. TextInputFormat is responsible for reading meaningful lines and giving them to the map tasks. What TextInputFormat does is:
    • For the second block, it seeks to the second line, which is a complete line, reads from there, and gives that to the second mapper.
    • The first mapper reads until the end of the first block and also processes the (last incomplete line of the first block + first incomplete line of the second block).
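The split arithmetic above can be sketched in plain Java. This is a simplified version of what FileInputFormat computes (the small "slop" factor Hadoop applies to the last split is omitted, and the class name is made up):

```java
public class SplitDemo {

    // Computes logical (offset, length) pairs for a file of the given size,
    // one per input split. No bytes are moved: this is purely arithmetic,
    // which is why the mapper-side split is only logical.
    static long[][] computeSplits(long fileSize, long splitSize) {
        int n = (int) ((fileSize + splitSize - 1) / splitSize); // ceil division
        long[][] splits = new long[n][2];
        for (int i = 0; i < n; i++) {
            long start = i * splitSize;
            splits[i][0] = start;                                // offset
            splits[i][1] = Math.min(splitSize, fileSize - start); // length
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[][] splits = computeSplits(110 * mb, 64 * mb);
        System.out.println(splits.length + " splits");           // two mappers
        for (long[] s : splits) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

For the 110MB file this yields two splits: (0, 64MB) and (64MB, 46MB), matching the two map tasks above.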
Read more here.

Saturday, April 6, 2013

How to write a file in HDFS using Hadoop

At times you may need to write to HDFS on your own rather than relying on the Hadoop framework's default way of writing output, e.g. if you want to create a file in some specific custom format and need to write that file yourself from inside the Java code of your mapper or reducer, just as you would write a simple file on your local drive.
You know how to write a file in local file system, but what about writing in HDFS?
That is simple too!

Just follow the sample code below; it is pretty much self-explanatory.


import java.io.BufferedWriter;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteToHDFS {

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out.println("Usage: WriteToHDFS <hdfs-file-path-to-write-into> <text-to-write-in-file>");
            System.out.println("Example: WriteToHDFS 'hdfs://localhost:9000/myFirstSelfWriteFile' 'Hello HDFS world'");
            System.exit(1);
        }
        try {
            Path path = new Path(args[0]);
            FileSystem fileSystem = FileSystem.get(new Configuration());
            // create(path, true) overwrites the file if it already exists.
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.create(path, true)));
            bufferedWriter.write(args[1]);
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
If you want to append to an already existing file in HDFS, then instead of:
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.create(path, true)));

use the following (note that append must be supported and enabled on your HDFS version):
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileSystem.append(path)));

Any feedback, good or bad is most welcome.
