Friday, December 28, 2012

What is "Distributed Cache" in Hadoop?

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across HDFS. You can reliably store petabytes of information in HDFS, and individual jobs can process several terabytes at a time. The HDFS access model, however, assumes that each piece of a file is read by a single mapper: individual files stored in HDFS are very large and are broken into chunks that are processed in parallel, one chunk per mapper. There is no built-in notion of a small file that every mapper needs a copy of.

Sometimes it is necessary for every Mapper to read a single file; for example, a distributed spell-check application would require every Mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but needs to be widely available so that all nodes can reach it.

Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.

To use the distributed cache to disseminate files, call the static DistributedCache.addCacheFile() method when setting up your job to register the names of files that should be sent to all nodes in the system. The file names are specified as URI objects; unless qualified otherwise, they are assumed to refer to files already present in HDFS at the indicated path. You can copy local files into HDFS with the FileSystem.copyFromLocalFile() method.

When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.

As a cautionary note: if you use the local JobRunner in Hadoop (i.e., what happens when you call JobClient.runJob() in a program with no hadoop-site.xml, or an empty one, accessible), then no local data directory is created and the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.
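In unit tests, one simple workaround is to fall back to a plain local path when the cache lookup comes back empty, so the same mapper works both on a real cluster and under the local JobRunner. A minimal sketch, assuming a hypothetical "local.stopwords.path" property that the test itself sets, and reusing the loadStopWords() helper defined in the mapper listing further below:

  public void configure(JobConf conf) {
    try {
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (cacheFiles != null && cacheFiles.length > 0) {
        loadStopWords(cacheFiles[0]); // normal cluster case
      } else {
        // Local JobRunner: nothing was localized. Read the original
        // file directly; "local.stopwords.path" is a made-up property
        // that the unit test sets before running the job.
        String localPath = conf.get("local.stopwords.path");
        if (localPath != null) {
          loadStopWords(new Path(localPath));
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading stop word list: " + ioe);
    }
  }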

Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate the list of stop words to all the Mappers with the following code. The first listing puts the stop-words file into the distributed cache:

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public static final String LOCAL_STOPWORD_LIST =
    "/home/aaron/stop_words.txt";

public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

void cacheStopWordList(JobConf conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

  // Upload the file to HDFS. The first flag says not to delete the
  // local source; the second says to overwrite any existing copy.
  fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST), hdfsPath);

  // Register the HDFS copy so it is shipped to every task node.
  DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
}

This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;

class IndexMapperExample implements Mapper {
  // Populated once per task attempt in configure(), consulted in map().
  private Set<String> stopWords;

  public void configure(JobConf conf) {
    try {
      String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (null != cacheFiles && cacheFiles.length > 0) {
        for (Path cachePath : cacheFiles) {
          // Match the cached file by its basename.
          if (cachePath.getName().equals(stopwordCacheName)) {
            loadStopWords(cachePath);
            break;
          }
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading from distributed cache");
      System.err.println(ioe.toString());
    }
  }

  void loadStopWords(Path cachePath) throws IOException {
    // Note the use of regular java.io methods here: this is a local file now.
    BufferedReader wordReader = new BufferedReader(
        new FileReader(cachePath.toString()));
    try {
      String line;
      this.stopWords = new HashSet<String>();
      while ((line = wordReader.readLine()) != null) {
        this.stopWords.add(line);
      }
    } finally {
      wordReader.close();
    }
  }

  /* actual map() method, close(), etc. go here */
}

The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.
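For completeness, here is one way the actual map() method might consult this set. This sketch is not part of the original tutorial: the whitespace tokenization and the choice to emit (word, document key) pairs are illustrative assumptions, written in the same raw old-API style as the listing above (it additionally assumes imports of org.apache.hadoop.io.Text, WritableComparable, and Writable, plus the mapred OutputCollector and Reporter):

  public void map(WritableComparable key, Writable value,
      OutputCollector output, Reporter reporter) throws IOException {
    // Assume each input value is one line of text from a document.
    for (String word : value.toString().split("\\s+")) {
      // Skip empty tokens and anything on the cached stop word list.
      if (word.length() > 0 && !stopWords.contains(word)) {
        output.collect(new Text(word), key);
      }
    }
  }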

The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. The archive is also inserted into the tasks' classpath, so the classes it contains can be used directly by your mapper and reducer code.
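A minimal sketch of shipping a library jar this way, assuming it has already been uploaded to HDFS at /libs/third-party-lib.jar (an illustrative path):

  // In the job setup code, alongside the other JobConf configuration.
  // addArchiveToClassPath() both caches the jar on every node and adds
  // it to the tasks' classpath; it throws IOException on failure.
  DistributedCache.addArchiveToClassPath(
      new Path("/libs/third-party-lib.jar"), conf);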

Source: http://developer.yahoo.com/hadoop/tutorial/module5.html
