Saturday, April 13, 2013

Creating a custom InputFormat: reading a paragraph as a record

Say, instead of processing a single line, you need to process a complete paragraph at once as a single record. How will you achieve this?

To do this, we need to change the default behaviour of TextInputFormat, which reads one line per record, so that it reads a complete paragraph per record instead.

First you need a custom record reader, which you create by implementing the RecordReader interface. The method that tells the record reader to fetch a paragraph instead of a line is next(). The following implementation wraps a LineRecordReader and keeps appending lines until it reaches a blank line:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class ParagraphRecordReader implements RecordReader<LongWritable, Text> {

    private static final byte[] SPACE = {' '};

    // Underlying reader that supplies one line at a time.
    private final LineRecordReader lineRecord;
    private final LongWritable lineKey;
    private final Text lineValue;

    public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
        lineRecord = new LineRecordReader(conf, split);
        lineKey = lineRecord.createKey();
        lineValue = lineRecord.createValue();
    }

    @Override
    public void close() throws IOException {
        lineRecord.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public float getProgress() throws IOException {
        // Fraction of the split consumed so far (0.0 to 1.0), not a byte position.
        return lineRecord.getProgress();
    }

    @Override
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        boolean appended;
        boolean isNextLineAvailable = false;
        value.clear();
        // Note: key is not updated here, so the mapper sees the value from createKey().
        do {
            appended = false;
            if (lineRecord.next(lineKey, lineValue)) {
                if (lineValue.getLength() > 0) {
                    // Non-empty line: append it to the paragraph, followed by a space.
                    value.append(lineValue.getBytes(), 0, lineValue.getLength());
                    value.append(SPACE, 0, 1);
                    appended = true;
                }
                isNextLineAvailable = true;
            }
        } while (appended); // stop at a blank line or when no more lines are left

        return isNextLineAvailable;
    }

    @Override
    public long getPos() throws IOException {
        return lineRecord.getPos();
    }
}
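To try the grouping logic without a cluster, the loop in next() can be mimicked in plain Java. This is only a sketch under stated assumptions: ParagraphGroupingSketch, nextParagraph and readAll are hypothetical names, a BufferedReader stands in for LineRecordReader, and no Hadoop classes are involved.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free stand-in for the loop in ParagraphRecordReader.next().
final class ParagraphGroupingSketch {

    // Reads one paragraph; returns null once no line at all could be read.
    static String nextParagraph(BufferedReader in) throws IOException {
        StringBuilder paragraph = new StringBuilder();
        boolean sawLine = false;
        String line;
        while ((line = in.readLine()) != null) {
            sawLine = true;
            if (line.isEmpty()) {
                break; // a blank line closes the current paragraph
            }
            paragraph.append(line).append(' '); // same trailing space as the record reader
        }
        return sawLine ? paragraph.toString() : null;
    }

    // Collects every paragraph found in the given text.
    static List<String> readAll(String text) {
        List<String> paragraphs = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new StringReader(text))) {
            String paragraph;
            while ((paragraph = nextParagraph(in)) != null) {
                paragraphs.add(paragraph);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return paragraphs;
    }

    public static void main(String[] args) {
        String input = "This is awesome.\nWTF is this.\n\nThis is just a test.\n";
        for (String paragraph : readAll(input)) {
            System.out.println("[" + paragraph + "]");
        }
    }
}
```

Like the record reader, this sketch returns an empty record when it meets two blank lines in a row, so a mapper may want to skip empty values.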

Next, extend TextInputFormat to create a custom InputFormat. Overriding the getRecordReader method so that it returns the ParagraphRecordReader created above is sufficient.

Your ParagrapghInputFormat will look as follows:

public class ParagrapghInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)
            throws IOException {
        reporter.setStatus(split.toString());
        return new ParagraphRecordReader(conf, (FileSplit) split);
    }
}

Finally, set the input format on your job configuration (conf here is your JobConf):

conf.setInputFormat(ParagrapghInputFormat.class);

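That's it. Your mapper will now receive one paragraph at a time, as long as a paragraph does not straddle the boundary between two input splits, because each split is read by its own LineRecordReader.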


To have a clear perspective of what we achieved above, let's assume that input file is as follows:
This is awesome.
WTF is this.

This is just a test.

And the mapper code looks like:

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    System.out.println(key + " : " + value);
}


Then the console output from the mapper's System.out will be as follows:
0 : This is awesome. WTF is this. 
0 : This is just a test.
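
Both keys print as 0 because next() never writes to its key argument. If you want the key to identify the paragraph, one option is to copy into key the byte offset of the paragraph's first line, which is what LineRecordReader reports as its own key. The sketch below is a Hadoop-free illustration of what those keys would be for the sample input. It assumes UTF-8 text with \n line endings, and ParagraphOffsetSketch and paragraphsByOffset are hypothetical names, not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes the key each paragraph would get if the
// record reader copied the byte offset of the paragraph's first line.
final class ParagraphOffsetSketch {

    static Map<Long, String> paragraphsByOffset(String text) {
        Map<Long, String> result = new LinkedHashMap<>();
        StringBuilder paragraph = new StringBuilder();
        long offset = 0;   // byte offset of the current line
        long start = -1;   // byte offset of the current paragraph, -1 if none is open
        for (String line : text.split("\n", -1)) {
            if (line.isEmpty()) {
                if (start >= 0) {
                    result.put(start, paragraph.toString());
                }
                start = -1;
                paragraph.setLength(0);
            } else {
                if (start < 0) {
                    start = offset; // first line of a new paragraph
                }
                paragraph.append(line).append(' ');
            }
            // Assumes every line ends with a single '\n' byte.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        if (start >= 0) {
            result.put(start, paragraph.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        String input = "This is awesome.\nWTF is this.\n\nThis is just a test.\n";
        paragraphsByOffset(input).forEach((key, value) -> System.out.println(key + " : " + value));
    }
}
```

For the sample file the keys come out as 0 and 31. Unlike the record reader, this sketch drops empty paragraphs instead of emitting them.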

1 comment:

  1. We need to read a file in byte arrays of 3900, apply some rules, and write the result to an output file, using MapReduce in particular since we want the program to execute in parallel on multiple nodes. Maybe something like reading the file in the mapper class and writing to the output file in the reducer. Could you please throw some light on this?
