Simple Word Count using Apache Pig

The basic "hello world" program in Hadoop is the word count program. I have explained the word count implementation using Java MapReduce and Hive queries in my previous posts. Here I am explaining how to implement the same word count logic using a Pig script.
TOKENIZE is a built-in function in Apache Pig that splits a line into a bag of words.
FLATTEN is an operator that un-nests that bag into one word per record, and COUNT is a built-in function that counts the records in each group.
You can check the output of each step by using the DUMP command on the relation produced by that step.
The Pig script below counts the words in the input file and orders the result by count.

A = load 'data.txt' as (line:chararray);
B = foreach A generate TOKENIZE(line) as tokens;
C = foreach B generate flatten(tokens) as words;
D = group C by words;
E = foreach D generate group, COUNT(C);
F = order E by $1;
dump F;
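
To make the flow of the script easier to follow, here is a rough plain-Java sketch of the same steps (tokenize, flatten, group, count, order). It is only an illustration and not how Pig runs the script. The class and method names are my own, the input lines are hard-coded instead of being loaded from data.txt, and the split on whitespace is a simplification of TOKENIZE's default delimiters.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration of Pig steps B..F; all names here are hypothetical.
class PigStepsSketch {

    // Steps B and C: tokenize each line, then flatten into one list of words.
    // Assumption: split on whitespace only (TOKENIZE has more default delimiters).
    static List<String> tokenizeAndFlatten(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // Steps D and E: group identical words and count each group.
    static Map<String, Long> countWords(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    // Step F: order the (word, count) pairs by count, ascending, like "order E by $1".
    static List<Map.Entry<String, Long>> orderByCount(Map<String, Long> counts) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hi all I am fine", "Hi all I am good", "hello world");
        for (Map.Entry<String, Long> e : orderByCount(countWords(tokenizeAndFlatten(lines)))) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Each helper corresponds to one or two relations in the Pig script, so printing the result of a helper is roughly what a DUMP of that relation would show.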

Hadoop Mapreduce- A Good Presentation…

This is a video I found on YouTube that explains Hadoop MapReduce very clearly using the word count example.

A Simple Mapreduce Program – Wordcount

"Hello world" is the first trial program in almost every programming language. For Hadoop MapReduce, the equivalent trial program is word count, the most basic MapReduce program. It gives us a good understanding of how Hadoop processes data in parallel.

It consists of three classes.

1) Driver class, which is the main class
2) Mapper class, which does the map functions
3) Reducer class, which does the reduce functions

 

Driver Class

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Strip the generic Hadoop options and keep the job's own arguments.
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        // Note: this constructor is deprecated on Hadoop 2.x and later,
        // where Job.getInstance(conf, "wordcount") is used instead.
        Job job = new Job(conf, "wordcount");

        job.setJarByClass(WordCountDriver.class);

        // Types of the final (reducer) output key and value.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // args[0] is the input path, args[1] is the output path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Prints true if the job succeeded and false otherwise.
        System.out.println(job.waitForCompletion(true));
    }
}

 

Mapper Class

 

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused for every token to avoid creating a new object per word.
    private Text word = new Text();
    private final static IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line, value is the line itself.
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            // Emit (word, 1) for every token in the line.
            context.write(word, one);
        }
    }
}

 

Reducer Class

 

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
 
public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // Add up all the 1s emitted by the mappers for this word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The jars needed for this code must be taken from the same version of the Hadoop package that is installed on the cluster. If the versions differ, the job will fail with errors.
The mapper class reads the input file line by line. Inside the mapper, we convert each line to a string and then tokenize it, i.e. each line is split into individual words. The output of the mapper is a set of key-value pairs, which is given to the reducer.
The context.write method emits one key-value pair for the reducer. Here the key is the word and the value is “one”, a variable that holds the value 1.

In the reducer, we merge these pairs and sum the values attached to identical words.
For example, if we give an input file as

Hi all I am fine
Hi all I am good
hello world

After the Mapper class the output will be

Hi 1
all 1
I 1
am 1
fine 1
Hi 1
all 1
I 1
am 1
good 1
hello 1
world 1
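
The map step on this sample input can be tried out without a cluster. Below is a small Hadoop-free sketch that uses the same StringTokenizer loop as WordCountMapper. MapPhaseSketch and mapLine are names I made up for this illustration, and a plain list of entries stands in for context.write.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hadoop-free illustration of the map phase; class and method names are hypothetical.
class MapPhaseSketch {

    // Mirrors WordCountMapper.map(): one (word, 1) pair per token in the line.
    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Stand-in for context.write(word, one).
            pairs.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[] input = {"Hi all I am fine", "Hi all I am good", "hello world"};
        for (String line : input) {
            for (Map.Entry<String, Integer> pair : mapLine(line)) {
                System.out.println(pair.getKey() + " " + pair.getValue());
            }
        }
    }
}
```

Note that the mapper does no counting at all. A word that appears twice is simply emitted twice, each time with the value 1.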

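After the Reducer class the output will be as follows. With a single reducer the keys arrive sorted in byte order, so capitalized words come before lowercase ones.

Hi 2
I 2
all 2
am 2
fine 1
good 1
hello 1
world 1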
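
The grouping and summing that happen between the mapper and the final output can be sketched in plain Java as well. This is only an approximation under my own assumptions: a TreeMap stands in for Hadoop's shuffle and sort, the class and method names are made up, and the reduce method repeats the summing loop of WordCountReducer on plain Integer values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hadoop-free illustration of shuffle and reduce; all names are hypothetical.
class ShuffleReduceSketch {

    // Map + shuffle: collect every 1 emitted for a word under that word.
    // Assumption: TreeMap's sorted keys stand in for Hadoop's sort by key.
    static Map<String, List<Integer>> shuffle(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                grouped.computeIfAbsent(tokenizer.nextToken(), k -> new ArrayList<>()).add(1);
            }
        }
        return grouped;
    }

    // Reduce: the same summing loop as WordCountReducer.reduce().
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hi all I am fine", "Hi all I am good", "hello world");
        for (Map.Entry<String, List<Integer>> group : shuffle(lines).entrySet()) {
            System.out.println(group.getKey() + " " + reduce(group.getValue()));
        }
    }
}
```

Because the keys are kept sorted, the words come out in sorted order with the capitalized ones first. The reducer itself only ever sees one word together with the list of values collected for it.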