Saturday, June 1, 2013

Launch Amazon EMR cluster using AWS Java SDK

The Java SDK for AWS has utilities to launch and manage EMR clusters.

You can do the following:
  1. Define and create steps to be added to the EMR cluster.
  2. Define and create bootstrap actions.
  3. Define and launch a cluster with the bootstrap actions and steps created above.
Of course, there are many more features in the SDK that can be explored.

Let's delve into the code without further ado.

Define and create steps:

The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.StepConfig
  • com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig
For example, if we need to add a step to install Hive, we can get a HadoopJarStepConfig object holding the details for installing Hive (like the JAR location, arguments to be passed, etc.) using StepFactory, which provides a number of predefined Hadoop JAR steps.
The code would look like this:
StepFactory stepFactory = new StepFactory();
StepConfig installHive = new StepConfig()
        .withName("Install Hive")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(stepFactory.newInstallHiveStep());
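StepFactory provides other predefined steps besides installing Hive. For instance, a debugging step is commonly added so that step logs can be browsed from the AWS console; a minimal sketch (these factory methods exist in the 1.x Java SDK, though the available set may vary by SDK version):

StepConfig enabledebugging = new StepConfig()
        .withName("Enable debugging")
        .withActionOnFailure("TERMINATE_JOB_FLOW")
        .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

Similar factory methods, such as newInstallPigStep(), cover the other predefined steps.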

In order to add our custom JARs, we create a HadoopJarStepConfig object as follows and then use it to create the required StepConfig:
HadoopJarStepConfig customJarStep = new HadoopJarStepConfig("Path-to-jar-on-s3")
        .withArgs("argument-list-to-be-passed-to-jar");
StepConfig doSomething = new StepConfig()
        .withName("Do Some Task")
        .withHadoopJarStep(customJarStep);
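If the JAR's manifest doesn't specify a main class, HadoopJarStepConfig also lets you name one explicitly. A minimal sketch (the main class below is a hypothetical placeholder):

HadoopJarStepConfig customJarStep = new HadoopJarStepConfig("Path-to-jar-on-s3")
        .withMainClass("com.example.MyJobDriver") // hypothetical main class
        .withArgs("argument-list-to-be-passed-to-jar");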

Define and create bootstrap actions:

To quote Amazon EMR's docs:
You can use bootstrap actions to install additional software and to change the configuration of applications on the cluster. Bootstrap actions are scripts that are run on the cluster nodes when Amazon EMR launches the cluster. They run before Hadoop starts and before the node begins processing data. You can write custom bootstrap actions, or use predefined bootstrap actions provided by Amazon EMR. A common use of bootstrap actions is to change Hadoop configuration settings. 
Let's see how we can create a BootstrapActionConfig.
The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig
  • com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig

The following code should be self-explanatory if you already know about using bootstrap actions to set custom Hadoop settings; if not, read more here.
String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";
ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
bootstrapScriptConfig.setPath(CONFIG_HADOOP_BOOTSTRAP_ACTION);
List<String> setMappersArgs = new ArrayList<String>();
setMappersArgs.add("-s");
setMappersArgs.add("textinputformat.record.delimiter=;");
bootstrapScriptConfig.setArgs(setMappersArgs);
BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
bootstrapConfig.setName("Set Hadoop Config");
bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);
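The same two classes work for custom bootstrap actions: point the path at your own script on S3 instead of the predefined configure-hadoop action. A minimal sketch, assuming a hypothetical script location:

ScriptBootstrapActionConfig customScriptConfig = new ScriptBootstrapActionConfig();
customScriptConfig.setPath("s3://mybucket/bootstrap/install-extras.sh"); // hypothetical script on S3
BootstrapActionConfig customBootstrapConfig = new BootstrapActionConfig();
customBootstrapConfig.setName("Install extra software");
customBootstrapConfig.setScriptBootstrapAction(customScriptConfig);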

Define and launch cluster:

The important class which we shall know about here is:
  • com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest 
Here you may define the various aspects of a cluster, like:
  • Cluster size (Instance count) 
  • Master instance type 
  • Slave instance type 
  • Steps to be added 
  • Bootstrap actions 
  • Log location on S3 
The code is as follows (installHive was created earlier, and enabledebugging is the debugging step sketched above):
RunJobFlowRequest request = new RunJobFlowRequest()
        .withBootstrapActions(mappersBootstrapConfig)
        .withName("Hive Interactive")
        .withSteps(enabledebugging, installHive)
        .withLogUri("s3://myawsbucket/")
        .withInstances(
                new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
                        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
                        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

In order to launch the cluster, execute the following:
RunJobFlowResult result = emr.runJobFlow(request);
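The returned RunJobFlowResult carries the job flow id, which you can use to poll the cluster's state. A minimal sketch using the SDK's DescribeJobFlows API (add imports for DescribeJobFlowsRequest and DescribeJobFlowsResult from the same model package):

String jobFlowId = result.getJobFlowId();
DescribeJobFlowsResult describeResult = emr.describeJobFlows(
        new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId));
// Prints a state such as STARTING, BOOTSTRAPPING, RUNNING or WAITING
System.out.println(describeResult.getJobFlows().get(0)
        .getExecutionStatusDetail().getState());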
That's it. The complete code showcasing the features mentioned above is as follows:
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * 
 * @author amar
 * 
 */
public class RunEMRJobFlow {

 private static final String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";

 public static void main(String[] args) {

  String accessKey = "yourAccessKeyHere";
  String secretKey = "yourSecretKeyHere";
  AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
  AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

  StepFactory stepFactory = new StepFactory();

  StepConfig enabledebugging = new StepConfig().withName("Enable debugging")
    .withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());

  StepConfig installHive = new StepConfig().withName("Install Hive").withActionOnFailure("TERMINATE_JOB_FLOW")
    .withHadoopJarStep(stepFactory.newInstallHiveStep());
  
  List<String> setMappersArgs = new ArrayList<String>();
  setMappersArgs.add("-s");
  setMappersArgs.add("textinputformat.record.delimiter=;");

  BootstrapActionConfig mappersBootstrapConfig = createBootstrapAction("Set Hadoop Config",
    CONFIG_HADOOP_BOOTSTRAP_ACTION, setMappersArgs);

  RunJobFlowRequest request = new RunJobFlowRequest()
    .withBootstrapActions(mappersBootstrapConfig)
    .withName("Hive Interactive")
    .withSteps(enabledebugging, installHive)
    .withLogUri("s3://myawsbucket/")
    .withInstances(
      new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

  RunJobFlowResult result = emr.runJobFlow(request);
 }

 private static BootstrapActionConfig createBootstrapAction(String bootstrapName, String bootstrapPath,
   List<String> args) {

  ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
  bootstrapScriptConfig.setPath(bootstrapPath);

  if (args != null) {
   bootstrapScriptConfig.setArgs(args);
  }

  BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
  bootstrapConfig.setName(bootstrapName);
  bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

  return bootstrapConfig;
 }

}

Any feedback, good or bad, is most welcome.
