Friday, December 28, 2012

Top 5 Open Source Big Data Technologies to Watch

With more and more work being put into leveraging the plethora of data we have, a number of technologies have emerged and are steadily maturing. Here is a list of the top 5 such technologies to keep an eye on if you ever need any of your workflows to handle huge amounts of data.

  1. Apache Hadoop
    Apache Hadoop is an open source software framework for data-intensive distributed applications originally created by Doug Cutting to support his work on Nutch, an open source Web search engine. To meet Nutch's multimachine processing requirements, Cutting implemented a MapReduce facility and a distributed file system that together became Hadoop. He named it after his son's toy elephant. Through MapReduce, Hadoop distributes Big Data in pieces over a series of nodes running on commodity hardware. Hadoop is now among the most popular technologies for storing the structured, semi-structured and unstructured data that comprise Big Data. Hadoop is available under the Apache License 2.0.

  2. R
    R is an open source programming language and software environment designed for statistical computing and visualization. R was designed by Ross Ihaka and Robert Gentleman at the University of Auckland, New Zealand beginning in 1993 and is rapidly becoming the go-to tool for statistical analysis of very large data sets. It has been commercialized by a company called Revolution Analytics, which is pursuing a services and support model inspired by Red Hat's support for Linux. R is available under the GNU General Public License.

  3. Scribe
    Scribe is a server developed by Facebook and released in 2008. It is intended for aggregating log data streamed in real time from a large number of servers. Facebook designed it to meet its own scaling challenges, and it now uses Scribe to handle tens of billions of messages a day. It is available under the Apache License 2.0.

  4. ElasticSearch
    Developed by Shay Banon and based upon Apache Lucene, ElasticSearch is a distributed, RESTful open source search server. It's a scalable solution that supports near real-time search and multitenancy without a special configuration. It has been adopted by a number of companies, including StumbleUpon and Mozilla. ElasticSearch is available under the Apache License 2.0.

  5. Apache HBase
    Written in Java and modeled after Google's BigTable, Apache HBase is an open source, non-relational, columnar distributed database designed to run on top of the Hadoop Distributed File System (HDFS). It provides fault-tolerant storage and quick access to large quantities of sparse data. HBase is one of a multitude of NoSQL data stores that have become available in the past several years. In 2010, Facebook adopted HBase to serve its messaging platform. It is available under the Apache License 2.0.

What is "Distributed Cache" in Hadoop?

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across HDFS. You can reliably store petabytes of information in HDFS, and individual jobs can process several terabytes at a time. The HDFS access model, however, is geared toward large input files that are broken into chunks, each read by a single mapper, so that the pieces can be processed in parallel.

Sometimes it is necessary for every Mapper to read a single file; for example, a distributed spell-check application would require every Mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but needs to be widely available so that all nodes can reach it.

Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.

To use the distributed cache to disseminate files, register them through the DistributedCache class when setting up your job. Use the static DistributedCache.addCacheFile() method to add the names of files which should be sent to all nodes in the system. The file names are specified as URI objects; unless qualified otherwise, they are assumed to refer to files already present on HDFS at the path indicated. You can copy local files to HDFS with the FileSystem.copyFromLocalFile() method.

When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.

As a cautionary note: if you use the local JobRunner in Hadoop (i.e., what happens when you call JobClient.runJob() in a program with no hadoop-conf.xml, or an empty one, accessible), then no local data directory is created and the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.
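
For example, here is a minimal defensive sketch (reusing the LOCAL_STOPWORD_LIST constant and loadStopWords() helper from the listings below; it would sit inside the same try block as the cache lookup) that falls back to the original local file when nothing comes back from the cache:

  // Inside configure(JobConf conf): fall back to the original local copy of the
  // file when the distributed cache is not populated (e.g., under the local
  // JobRunner in a unit test).
  Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
  if (cacheFiles == null || cacheFiles.length == 0) {
    loadStopWords(new Path(LOCAL_STOPWORD_LIST)); // assumes the local file is reachable
  } else {
    loadStopWords(cacheFiles[0]); // assumes the stop-word list is the only cached file
  }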

Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate the list of stop words to all the mappers with the following code. The first listing puts the stop-words file into the distributed cache:

public static final String LOCAL_STOPWORD_LIST =
    "/home/aaron/stop_words.txt";

public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

void cacheStopWordList(JobConf conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

  // upload the file to HDFS, overwriting any existing copy
  fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
      hdfsPath);

  DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
}

This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:

class IndexMapperExample implements Mapper {
  // Populated in configure() and consulted during map() to filter common words.
  private Set<String> stopWords;

  void configure(JobConf conf) {
    try {
      String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (null != cacheFiles && cacheFiles.length > 0) {
        for (Path cachePath : cacheFiles) {
          if (cachePath.getName().equals(stopwordCacheName)) {
            loadStopWords(cachePath);
            break;
          }
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading from distributed cache");
      System.err.println(ioe.toString());
    }
  }

  void loadStopWords(Path cachePath) throws IOException {
    // note use of regular java.io methods here - this is a local file now
    BufferedReader wordReader = new BufferedReader(
        new FileReader(cachePath.toString()));
    try {
      String line;
      this.stopWords = new HashSet<String>();
      while ((line = wordReader.readLine()) != null) {
        this.stopWords.add(line);
      }
    } finally {
      wordReader.close();
    }
  }

  /* actual map() method, etc. go here */
}
The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.

The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. It will be inserted into the classpath as well, so that classes in the archive can be accessed by all the nodes.
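
For example, here is a minimal sketch of registering such an archive during job setup; the helper-jar path is hypothetical and assumed to already exist on HDFS:

  void cacheHelperJar(JobConf conf) throws IOException {
    // Hypothetical jar, already uploaded to HDFS. It will be shipped to every
    // node and added to each task's classpath automatically.
    Path helperJar = new Path("/libs/text-utils.jar");
    DistributedCache.addArchiveToClassPath(helperJar, conf);
  }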

Source: http://developer.yahoo.com/hadoop/tutorial/module5.html

EMR job using Java code for mapper and reducer [Creating a custom jar]

Here is a basic sample of how to create a custom jar for an EMR job that uses Java classes for the mapper and reducer.

Let's assume that the mapper needs to read from a CSV file (which will be placed in EMR's distributed cache) as well as from the input S3 bucket, which also contains some CSV files, do some calculations, and emit CSV output lines.
There will be one main class containing one implementation each of the following classes:

org.apache.hadoop.mapreduce.Mapper;
org.apache.hadoop.mapreduce.Reducer;


Each of these has to override the map() and reduce() methods, respectively, to do the desired work.

The Java class would look like the following:

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SomeJob extends Configured implements Tool {

  private static final String JOB_NAME = "My Job";

  /**
   * This is the Mapper.
   */
  public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // Get the cached file (the CSV that was placed in the distributed cache)
      Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];

      File fileObject = new File(file.toString());
      // Do whatever is required with the file data
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      outputKey.set("Some key calculated or derived");
      outputValue.set("Some value calculated or derived");
      context.write(outputKey, outputValue);
    }
  }

  /**
   * This is the Reducer.
   */
  public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      outputKey.set("Some key calculated or derived");
      outputValue.set("Some value calculated or derived");
      context.write(outputKey, outputValue);
    }
  }

  @Override
  public int run(String[] args) throws Exception {

    try {
      Configuration conf = getConf();
      // args[2] is the URI of the file to place in the distributed cache
      DistributedCache.addCacheFile(new URI(args[2]), conf);
      Job job = new Job(conf);

      job.setJarByClass(SomeJob.class);
      job.setJobName(JOB_NAME);

      job.setMapperClass(MapJob.class);
      job.setReducerClass(ReduceJob.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      FileInputFormat.setInputPaths(job, args[0]);
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    } catch (Exception e) {
      e.printStackTrace();
      return 1;
    }
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 3) {
      System.out.println("Usage: SomeJob <input path> <output path> <cache file uri>");
      System.exit(-1);
    }

    int result = ToolRunner.run(new SomeJob(), args);
    System.exit(result);
  }

}

Now, in order to spawn the cluster, the command would look like the following:

ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11  --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml"

This command returns a job flow ID, which is then used to add steps that the cluster executes in order, in a distributed fashion.

To add job steps:

Step 1:

ruby elastic-mapreduce --jobflow <jobflow-id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

Step 2:

ruby elastic-mapreduce --jobflow <jobflow-id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

How to copy files from S3 to Amazon EMR's HDFS?

Amazon itself has a wrapper implemented over DistCp, namely S3DistCp. S3DistCp is an extension of DistCp that is optimized to work with Amazon Web Services (AWS), particularly Amazon Simple Storage Service (Amazon S3). You use S3DistCp by adding it as a step in a job flow. Using S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into HDFS, where it can be processed by subsequent steps in your Amazon Elastic MapReduce (Amazon EMR) job flow. You can also use S3DistCp to copy data between Amazon S3 buckets or from HDFS to Amazon S3.

I found s3distcp to be a very powerful tool. In addition to being able to use it to copy a large number of files in and out of S3, you can also perform fast cluster-to-cluster copies with large data sets. Instead of pushing all the data through a single node, DistCp uses multiple nodes in parallel to perform the transfer. This makes it considerably faster when transferring large amounts of data, compared to the alternative of copying everything to the local file system as an intermediary.

Example: copy log files from Amazon S3 to HDFS. The following example illustrates how to copy log files stored in an Amazon S3 bucket into HDFS. Here the --srcPattern option is used to limit the data copied to the daemon logs.
elastic-mapreduce --jobflow j-3GY8JC4179IOJ --jar \
s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar \
--args '--src,s3://myawsbucket/logs/j-3GY8JC4179IOJ/node/,\
--dest,hdfs:///output,\
--srcPattern,.*daemons.*-hadoop-.*'
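
S3DistCp also works in the other direction. For example, here is a sketch (the destination prefix is hypothetical) of a later step in the same job flow that copies the HDFS output back into the Amazon S3 bucket:

elastic-mapreduce --jobflow j-3GY8JC4179IOJ --jar \
s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar \
--args '--src,hdfs:///output,\
--dest,s3://myawsbucket/processed-logs/'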

Any feedback, good or bad, is most welcome.
