Wednesday, January 27, 2016

Introduction to Apache Spark with Examples and Use Cases



I first heard of Spark in late 2013 when I became interested in Scala, the language in which Spark is written. Some time later, I did a fun data science project trying to predict survival on the Titanic. This turned out to be a great way to get further introduced to Spark concepts and programming. I highly recommend it for any aspiring Spark developers looking for a place to get started.


Today, Spark is being adopted by major players like Amazon, eBay, and Yahoo! Many organizations run Spark on clusters with thousands of nodes. According to the Spark FAQ, the largest known cluster has over 8000 nodes. Indeed, Spark is a technology well worth taking note of and learning about.

This article provides an introduction to Spark including use cases and examples. It contains information from the Apache Spark website as well as the book Learning Spark - Lightning-Fast Big Data Analysis.

What is Apache Spark? An Introduction


Spark is an Apache project advertised as “lightning fast cluster computing”. It has a thriving open-source community and is the most active Apache project at the moment.


Spark provides a faster and more general data processing platform than Hadoop MapReduce. Spark lets you run programs up to 100x faster in memory, or 10x faster on disk, than Hadoop. Last year, Spark overtook Hadoop by completing the 100 TB Daytona GraySort contest 3x faster on one tenth the number of machines, and it also became the fastest open-source engine for sorting a petabyte.


Spark also makes it possible to write code more quickly, as you have over 80 high-level operators at your disposal. To demonstrate this, let’s have a look at the “Hello World!” of Big Data: the Word Count example. Written in Java for MapReduce, it takes around 50 lines of code, whereas in Spark (and Scala) you can do it as simply as this:

sparkContext.textFile("hdfs://...")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1)).reduceByKey(_ + _)
            .saveAsTextFile("hdfs://...")

Another important aspect when learning how to use Apache Spark is the interactive shell (REPL) which it provides out of the box. Using the REPL, one can test the outcome of each line of code without first needing to code and execute the entire job. The path to working code is thus much shorter, and ad-hoc data analysis is made possible.


Additional key features of Spark include:
  1. Currently provides APIs in Scala, Java, and Python, with support for other languages (such as R) on the way 
  2. Integrates well with the Hadoop ecosystem and data sources (HDFS, Amazon S3, Hive, HBase, Cassandra, etc.) 
  3. Can run on clusters managed by Hadoop YARN or Apache Mesos, and can also run standalone 


The Spark core is complemented by a set of powerful, higher-level libraries which can be seamlessly used in the same application. These libraries currently include SparkSQL, Spark Streaming, MLlib (for machine learning), and GraphX, each of which is further detailed in this article. Additional Spark libraries and extensions are currently under development as well.

spark libraries and extensions

Spark Core



Spark Core is the base engine for large-scale parallel and distributed data processing. It is responsible for:
  • memory management and fault recovery 
  • scheduling, distributing and monitoring jobs on a cluster 
  • interacting with storage systems 


Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or by distributing a collection from the driver program.
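
For instance, a minimal sketch using Spark's Java API (the file path and local master setting here are placeholders) shows both ways of creating an RDD:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDCreation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("rdd-creation").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 1. Create an RDD by loading an external dataset (placeholder path)
        JavaRDD<String> lines = sc.textFile("hdfs://namenode:9000/input/data.txt");

        // 2. Create an RDD by distributing a collection from the driver program
        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        System.out.println("Distributed collection holds " + numbers.count() + " elements");
        // 'lines' would now be transformed further, exactly like any other RDD

        sc.stop();
    }
}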



RDDs support two types of operations:
  • Transformations are operations (such as map, filter, join, union, and so on) that are performed on an RDD and which yield a new RDD containing the result. 
  • Actions are operations (such as reduce, count, first, and so on) that return a value after running a computation on an RDD. 


Transformations in Spark are “lazy”, meaning that they do not compute their results right away. Instead, they just “remember” the operation to be performed and the dataset (e.g., a file) on which the operation is to be performed. The transformations are only actually computed when an action is called, and the result is returned to the driver program. This design enables Spark to run more efficiently. For example, if a big file is transformed in various ways and passed to a first() action, Spark only needs to process and return the result for the first line, rather than doing the work for the entire file.


By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist or cache method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
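
As a rough illustration of both points, the following sketch (Java API again, with an assumed log-file path) builds transformations lazily, triggers them with an action, and caches the intermediate RDD so a second action can reuse it:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyAndCached {
    public static void process(JavaSparkContext sc) {
        // Transformations only build up a lineage; nothing is read from disk yet
        JavaRDD<String> lines = sc.textFile("hdfs://namenode:9000/logs/app.log"); // placeholder path
        JavaRDD<String> errors = lines.filter(line -> line.contains("ERROR"));

        // Ask Spark to keep the filtered RDD in memory across actions
        errors.cache();

        // The first action triggers the actual computation...
        long errorCount = errors.count();

        // ...and this second action reuses the cached data instead of re-reading the file
        String firstError = errors.first();

        System.out.println(errorCount + " errors, first one: " + firstError);
    }
}
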
SparkSQL


SparkSQL is a Spark component that supports querying data either via SQL or via the Hive Query Language. It originated as the Apache Hive port to run on top of Spark (in place of MapReduce) and is now integrated with the Spark stack. In addition to providing support for various data sources, it makes it possible to weave SQL queries with code transformations, which results in a very powerful tool. Below is an example of a Hive-compatible query:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming



Spark Streaming supports real-time processing of streaming data, such as production web server log files (e.g. via Apache Flume and HDFS/S3), social media feeds like Twitter, and various messaging queues like Kafka. Under the hood, Spark Streaming receives the input data streams and divides the data into batches. The Spark engine then processes these batches and generates the final stream of results, also in batches.



The Spark Streaming API closely matches that of the Spark Core, making it easy for programmers to work in the worlds of both batch and streaming data.
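
As a small, hedged example of that similarity, the word count from earlier can be expressed over a stream of text arriving on a socket (the host, port and batch interval below are placeholders; this uses the Spark 1.x Java streaming API):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("streaming-word-count").setMaster("local[2]");

        // Process the stream in micro-batches of 10 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Hypothetical text source on a local socket (e.g. nc -lk 9999)
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));
        JavaPairDStream<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.print();          // emit the counts of every batch
        jssc.start();            // start receiving and processing
        jssc.awaitTermination(); // run until the application is stopped
    }
}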

MLlib



MLlib is a machine learning library that provides various algorithms designed to scale out on a cluster for classification, regression, clustering, collaborative filtering, and so on (check out Toptal’s article on machine learning for more information on that topic). Some of these algorithms also work with streaming data, such as linear regression using ordinary least squares or k-means clustering (and more are on the way). Apache Mahout (a machine learning library for Hadoop) has already turned away from MapReduce and joined forces with Spark MLlib.
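
For a taste of the API, here is a hedged sketch of k-means clustering with MLlib's Java API (the input path and the file format of one space-separated point per line are assumptions):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class KMeansSketch {
    public static void cluster(JavaSparkContext sc) {
        // Hypothetical input: one space-separated numeric point per line, e.g. "0.1 0.2 0.3"
        JavaRDD<String> data = sc.textFile("hdfs://namenode:9000/ml/kmeans_data.txt");
        JavaRDD<Vector> points = data.map(line -> {
            String[] parts = line.split(" ");
            double[] values = new double[parts.length];
            for (int i = 0; i < parts.length; i++) {
                values[i] = Double.parseDouble(parts[i]);
            }
            return Vectors.dense(values);
        });
        points.cache();

        int numClusters = 2;
        int numIterations = 20;
        KMeansModel model = KMeans.train(points.rdd(), numClusters, numIterations);

        // A rough quality measure: within-cluster sum of squared errors
        System.out.println("WSSSE = " + model.computeCost(points.rdd()));
    }
}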


Post BY RADEK OSTROWSKI

NB: This article was first featured on the Toptal Engineering Blog.

Wednesday, February 18, 2015

Top 5 Performance Improvement Tips for Web Applications on AWS Cloud


Most of us use AWS Cloud for developing and deploying web applications, with performance being a huge concern. Good performance is critical to ensure a better user experience, because faster performance leads to more user engagement with your web applications.
Here are the top 5 simple tips for improving your web application's performance and scalability.
  1. Elastic Load Balancer Level SSL Termination - Many web applications use SSL for securely transmitting information from the browser to the web servers. However, SSL requires multiple handshakes and content encryption/decryption for each request, so it puts an additional load on your web servers. If your web application uses SSL and is deployed on AWS cloud, then we recommend terminating SSL at the Elastic Load Balancer (ELB) layer instead of at your web/application servers. This will reduce the CPU load on your web servers and improve application response times.
  2. Enable Compression For Static Resources - Whether you are serving static content like images, JS, CSS or JSON/text files via CloudFront CDN or web servers, it’s recommended to ensure that static content compression is enabled. With content compression on server side, the static file sizes will be reduced to improve the download performance and help you save on bandwidth costs as well.
  3. Enable Browser Caching Headers - Modern browsers support client-side caching, so web applications can cache static resources (header images, logos, style sheets, javascripts etc) on the browser side without downloading them from the servers on every request. We recommend enabling caching headers for all your static content, whether it's served via CloudFront or your backend web servers; a minimal servlet-filter sketch for this is shown after this list. With client-side caching headers, your web application can use browser-side caching and reduce the load on the backend infrastructure to improve application performance.
  4. Leverage Browser Parallelism - Most modern web browsers can make 3 to 6 parallel connections to a particular domain at a given point. If your application uses many static resources like images, files, styles etc, then it's recommended to use multiple domains like images1.myappdomain.com, images2.myappdomain.com and static.myappdomain.com to parallelise browser downloads of resources. All these domains can point to the same backend infrastructure, like a CloudFront distribution or ELB, but browsers will essentially make more parallel connections to fetch resources because you are using multiple domain aliases for static resources, making your web application load faster.
  5. Performance Review Your Web Application - You should use tools like YSlow, PageSpeed, DynaTrace etc to analyse your web application performance and implement the recommended best practices. Many web application optimisation techniques are easy to implement and will help you in achieving a better user experience.
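
If some of your static assets happen to be served by a Java web application behind the ELB rather than directly by CloudFront, a minimal servlet-filter sketch along these lines (the class name, URL patterns, and max-age are hypothetical) is one way to add the caching headers mentioned in tip 3:

import java.io.IOException;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletResponse;

// Hypothetical filter: map it to whatever static resource paths your application serves
@WebFilter(urlPatterns = {"*.css", "*.js", "*.png", "*.jpg"})
public class StaticCachingFilter implements Filter {

    private static final long ONE_WEEK_SECONDS = 7 * 24 * 60 * 60;

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        // Tell browsers (and CDNs) they may cache this resource for a week
        ((HttpServletResponse) response).setHeader("Cache-Control", "public, max-age=" + ONE_WEEK_SECONDS);
        chain.doFilter(request, response);
    }

    @Override
    public void init(FilterConfig filterConfig) { }

    @Override
    public void destroy() { }
}
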
You can do all this and a lot more with the SaaS product Botmetric; you can get started with a 14-day free trial.

Wednesday, June 11, 2014

What is the need to use job.setJarByClass in the driver section of Hadoop MapReduce solution?

Have you ever thought of this?

We do provide the jar to be executed by Hadoop while executing the hadoop command, don't we?

$ hadoop jar /some-jar.jar

So, why do we need to have the following line of code in our Driver section while declaring the Job object properties:
job.setJarByClass(SomeClass.class);

The answer to that is very simple. Here you help Hadoop find out which jar it should send to the nodes to perform the Map and Reduce tasks. Your some-jar.jar might have various other jars on its classpath, and your driver code might be in a separate jar from that of your Mapper and Reducer classes.

Hence, using the setJarByClass method we tell Hadoop to locate the relevant jar by finding the jar that contains the class specified as its parameter. So usually you should provide either your Mapper implementation class, your Reducer implementation, or any other class which is present in the same jar as the Mapper and Reducer. Also make sure that both the Mapper and Reducer are part of the same jar.
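
For completeness, here is a minimal driver sketch showing where the call sits; the class names, types, and paths are placeholders, not code from any particular project:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyJobDriver {

    // Placeholder identity Mapper/Reducer; in a real job these are your own
    // implementations, living in the same jar.
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> { }
    public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> { }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "my-job");

        // Tell Hadoop which jar to ship: the one containing this class
        // (and therefore the Mapper and Reducer packaged with it)
        job.setJarByClass(MyMapper.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}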

Saturday, June 7, 2014

Alternative to deprecated DistributedCache class in Hadoop 2.2.0

As of Hadoop 2.2.0, if you use the org.apache.hadoop.filecache.DistributedCache class to load files you want to add to your job as distributed cache, your compiler will warn you that this class is deprecated.

In earlier versions of Hadoop, we used DistributedCache class in the following fashion to add files to be available to all mappers and reducers locally:
// In the main driver class using the new mapreduce API
Configuration conf = getConf();
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf);
...
Job job = new Job(conf);
...

// In the mapper class, mostly in the setup method
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

But now, with Hadoop 2.2.0, the functionality for adding files to the distributed cache has been moved to the org.apache.hadoop.mapreduce.Job class. You may also notice that the constructor we used to use for the Job class has been deprecated as well; instead we should use the new factory method getInstance(Configuration conf). The alternative solution looks as follows:

// In the main driver class using the new mapreduce API
Configuration conf = getConf();
...
Job job = Job.getInstance(conf);
...
job.addCacheFile(new URI(filename));

// In the mapper class, mostly in the setup method
URI[] localPaths = context.getCacheFiles();
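
On the read side, here is a hedged sketch of a mapper's setup() method that opens the localized copy of the cached file; it assumes the file was added without a URI fragment, so it is linked into the task's working directory under its original base name:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheAwareMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0) {
            // The cached file is symlinked into the task's working directory
            // under its base name, so it can be opened like a local file.
            String localName = new Path(cacheFiles[0].getPath()).getName();
            try (BufferedReader reader = new BufferedReader(new FileReader(localName))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // e.g. load lookup data into an in-memory map here
                }
            }
        }
    }

    // map() omitted: the cached data loaded above would typically be used there
}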

Wednesday, May 21, 2014

DataWarehousing with Redshift: Benefits of a columnar data storage



This is a short article stating the benefits of a columnar storage architecture.
In traditional database systems, records are stored into disk blocks row by row: each row is stored one after the other, and within a row the column values sit one after another.
The following diagram illustrates this row-based storage architecture:




This kind of storage architecture was designed with the assumption that a complete record is often queried. It is a good fit for scenarios where CRUD operations are in the picture and data is often sought for a complete row. This architecture works well for OLTP-type applications.


However, for data analytics and data warehousing use cases this type of storage architecture isn't a good fit. Amazon Redshift addresses this issue with columnar storage, which helps optimize data-warehousing-related queries. In this architecture, the values for each column are stored sequentially into disk blocks.

The following diagram illustrates this column-based storage architecture:




Row-wise storage disadvantages:

  • If block size is smaller than the size of a record, storage for an entire record may take more than one block.
  • If block size is larger than the size of a record, storage for an entire record may take less than one block, resulting in an inefficient use of disk space.

So, as you can see, storing records in a row-wise fashion isn't optimized for storage space.


Column-wise storage advantages:


  • Each data block holds column field values for as many as three times as many records as row-based storage.
  • Requires a third of the I/O operations compared to row-wise storage. In practice, using tables with very large numbers of columns and very large row counts, storage efficiency is even greater.
  • Since each block holds the same type of data, it can use a compression scheme selected specifically for the column data type, further reducing disk space and I/O.
Hence, by using a columnar storage architecture, Redshift reduces the disk space needed to store the same data, which in turn reduces the number of seeks required to read the stored data when processing a user query, ultimately increasing query performance on large datasets.

Redshift achieves high performance for queries over very large amounts of data. Along with the columnar architecture, there are other features Redshift uses to achieve this performance:

  • Massively parallel processing
  • Data compression
  • Query optimization
  • Compiled code

I will take these up in detail in a future post. Try out Redshift if you need to analyze your data using popular BI tools like Tableau and Jaspersoft. If your dataset is huge and you are struggling to get good query performance, then I would certainly suggest giving Redshift a try. Read more about it here.

Disclaimer: Many details including diagrams were taken from official AWS documentations.

Tuesday, October 8, 2013

Interesting Big-Data news from around the world : October 2013 - Week 1

Big Data is in fashion. It has become the in-thing! So it is bound to make some news.

Here are some of the interesting reads I have for you from around the world for the first week of October 2013:
  1. Work4 Exploits Facebook's Graph Search - Who's Next?
    Think the only person that can solicit you on Facebook is a friend or friend of a friend? Think again.
    Today, Work4, a Facebook recruiting solution, is unveiling Graph Search Recruiter, a service which gives companies the ability to search for and contact potential job candidates from across Facebook’s entire membership. Except those whose privacy settings prevent it, that is.
    From a recruiter’s perspective, this seems to be as sexy as it gets.
    Read more...

  2. Why big data has made your privacy a thing of the past?
    Despite the efforts of European regulators to protect citizens' personal data, predictive analytics has made it too easy to piece together information about individuals regardless of the law.
    Read more...

  3. Big data blocks gaming fraud
    The explosion of online games has resulted in the creation of a new industry: in-game currency, currently valued at $1 billion in the U.S. alone. But game developers, particularly startups that are rising and falling on a single game, are losing significant revenue as savvy social game players figure out how to “game” the system by stealing currency rather than buying it.
    Read more...

  4. How to Find Out What Big Data Knows About You?
    The world of Big Data is a world of pervasive data collection and aggressive analytics. Some see the future and cheer it on; others rebel. Behind it all lurks a question most of us are asking—does it really matter? I had a chance to find out recently, as I got to see what Acxiom, a large-scale commercial data aggregator, had collected about me.
    At least in theory large scale data collection matters quite a bit. Large data sets can be used to create social network maps and can form the seeds for link analysis of connections between individuals. Some see this as a good thing; others as a bad one—but whatever your viewpoint, we live in a world which sees increasing power and utility in Big Data’s large scale data sets.
    Read more...

  5. Deutsche Telekom speeds up big data with hosted HANA
    Enterprises have another option for accessing SAP’s in-memory database technology: Deutsche Telekom subsidiary T-Systems has been approved to offer HANA Enterprise Cloud.
    The in-memory database technology can process large data volumes from business applications more quickly than standard server implementations, and also supports new integrated analytical methods, according to T-Systems.
    Read more...

Debugging Hadoop MR Java code in local eclipse dev environment

I have been asked multiple times to blog about this. One of my esteemed colleagues has already blogged about it, so here I am just re-blogging it.

The basic thing to remember here is that debugging a Hadoop MR job is going to be similar to any remotely debugged application in Eclipse.

A debugger or debugging tool is a computer program that is used to test and debug other programs (the “target” program). It is especially useful in a Hadoop environment, where there is little room for error and one small mistake can cause a huge loss.

Debugging custom Java code for Hadoop in your local Eclipse environment is pretty straightforward and does not take much time to set up.

As you may know, Hadoop can be run in the local environment in 3 different modes:

1. Local Mode
2. Pseudo Distributed Mode
3. Fully Distributed Mode (Cluster)

Typically you will be running your local Hadoop setup in Pseudo Distributed Mode to leverage HDFS and MapReduce (MR). However, you cannot debug MR programs in this mode, as each Map/Reduce task runs in a separate JVM process, so you need to switch back to Local mode, where you can run your MR programs in a single JVM process.

Here are the quick and simple steps to debug this in your local environment:

1. Run Hadoop in local mode for debugging so that mapper and reducer tasks run in a single JVM instead of separate JVMs. The steps below help you do it.

2. Configure HADOOP_OPTS to enable debugging so that when you run your Hadoop job, it will wait for the debugger to connect. Below is the command to enable debugging at port 8008:

export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8008"

3. Configure fs.default.name value in core-site.xml to file:/// from hdfs://. You won’t be using hdfs in local mode.

4. Configure mapred.job.tracker value in mapred-site.xml to local. This will instruct Hadoop to run MR tasks in a single JVM.

5. Create a debug configuration for Eclipse and set the port to 8008 (typical stuff). For that, go to the debugger configurations, create a new Remote Java Application type of configuration and set the port as 8008 in the settings.

6. Run your Hadoop job (it will be waiting for the debugger to connect) and then launch Eclipse in debug mode with the above configuration. Do make sure to set a break-point first.

That is all you need to do.
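
As an aside, if you would rather not edit the XML files, the settings from steps 3 and 4 can also be forced programmatically in a throwaway debug driver. A rough sketch, using the same property names as in those steps:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LocalDebugDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step 3: use the local file system instead of HDFS
        conf.set("fs.default.name", "file:///");

        // Step 4: run map and reduce tasks inside a single local JVM
        conf.set("mapred.job.tracker", "local");

        Job job = Job.getInstance(conf, "local-debug-job");
        // ...set jar, mapper, reducer and input/output paths as usual...
        // job.waitForCompletion(true);
    }
}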

Thursday, September 26, 2013

Hadoop: Exit reduce function for the rest of input on some condition

Say we have a reduce function wherein we want to halt processing after some 'n' keys. We set a counter that increments on each key, and once the condition is satisfied we return from the reduce phase without bothering about the rest of the keys.

Fortunately with the new mapreduce API, we have the ability to do that now.

You can achieve this by overriding the run() method of the Reducer class.

Let's get straight to the code:

public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

  // n is the number of keys after which processing should stop (example value)
  private static final int n = 100;

  // reduce method here

  // Override the run()
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    int count = 0;
    while (context.nextKey()) {
      if (count++ < n) {
        reduce(context.getCurrentKey(), context.getValues(), context);
      } else {
        // n keys processed: break out of the loop (or do whatever you want)
        break;
      }
    }
    cleanup(context);
  }
}

Hadoop's speculative task execution

One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program.

Tasks may be slow for various reasons, including hardware degradation, or software mis-configuration, but the causes may be hard to detect since the tasks still complete successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent, task as a backup. This is termed speculative execution of tasks.

For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively, when using the old API; with the newer API you should instead set mapreduce.map.speculative and mapreduce.reduce.speculative.
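
For instance, with the newer API you could switch both off in the driver along these lines (just a sketch; the same properties can equally be set in mapred-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NoSpeculationDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Disable speculative execution for both map and reduce tasks (Hadoop 2.x property names)
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);

        Job job = Job.getInstance(conf, "no-speculation-job");
        // ...configure mapper, reducer, input and output as usual...
    }
}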

If your job's mappers/reducers take very little time, then it is better to disable speculative execution. But in cases where each mapper/reducer of your job takes a considerable amount of time, you would be better off having speculative execution enabled, because if a mapper/reducer fails after running for, say, 45 minutes due to the loss of that particular node, another node would otherwise have to restart its execution from scratch.

Reference: http://developer.yahoo.com/hadoop/tutorial/module4.html

Sunday, July 28, 2013

Custom Writable : SetWritable

Recently, I needed the output key of a mapper to be a Set.

While there is a chance that newer versions of Hadoop will have this pre-implemented, it isn't available yet, so I wrote my own.

In addition to overriding readFields() and write(), the other methods that need to be overridden are:
  • compareTo()
  • equals()
  • hashCode()

Following is the SetWritable class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.WritableComparable;

/**
 * 
 * @author amar
 */
public class SetWritable implements WritableComparable<SetWritable> {

 private Set<Integer> itemSet;

 /**
  * Constructor.
  */
 public SetWritable() {

 }

 /**
  * Constructor.
  * 
  * @param itemSet
  */
 public SetWritable(Set<Integer> itemSet) {

  this.itemSet = itemSet;
 }

 @Override
 public String toString() {

  return itemSet.toString();
 }

 @Override
 public void readFields(DataInput in) throws IOException {

  // First clear the set. Otherwise we will just accumulate
  // entries every time this method is called.
  if (this.itemSet != null) {
   this.itemSet.clear();
  } else {
   this.itemSet = new HashSet<Integer>();
  }
  int count = in.readInt();
  while (count-- > 0) {
   itemSet.add(in.readInt());
  }
 }

 @Override
 public void write(DataOutput out) throws IOException {

  out.writeInt(itemSet.size());
  for (int item : itemSet) {
   out.writeInt(item);
  }
 }

 @Override
 public int compareTo(SetWritable o) {

  if (itemSet.equals(o.itemSet))
   return 0;
  else
   return 1;
 }

 @Override
 public boolean equals(Object other) {

  if (this == other)
   return true;

  if (other == null || (this.getClass() != other.getClass())) {
   return false;
  }

  SetWritable guest = (SetWritable) other;
  return (this.itemSet.equals(guest.itemSet));
 }

 @Override
 public int hashCode() {

  int result = 0;
  result = this.itemSet.hashCode();
  return result;
 }

 /**
  * Gets the itemSet.
  * 
  * @return itemSet.
  */
 public Set<Integer> getItemSet() {

  return itemSet;
 }

 public void setItemSet(Set<Integer> itemSet) {

  this.itemSet = itemSet;
 }
}

Now, in the driver's run() method, the map output key class needs to be set as follows:

job.setMapOutputKeyClass(SetWritable.class);
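
And here is a hedged sketch of a mapper that emits SetWritable keys; the input format (one comma-separated list of integer item ids per line) and the value type are made up purely for illustration:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ItemSetMapper extends Mapper<LongWritable, Text, SetWritable, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assume each input line is a comma-separated list of integer item ids, e.g. "3,7,12"
        Set<Integer> items = new HashSet<Integer>();
        for (String token : value.toString().split(",")) {
            items.add(Integer.parseInt(token.trim()));
        }
        // The whole set becomes the map output key
        context.write(new SetWritable(items), ONE);
    }
}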

Saturday, June 1, 2013

Launch Amazon EMR cluster using AWS Java SDK

The Java SDK for AWS has utilities to launch and manage EMR clusters.

You can do the following :
  1. Define and create steps to be added to the EMR cluster.
  2. Define and create bootstrap actions.
  3. Define and launch a cluster with the above-created bootstrap actions and steps 
Certainly, there are many more features in the SDK that can be explored.

Let's delve into code without much ado.

Define and create steps:

The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.StepConfig
  • com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig
For example, if we need to add a step to install Hive, we can get an object of the HadoopJarStepConfig class containing the details for installing Hive (like the jar location, arguments to be passed, etc.) using StepFactory, which has a number of predefined Hadoop jar steps.
Code would look as below:
StepFactory stepFactory = new StepFactory();
StepConfig installHive = new StepConfig().withName("Install Hive")
.withActionOnFailure("TERMINATE_JOB_FLOW")
.withHadoopJarStep(stepFactory.newInstallHiveStep());

In order to add our custom jars, we create an object of HadoopJarStepConfig as follows and then use it to create the required StepConfig:
HadoopJarStepConfig customJarStep = new HadoopJarStepConfig("Path-to-jar-on-s3");
customJarStep.withArgs("argument-list-to-be-passed-to-jar");
StepConfig doSomething = new StepConfig().withName("Do Some Task")
.withHadoopJarStep(customJarStep);

Define and create bootstrap actions:

To quote amazon EMR's docs:
You can use bootstrap actions to install additional software and to change the configuration of applications on the cluster. Bootstrap actions are scripts that are run on the cluster nodes when Amazon EMR launches the cluster. They run before Hadoop starts and before the node begins processing data. You can write custom bootstrap actions, or use predefined bootstrap actions provided by Amazon EMR. A common use of bootstrap actions is to change Hadoop configuration settings. 
Let's see how we can create a BootstrapActionConfig.
The important classes which we shall know about here are:
  • com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig
  • com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig

The following code should be self-explanatory if you already know about using bootstrap actions to set custom Hadoop settings; if not, read more here.
String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";
ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
bootstrapScriptConfig.setPath(CONFIG_HADOOP_BOOTSTRAP_ACTION);
List<String> setMappersArgs = new ArrayList<String>();
setMappersArgs.add("-s");
setMappersArgs.add("textinputformat.record.delimiter=;");
bootstrapScriptConfig.setArgs(setMappersArgs);
BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
bootstrapConfig.setName("Set Hadoop Config");
bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

Define and launch cluster:

The important class which we shall know about here is:
  • com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest 
Here you may define the various aspects of a cluster, like:
  • Cluster size (Instance count) 
  • Master instance type 
  • Slave instance type 
  • Steps to be added 
  • Bootstrap actions 
  • Log location on S3 
Code is as follows:
RunJobFlowRequest request = new RunJobFlowRequest()
        .withBootstrapActions(mappersBootstrapConfig)
        .withName("Hive Interactive")
        .withSteps(enabledebugging, installHive)
        .withLogUri("s3://myawsbucket/")
        .withInstances(
                new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
                        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
                        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

In order to launch the cluster, execute the following:
RunJobFlowResult result = emr.runJobFlow(request);
That's it. The complete code showcasing the above-mentioned features is below:
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * 
 * @author amar
 * 
 */
public class RunEMRJobFlow {

 private static final String CONFIG_HADOOP_BOOTSTRAP_ACTION = "s3://elasticmapreduce/bootstrap-actions/configure-hadoop";

 public static void main(String[] args) {

  String accessKey = "yourAccessKeyHere";
  String secretKey = "yourSecretKeyHere";
  AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
  AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

  StepFactory stepFactory = new StepFactory();

  StepConfig enabledebugging = new StepConfig().withName("Enable debugging")
    .withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());

  StepConfig installHive = new StepConfig().withName("Install Hive").withActionOnFailure("TERMINATE_JOB_FLOW")
    .withHadoopJarStep(stepFactory.newInstallHiveStep());
  
  List<String> setMappersArgs = new ArrayList<String>();
  setMappersArgs.add("-s");
  setMappersArgs.add("textinputformat.record.delimiter=;");

  BootstrapActionConfig mappersBootstrapConfig = createBootstrapAction("Set Hadoop Config",
    CONFIG_HADOOP_BOOTSTRAP_ACTION, setMappersArgs);

  RunJobFlowRequest request = new RunJobFlowRequest()
    .withBootstrapActions(mappersBootstrapConfig)
    .withName("Hive Interactive")
    .withSteps(enabledebugging, installHive)
    .withLogUri("s3://myawsbucket/")
    .withInstances(
      new JobFlowInstancesConfig().withEc2KeyName("keypair").withHadoopVersion("0.20")
        .withInstanceCount(5).withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m1.small").withSlaveInstanceType("m1.small"));

  RunJobFlowResult result = emr.runJobFlow(request);
 }

 private static BootstrapActionConfig createBootstrapAction(String bootstrapName, String bootstrapPath,
   List<String> args) {

  ScriptBootstrapActionConfig bootstrapScriptConfig = new ScriptBootstrapActionConfig();
  bootstrapScriptConfig.setPath(bootstrapPath);

  if (args != null) {
   bootstrapScriptConfig.setArgs(args);
  }

  BootstrapActionConfig bootstrapConfig = new BootstrapActionConfig();
  bootstrapConfig.setName(bootstrapName);
  bootstrapConfig.setScriptBootstrapAction(bootstrapScriptConfig);

  return bootstrapConfig;
 }

}

Thursday, May 23, 2013

Amazon EMR : How to add More than 256 Steps to a Cluster?

If you have been using Amazon EMR for long and complex tasks, you might know that EMR currently limits the number of steps which can be added to 256.

But at times it becomes a tad difficult to limit the number of steps to 256. It might be because the problem at hand is complex and needs to be broken into several steps run over varied sets of data. Or one might have long-running jobs taking care of multiple tasks for a Hive-based data warehouse. Whatever the reason may be, you shouldn't get depressed about it, as there is a simple workaround:

Manually connect to the master node and submit job steps! Just like you run them on your local machine!
Yeah, that's it. Simple.

The EMR CLI already has ways to facilitate this for us.
Assuming that you already have a cluster spawned and have its JobFlowID, follow the steps below to submit job steps directly to the master node:

  1. Move your executables to the master node
    In order to run your job step, you will need to have the jar and or other files required by your job to be moved to the master node. This can be done as follows using EMR CLI's --scp:
    ruby elastic-mapreduce --jobflow JobFlowID --scp myJob.jar
  2. Execute hadoop command, just like you do in local machine.
    This can also be done as follows using EMR CLI's --ssh:
    ruby elastic-mapreduce --jobflow JobFlowID --ssh ' hadoop jar myJob.jar inputPath outputPath otherArguments'
There are other ways also. Refer here for more.


Tuesday, May 14, 2013

How to read a file in HDFS using Hadoop

You may want to read programmatically from HDFS in your mapper/reducer. Following on my post on How to write a file in HDFS using Hadoop, here is the simple code to read a file from HDFS:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFromHDFS {

 public static void main(String[] args) throws Exception {

  if (args.length < 1) {
   System.out.println("Usage: ReadFromHDFS ");
   System.out.println("Example: ReadFromHDFS 'hdfs:/localhost:9000/myFirstSelfWriteFile'");
   System.exit(-1);
  }

  try {
   Path path = new Path(args[0]);
   FileSystem fileSystem = FileSystem.get(new Configuration());
   BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileSystem.open(path)));
   String line = bufferedReader.readLine();
   while (line != null) {
    System.out.println(line);
    line = bufferedReader.readLine();
   }
   bufferedReader.close();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

Sunday, April 28, 2013

Reduce EMR costs : use spot instances and yes, without risk!

As AWS states, Spot Instances can significantly lower your computing costs for time-flexible, interruption-tolerant tasks. But Hadoop jobs being run on EMR aren't generally interruption-tolerant. So, how do we use spot instances and still not lose our clusters in the middle of a running task?

The answer lies in not having all the machines in a cluster be spot instances: use some on-demand and some spot instances.

Now, how to achieve this?

There are 3 essential instance groups into which the nodes launched as part of a cluster fall:
  1. master instance group : This is a must-have, with a single master node
  2. core instance group : If there are any slave nodes then there must be at least one node in this instance group. Nodes in this group act as both Tasknodes and Datanodes.
  3. task instance group : If a cluster has a core instance group, it can also have a task instance group containing one or more Tasknodes. These do not act as Datanodes (they do not hold HDFS data).

You can choose to use either on-demand or spot instances for each of the above instance groups in your job flows. However, from the definitions above, if you lose a master or core machine then your job is bound to fail. Theoretically, you could have something like:

elastic-mapreduce --create --alive --plain-output
...
--instance-group master --instance-type m1.small --instance-count 1 --bid-price 0.091 \
--instance-group core --instance-type m1.small --instance-count 10 --bid-price 0.031 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021
...

But realistically, if you request spot instances, keep in mind that if the current spot price exceeds your max bid, instances will either not be provisioned or will be removed from the current job flow. Thus, if at any time the spot price goes higher and you lose any of your CORE or MASTER nodes, the job will fail. Both CORE and TASK nodes run TaskTrackers, but only CORE nodes run DataNodes, so you need at least one CORE node.

To hedge against the complete loss of a job flow, multiple instance groups can be created where the CORE group is a smaller complement of traditional on-demand systems and the TASK group is the group of spot instances. In this configuration, the TASK group mostly benefits the mapper phases of a job flow, as work from the TASK group is handed back to the CORE group for reduction.

So, say you have to run a job which would ideally need 40 slave machines; you could then have, say, 10 machines (CORE group) as traditional on-demand instances while the other 30 are spot instances (TASK group). The syntax for creating the multiple instance groups is below:

elastic-mapreduce --create --alive --plain-output
...
--instance-group master --instance-type m1.small --instance-count 1 \
--instance-group core --instance-type m1.small --instance-count 10 \
--instance-group task --instance-type m1.small --instance-count 30 --bid-price 0.021

This will help you save cost by running spot instances as some of your nodes while at the same time making sure that the job does not fail.
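
If you launch your clusters through the AWS Java SDK instead of the CLI (as in an earlier post on this blog), the same mixed on-demand/spot layout can be sketched roughly as follows; the instance types, counts, and bid price are placeholders:

import com.amazonaws.services.elasticmapreduce.model.InstanceGroupConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;

public class MixedInstanceGroups {

    public static JobFlowInstancesConfig buildInstances() {
        InstanceGroupConfig master = new InstanceGroupConfig()
                .withInstanceRole("MASTER")
                .withInstanceType("m1.small")
                .withInstanceCount(1);           // on-demand by default

        InstanceGroupConfig core = new InstanceGroupConfig()
                .withInstanceRole("CORE")
                .withInstanceType("m1.small")
                .withInstanceCount(10);          // on-demand, so HDFS stays safe

        InstanceGroupConfig task = new InstanceGroupConfig()
                .withInstanceRole("TASK")
                .withInstanceType("m1.small")
                .withInstanceCount(30)
                .withMarket("SPOT")              // only the TASK group runs on spot instances
                .withBidPrice("0.021");

        return new JobFlowInstancesConfig()
                .withEc2KeyName("keypair")
                .withHadoopVersion("0.20")
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withInstanceGroups(master, core, task);
    }
}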

PS:
However, keep in mind that, depending upon your bid price and the time taken to complete the job, the spot instances may come and go, so in the worst case you might end up incurring the same cost and taking longer to complete the job. It all depends on your bid price, so choose it wisely. We have also been successful in running short tasks (20 minutes) with all the machines as spot instances!

Any feedback, good or bad is most welcome.
