Tuesday, February 26, 2013

How to perform a global aggregation or count using Hadoop?

Override the run() method of the Mapper and Reducer in the new org.apache.hadoop.mapreduce API. run() drives the whole task, so after its loop over the input has finished you can emit the accumulated sum/count exactly once per mapper or reducer.
You also need to limit the job to a single reducer, so that one task sees all the partial sums produced by the mappers and can combine them into a global total.
The code below should make this clearer:
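With the mapreduce API, the single-reducer constraint can also be set directly on the Job object; a one-line equivalent of the mapred.reduce.tasks setting used in the driver below:

```java
job.setNumReduceTasks(1); // a single reduce task sees every per-mapper sum
```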
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

    /**
     * Mapper: keeps a running sum of the input values and emits it exactly
     * once, from run(), after every record has been consumed.
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Say that you need to sum up the value part
            sum += Double.parseDouble(value.toString());
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }

            // Emit the per-mapper sum; notice that outputValue is left empty
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    /**
     * Reducer: adds up the per-mapper sums and emits one global total.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {

            // Two mappers can emit identical sums, in which case the framework
            // groups them under a single key; add the key once per grouped
            // value so duplicates are not lost.
            for (Text ignored : values) {
                sum += Double.parseDouble(key.toString());
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }

            // Emit the global sum
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();

            // The output key/value separator is empty: only the key carries
            // the final sum and the value is an empty Text
            conf.set("mapred.textoutputformat.separator", "");

            // Configure the job to use exactly one reducer, so that a single
            // task receives every per-mapper sum
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            conf.setInt("mapred.reduce.tasks", 1);

            Job job = new Job(conf);

            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out.println("Usage: AggregationExample <comma separated list of input directories> <output dir>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }

}
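For a pure global count (as opposed to a floating-point sum), Hadoop's built-in counters are a lighter alternative: increment a counter per record in the mapper, and read the framework-aggregated total from the Counters object in the driver after the job finishes. A rough sketch; the group/counter names ("AGGREGATION", "RECORD_COUNT") and the class names are illustrative, and the driver setup is assumed to mirror the one above:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingExample {

    public static class CountingMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) {
            // One increment per input record; the framework aggregates
            // counters across all map tasks.
            context.getCounter("AGGREGATION", "RECORD_COUNT").increment(1);
        }
    }

    // In the driver, after job.waitForCompletion(true) returns:
    static long readTotal(Job job) throws IOException {
        return job.getCounters().findCounter("AGGREGATION", "RECORD_COUNT").getValue();
    }
}
```

Counters are longs, so this suits counts (or integer sums); for floating-point sums the run()-override approach above is still needed. Since the counter carries the result, such a job can even run map-only with job.setNumReduceTasks(0).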

Comments:

  1. Can't we use context.getCounter(...).increment(1) to update a counter in the mapper, and then read its value from the context in the reducer?
