Sample program with Hadoop Counters and Distributed Cache

Counters are a very useful feature in Hadoop. They help us track global events in a job, i.e. across the map and reduce phases.
When we execute a MapReduce job, we can see a lot of counters listed in the logs. Besides the default built-in counters, we can create our own custom counters, which will be listed along with the built-in ones.
This helps us in several ways. Here I am explaining a scenario where I use a custom counter to count the number of good words and stop words in the given text files. The stop words are provided at run time using the distributed cache.
This is a mapper-only job: setting job.setNumReduceTasks(0) in the driver removes the reduce phase.
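
Below is a minimal sketch of how such a custom counter can be declared and incremented inside the mapper. This is not the exact attached code; the class, enum and field names (SkipMapper, WordCounters, stopWords) are placeholders I am using for illustration.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SkipMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Custom counters show up in the job logs along with the built-in ones.
    public enum WordCounters { GOOD_WORDS, STOP_WORDS }

    // Filled from the distributed cache in setup() (see the next part).
    private final Set<String> stopWords = new HashSet<String>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            if (word.isEmpty()) {
                continue;
            }
            if (stopWords.contains(word.toLowerCase())) {
                // Count a stop word and drop it from the output.
                context.getCounter(WordCounters.STOP_WORDS).increment(1);
            } else {
                context.getCounter(WordCounters.GOOD_WORDS).increment(1);
                context.write(new Text(word), NullWritable.get());
            }
        }
    }
}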

Here I am introducing another feature in Hadoop called the distributed cache.
The distributed cache distributes application-specific, read-only files efficiently to all the nodes running the application.
My requirement is to filter the stop words from the input text files. The stop word list may vary, so if I hard-code the list in my program, I have to update the code every time the list changes, which is not a good practice. Instead, the file containing the stop words is loaded into the distributed cache, which makes it available to the mappers as well as the reducers. In this program, we don't require any reducer.
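
A minimal sketch of the distributed cache part, under the same assumptions as above (placeholder names, Hadoop 2's Job.addCacheFile API, and the extra imports java.net.URI, java.io.BufferedReader, java.io.FileReader and org.apache.hadoop.fs.Path). The driver puts the HDFS stop word file into the cache, and the mapper loads the localized copy once per task in setup().

// In the driver, before submitting the job:
job.addCacheFile(new Path("/user/hadoop/skip/skip.txt").toUri());

// In the mapper (added to the SkipMapper sketch above):
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles();
    if (cacheFiles != null && cacheFiles.length > 0) {
        // The cached file is localized in the task's working directory,
        // so it can be opened by its file name.
        String fileName = new Path(cacheFiles[0].getPath()).getName();
        BufferedReader reader = new BufferedReader(new FileReader(fileName));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                stopWords.add(line.trim().toLowerCase());
            }
        } finally {
            reader.close();
        }
    }
}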

The code is attached below. You can also get the code from GitHub.

Create a Java project with the above Java classes and add the dependent Java libraries (these are present in your Hadoop installation). Export the project as a runnable jar and execute it. The file containing the stop words should be present in HDFS, with one stop word per line. A sample format is given below.

is

the

am

are

with

was

were

A sample command to execute the program is given below.

hadoop jar <jar-name> -skip <stop-word-file-in-hdfs> <input-data-location> <output-location>

Eg: hadoop jar Skipper.jar -skip /user/hadoop/skip/skip.txt /user/hadoop/input /user/hadoop/output
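
For reference, here is a minimal sketch of what the driver behind this command might look like (again with placeholder class names, assuming Hadoop 2's Job API; the actual code is on GitHub).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Skipper {
    public static void main(String[] args) throws Exception {
        // Expected arguments: -skip <stop-word-file-in-hdfs> <input> <output>
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "stop word skipper");
        job.setJarByClass(Skipper.class);
        job.setMapperClass(SkipMapper.class);
        job.setNumReduceTasks(0);                      // mapper-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        if ("-skip".equals(args[0])) {
            // Put the stop word file (already in HDFS) into the distributed cache.
            job.addCacheFile(new Path(args[1]).toUri());
        }
        FileInputFormat.addInputPath(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}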

In the job logs, you can see the custom counters as well. I am attaching a sample log below.

[Sample job log showing the built-in and custom Counters]
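
Besides reading them from the logs, the counter values can also be fetched in the driver once the job finishes. A small variation on the end of the driver sketch above:

// After job.waitForCompletion(true) returns:
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long goodWords = counters.findCounter(SkipMapper.WordCounters.GOOD_WORDS).getValue();
long stopWordCount = counters.findCounter(SkipMapper.WordCounters.STOP_WORDS).getValue();
System.out.println("Good words: " + goodWords + ", stop words: " + stopWordCount);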

Program to compress a file in Snappy format

Hadoop supports various compression formats, and Snappy is one of them. I created a Snappy-compressed file using the Google Snappy library and used it in Hadoop, but it gave me an error saying the file is missing the Snappy identifier. I did a little research on this and found a workaround. The method I followed to find the solution was as follows.
I compressed the same file once with the Google Snappy library and once with the Snappy codec present in Hadoop. I then compared the file size and checksum of the two files and found that they differ: the file created with Hadoop's Snappy codec is a few bytes larger than the one created with Google Snappy. The extra bytes are the additional metadata written by the Hadoop codec.
The code shown below will help you create a Snappy-compressed file that works perfectly in Hadoop. It requires the following dependent jars, all of which are available in your Hadoop installation.
1)  hadoop-common.jar

2) guava-xx.jar

3) log4j.jar

4) commons-collections.jar

5) commons-logging.x.x.x.jar
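
Since the full code is on GitHub, here is just a minimal sketch of the approach, assuming Hadoop's SnappyCodec and the Snappy native libraries are available on the machine (the class name and argument layout are placeholders).

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SnappyCompress {
    public static void main(String[] args) throws Exception {
        // args[0] = local input file, args[1] = output file (e.g. ending in .snappy)
        Configuration conf = new Configuration();

        // Use Hadoop's own SnappyCodec so the output carries the framing and
        // metadata that Hadoop expects when it reads the file back.
        CompressionCodec codec = ReflectionUtils.newInstance(SnappyCodec.class, conf);

        FileSystem fs = FileSystem.get(conf);
        InputStream in = new FileInputStream(args[0]);
        OutputStream rawOut = fs.create(new Path(args[1]));
        CompressionOutputStream out = codec.createOutputStream(rawOut);
        try {
            // Copy the input through the Snappy codec.
            IOUtils.copyBytes(in, out, 4096);
            out.finish();
        } finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }
}

The resulting file can then be used as input to MapReduce jobs without the missing Snappy identifier error.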

You can download the code directly from GitHub.