Counters are a very useful feature in Hadoop. They help us track global events in a job, i.e. across the map and reduce phases.
When we execute a MapReduce job, we can see a lot of counters listed in the logs. Besides the default built-in counters, we can create our own custom counters, which are listed along with the built-in ones.
This helps us in several ways. Here I am explaining a scenario where I use custom counters to count the number of good words and stop words in the given text files. The stop words for this program are provided at run time through the distributed cache.
This is a mapper-only job; setting job.setNumReduceTasks(0) makes it one.
Here I am introducing another feature in Hadoop called the distributed cache.
The distributed cache distributes application-specific, read-only files efficiently to every node running the application.
My requirement is to filter the stop words out of input text files. The stop word list may vary, so if I hard-code the list in my program, I have to update the code every time the list changes, which is not a good practice. Instead, I used the distributed cache: the file containing the stop words is loaded into the cache, which makes it available to the mappers as well as the reducers. In this program, we don't require any reducer.
The code is attached below. You can also get the code from GitHub.
package com.hadoop.skipper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SkipMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text word = new Text();
    private Set<String> stopWordList = new HashSet<String>();

    /*
     * Load the stop word file(s) from the distributed cache before the map
     * tasks start.
     */
    @SuppressWarnings("deprecation")
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        try {
            Path[] stopWordFiles = context.getLocalCacheFiles();
            if (stopWordFiles != null && stopWordFiles.length > 0) {
                for (Path stopWordFile : stopWordFiles) {
                    readStopWordFile(stopWordFile);
                }
            }
        } catch (IOException e) {
            System.err.println("Exception reading stop word file: " + e);
        }
    }

    /*
     * Read the stop word file and collect the stop words, one per line.
     */
    private void readStopWordFile(Path stopWordFile) {
        try {
            BufferedReader fis = new BufferedReader(new FileReader(
                    stopWordFile.toString()));
            String stopWord = null;
            while ((stopWord = fis.readLine()) != null) {
                stopWordList.add(stopWord);
            }
            fis.close();
        } catch (IOException ioe) {
            System.err.println("Exception while reading stop word file '"
                    + stopWordFile + "' : " + ioe.toString());
        }
    }

    /*
     * Tokenize each line, increment the STOPWORDS or GOODWORDS counter for
     * every token, and emit only the good words.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if (stopWordList.contains(token)) {
                context.getCounter(StopWordSkipper.COUNTERS.STOPWORDS)
                        .increment(1L);
            } else {
                context.getCounter(StopWordSkipper.COUNTERS.GOODWORDS)
                        .increment(1L);
                word.set(token);
                context.write(word, NullWritable.get());
            }
        }
    }
}
package com.hadoop.skipper;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

@SuppressWarnings("deprecation")
public class StopWordSkipper {

    // Custom counters; these appear in the job logs along with the built-in ones
    public enum COUNTERS {
        STOPWORDS,
        GOODWORDS
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "StopWordSkipper");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setJarByClass(StopWordSkipper.class);
        job.setMapperClass(SkipMapper.class);
        // Zero reduce tasks makes this a mapper-only job
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        List<String> other_args = new ArrayList<String>();

        // Read the location of the stop word file from the command line.
        // The argument after the -skip option is taken as the stop word file
        // and is added to the distributed cache.
        for (int i = 0; i < args.length; i++) {
            if ("-skip".equals(args[i])) {
                DistributedCache.addCacheFile(new Path(args[++i]).toUri(),
                        job.getConfiguration());
                if (i + 1 < args.length) {
                    i++;
                } else {
                    break;
                }
            }
            other_args.add(args[i]);
        }

        FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));

        job.waitForCompletion(true);

        // Read the custom counter values after the job finishes
        Counters counters = job.getCounters();
        System.out.printf("Good Words: %d, Stop Words: %d\n",
                counters.findCounter(COUNTERS.GOODWORDS).getValue(),
                counters.findCounter(COUNTERS.STOPWORDS).getValue());
    }
}
Create a Java project with the above Java classes and add the dependent libraries (they are present in your Hadoop installation). Export the project as a runnable jar and execute it. The file containing the stop words should be present in HDFS, with the stop words added one per line. A sample format is given below.
is
the
am
are
with
was
were
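If the stop word file is still on the local file system, it can be copied into HDFS before running the job. The commands below are only an example; the paths are placeholders matching the sample run further down.
hadoop fs -mkdir /user/hadoop/skip
hadoop fs -put skip.txt /user/hadoop/skip/skip.txt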
A sample command to execute the program is given below.
hadoop jar <jar-name> -skip <stop-word-file-in-hdfs> <input-data-location> <output-location>
Eg: hadoop jar Skipper.jar -skip /user/hadoop/skip/skip.txt /user/hadoop/input /user/hadoop/output
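Since this is a mapper-only job, the filtered words are written directly by the map tasks; with TextOutputFormat the output files are typically named part-m-00000, part-m-00001 and so on, one per map task. Assuming the sample paths above, the output can be inspected with something like:
hadoop fs -cat /user/hadoop/output/part-m-00000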
You can also see the custom counters in the job logs. A sample log is attached below.
The method getLocalCacheFiles() is undefined for the type Mapper.Context
I tried to compile this code but I got the above error in the editor even before compiling… could you please tell me what should be done to resolve this error?
Which version and distribution of Hadoop are you using?
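If it is a Hadoop 2.x release with the new MapReduce API, getLocalCacheFiles() may no longer be available on Mapper.Context. A rough, untested sketch of the newer equivalents (reusing the names from the listing above, and needing java.net.URI imported) would be:

// Driver side (Hadoop 2.x): add the stop word file through the Job object
job.addCacheFile(new Path(args[++i]).toUri());

// Mapper setup rewritten for the Hadoop 2.x API
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles();
    if (cacheFiles != null) {
        for (URI cacheFile : cacheFiles) {
            // Cache files are symlinked into the task working directory
            // under their base names, so they can be read as local files.
            readStopWordFile(new Path(new Path(cacheFile.getPath()).getName()));
        }
    }
}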
Thanks for the excellent write-up.
While adding multiple paths to the DistributedCache in the main function, there should be a while statement rather than an if:
if ("-skip".equals(args[i])) –> while ("-skip".equals(args[i]))
Thanks for the comment. The if condition worked for my requirement, since only a single -skip option is passed here; for multiple -skip options a loop like the one you suggest would indeed be needed.
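For readers who do want to pass more than one -skip option, a possible variant of the argument-parsing loop in the driver (untested here, but keeping the variable names from the listing above) is:

List<String> other_args = new ArrayList<String>();
for (int i = 0; i < args.length; i++) {
    if ("-skip".equals(args[i]) && i + 1 < args.length) {
        // Every "-skip <file>" pair adds one more file to the distributed cache
        DistributedCache.addCacheFile(new Path(args[++i]).toUri(),
                job.getConfiguration());
    } else {
        other_args.add(args[i]);
    }
}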