Saturday, April 13, 2013

Creating a custom InputFormat: reading a paragraph as a record

Say that, instead of processing a single line at a time, your Hadoop mapper needs to process a complete paragraph as a single record. How would you achieve this?

To do this, we need to change the default behaviour of TextInputFormat, which hands the mapper one line per record, so that it reads a complete paragraph instead.

First, you need to create a custom record reader by implementing the RecordReader interface. The implementation below wraps Hadoop's LineRecordReader. Its next() method, where the real work happens, keeps reading lines and appending them to the value until it reaches a blank line, which marks the end of a paragraph:

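import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class ParagraphRecordReader implements RecordReader<LongWritable, Text>
{
    // Hadoop's own line reader does the actual line splitting for us.
    private LineRecordReader lineRecord;
    private LongWritable lineKey;
    private Text lineValue;

    public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
        lineRecord = new LineRecordReader(conf, split);
        lineKey = lineRecord.createKey();
        lineValue = lineRecord.createValue();
    }

    @Override
    public void close() throws IOException {
        lineRecord.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text("");
    }

    @Override
    public float getProgress() throws IOException {
        // Progress must be a fraction between 0.0 and 1.0, not a byte position.
        return lineRecord.getProgress();
    }

    @Override
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        // The key is never updated here, so the mapper sees the value created
        // by createKey(), i.e. 0, for every paragraph.
        byte[] space = {' '};
        boolean inParagraph = false;
        value.clear();
        while (lineRecord.next(lineKey, lineValue)) {
            if (lineValue.getLength() > 0) {
                // Non-empty line: append it to the paragraph, followed by a space.
                value.append(lineValue.getBytes(), 0, lineValue.getLength());
                value.append(space, 0, 1);
                inParagraph = true;
            } else if (inParagraph) {
                // A blank line after some text ends the current paragraph.
                return true;
            }
            // Blank lines before a paragraph starts are skipped, so runs of
            // blank lines never produce empty records.
        }
        // End of the split: emit the last paragraph if any text was collected.
        return inParagraph;
    }

    @Override
    public long getPos() throws IOException {
        return lineRecord.getPos();
    }
}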
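If you want to check the grouping rule without a Hadoop cluster, the following plain-Java sketch applies it to a list of lines: non-empty lines are joined, each followed by a space, and a blank line ends the paragraph. It also skips runs of blank lines, so they never yield empty paragraphs. The class ParagraphGrouper and its group() method are hypothetical and not part of Hadoop; the sketch assumes the input has already been split into lines, as LineRecordReader would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hadoop-free sketch of the paragraph grouping rule:
// consecutive non-empty lines form one paragraph, each line followed by
// a space, and a blank line closes the paragraph.
class ParagraphGrouper {

    static List<String> group(List<String> lines) {
        List<String> paragraphs = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : lines) {
            if (!line.isEmpty()) {
                // Append the line plus a single space separator.
                current.append(line).append(' ');
            } else if (current.length() > 0) {
                // A blank line after some text closes the paragraph.
                paragraphs.add(current.toString());
                current.setLength(0);
            }
            // Blank lines with no pending text are simply skipped.
        }
        // End of input: emit the last paragraph if it has any text.
        if (current.length() > 0) {
            paragraphs.add(current.toString());
        }
        return paragraphs;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "This is awesome.", "WTF is this.", "", "This is just a test.");
        for (String p : group(input)) {
            System.out.println("[" + p + "]");
        }
    }
}
```

Running main() on the sample input used later in this post prints `[This is awesome. WTF is this. ]` and `[This is just a test. ]`; the brackets make the trailing space after each paragraph visible.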

You then need to extend TextInputFormat to create a custom InputFormat. Overriding the getRecordReader() method so that it returns the ParagraphRecordReader created above is sufficient.

Your ParagraphInputFormat will look like this:

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class ParagraphInputFormat extends TextInputFormat
{
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Return the paragraph reader instead of the default line reader.
        return new ParagraphRecordReader(conf, (FileSplit) split);
    }
}

The one remaining step is to set this input format on your JobConf:

conf.setInputFormat(ParagraphInputFormat.class);

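That's it: your mapper will now receive one paragraph at a time. One caveat: if a file is large enough to be divided into several input splits, a paragraph that straddles a split boundary is delivered as two records. Overriding isSplitable() in your input format to return false avoids this, at the cost of reading each file in a single map task.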


To see what this achieves, assume the input file is as follows:
This is awesome.
WTF is this.

This is just a test.

and the mapper code looks like this:

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    // value holds one whole paragraph; print it next to its key
    System.out.println(key + " : " + value);
}


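When the job runs in local mode, the mapper's System.out output on the console is as follows (on a cluster it goes to each task's stdout log instead). Each line of a paragraph is followed by a space, and the key is 0 for both records because next() never sets it: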
0 : This is awesome. WTF is this. 
0 : This is just a test.
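To key each paragraph by where it starts in the file instead, one option (not part of the original code) is to copy the lineKey of a paragraph's first line into key inside next(), for example with key.set(lineKey.get()). The hypothetical, Hadoop-free sketch below computes what those keys would be, assuming '\n' line endings and UTF-8 text; ParagraphOffsets and startOffsets() are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (no Hadoop): the byte offset of each paragraph's
// first line, i.e. the line key LineRecordReader would report for it.
// Assumes '\n' line endings and UTF-8 encoded text.
class ParagraphOffsets {

    static List<Long> startOffsets(String text) {
        List<Long> offsets = new ArrayList<>();
        long pos = 0;                // byte offset of the current line
        boolean inParagraph = false;
        for (String line : text.split("\n", -1)) {
            if (!line.isEmpty() && !inParagraph) {
                offsets.add(pos);    // first line of a new paragraph
                inParagraph = true;
            } else if (line.isEmpty()) {
                inParagraph = false; // a blank line ends the paragraph
            }
            // Advance past the line's bytes plus its '\n' terminator.
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String input = "This is awesome.\nWTF is this.\n\nThis is just a test.\n";
        System.out.println(startOffsets(input));
    }
}
```

For the sample file above it prints `[0, 31]`: the first paragraph starts at byte 0, and the second starts after the 17 + 13 + 1 bytes of the two lines and the blank line that precede it.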
