Counters are very useful feature in hadoop. This helps us in tracking global events in our job, ie across map and reduce phases.
When we execute a mapreduce job, we can see a lot of counters listed in the logs. Other than the default built-in counters, we can create our own custom counters. The custom counters will be listed along with the built-in counters.
This helps us in several ways. Here I am explaining a scenario where I am using a custom counter for counting the number of good words and stop words in the given text files. The stop words in this program are provided at the run time using distributed cache.
This is a mapper only job. The property job.setNumReduceTasks(0) makes the it a mapper only job.

Here I am introducing another feature in hadoop called Distributed Cache.
Distributed cache will distribute application specific read only files efficiently through out the application.
My requirement is to filter the stop words from input text files. The stop words list may vary. So if I hard code the list in my program, I have to update the code everytime to make changes in the stop word list. This is not a good practice. I used distributed cache for this and the file containing the stop words is loaded to the distributed cache. This makes the file available to mapper as well as reducer. In this program, we don’t require any reducer.

The code is attached below. You can also get the code from the github.


package com.hadoop.skipper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SkipMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Text word = new Text();
private Set<String> stopWordList = new HashSet<String>();
private BufferedReader fis;
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.
* Mapper.Context)
*/
@SuppressWarnings("deprecation")
protected void setup(Context context) throws java.io.IOException,
InterruptedException {
try {
Path[] stopWordFiles = new Path[0];
stopWordFiles = context.getLocalCacheFiles();
System.out.println(stopWordFiles.toString());
if (stopWordFiles != null && stopWordFiles.length > 0) {
for (Path stopWordFile : stopWordFiles) {
readStopWordFile(stopWordFile);
}
}
} catch (IOException e) {
System.err.println("Exception reading stop word file: " + e);
}
}
/*
* Method to read the stop word file and get the stop words
*/
private void readStopWordFile(Path stopWordFile) {
try {
fis = new BufferedReader(new FileReader(stopWordFile.toString()));
String stopWord = null;
while ((stopWord = fis.readLine()) != null) {
stopWordList.add(stopWord);
}
} catch (IOException ioe) {
System.err.println("Exception while reading stop word file '"
+ stopWordFile + "' : " + ioe.toString());
}
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
if (stopWordList.contains(token)) {
context.getCounter(StopWordSkipper.COUNTERS.STOPWORDS)
.increment(1L);
} else {
context.getCounter(StopWordSkipper.COUNTERS.GOODWORDS)
.increment(1L);
word.set(token);
context.write(word, null);
}
}
}
}

view raw

SkipMapper.java

hosted with ❤ by GitHub


package com.hadoop.skipper;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
@SuppressWarnings("deprecation")
public class StopWordSkipper {
public enum COUNTERS {
STOPWORDS,
GOODWORDS
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
args = parser.getRemainingArgs();
Job job = new Job(conf, "StopWordSkipper");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setJarByClass(StopWordSkipper.class);
job.setMapperClass(SkipMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
List<String> other_args = new ArrayList<String>();
// Logic to read the location of stop word file from the command line
// The argument after -skip option will be taken as the location of stop
// word file
for (int i = 0; i < args.length; i++) {
if ("-skip".equals(args[i])) {
DistributedCache.addCacheFile(new Path(args[++i]).toUri(),
job.getConfiguration());
if (i+1 < args.length)
{
i++;
}
else
{
break;
}
}
other_args.add(args[i]);
}
FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
job.waitForCompletion(true);
Counters counters = job.getCounters();
System.out.printf("Good Words: %d, Stop Words: %d\n",
counters.findCounter(COUNTERS.GOODWORDS).getValue(),
counters.findCounter(COUNTERS.STOPWORDS).getValue());
}
}

Create a java project with the above java classes. Add the dependent java libraries.(Libraries will be present in your hadoop installation). Export the project as a runnable jar and execute. The file containing the stop words should be present in hdfs. The stop words should be added line by line in the stop word file. Sample format is given below.

is

the

am

are

with

was

were

Sample command to execute the program is given below.

hadoop jar <jar-name>  -skip  <stop-word-file-in hdfs>   <input-data-location>    <output-location>

Eg:  hadoop jar Skipper.jar  -skip /user/hadoop/skip/skip.txt     /user/hadoop/input     /user/hadoop/output

In the job logs, you can see the custom counters also. I am attaching a sample log below.

Counters

Advertisement