Pdf Input Format implementation for Hadoop Mapreduce

In my opinion, Hadoop is not a finished tool with ready-made features for every case. It is an efficient framework that allows a lot of customization based on our use cases, and it is our choice to modify it. By modification I do not mean changing its architecture or how it works, but extending its functionality and features.

By default, Hadoop reads its input as text files (the default input format is TextInputFormat). In practical scenarios, however, our input files may not be text files. They can be PDF, PPT, PST, images, or anything else, so we need to make Hadoop compatible with these various input formats.

Here I explain how to create a custom input format for Hadoop, using the code for a PDF reader as the example. You can create any custom reader of your choice in a similar way. The fundamentals are the same.

A simple PDF-to-text conversion program in Java is explained in my previous post, PDF to Text. It is a simple PDF parser that extracts only the text content of the PDF. If you want more features, you can modify it accordingly. My intention here is to explain the creation of a custom input format reader for Hadoop.

To do this, we need to write two classes.

First, we need a class for PDF similar to the default TextInputFormat. We can call it PdfInputFormat.

Second, we need a class for handling PDF similar to the default LineRecordReader. We can call it PdfRecordReader.

If you examine the Hadoop source code, you can see that the default TextInputFormat class extends a parent class called FileInputFormat.

So in our case, we can also create a PdfInputFormat class that extends FileInputFormat.

It contains a method called createRecordReader, which it overrides from the parent class.

From this createRecordReader method, we return our custom PdfRecordReader.

The code for PdfInputFormat is given below.

package com.amal.pdf;

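import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

	// Hand each split to our PDF-aware record reader.
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {

		return new PdfRecordReader();
	}

	// A PDF cannot be parsed from an arbitrary byte offset, and
	// PdfRecordReader always parses the whole file, so keep each file in a
	// single split. Otherwise a large PDF would be parsed once per split.
	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		return false;
	}

}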

PdfRecordReader is a custom class that we create by extending RecordReader.

It mainly contains six methods, which are inherited from the parent RecordReader class:

initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), close().

The logic I am using is explained below.

We apply our PDF parsing logic in the initialize() method. This method receives the input split, and we parse the file behind that split using our PDF parser. The output of the PDF parser is text, which is stored in a variable. We then split the text into lines using '\n' as the separator and store these lines in an array.

We need to send this data as key-value pairs, so I send the line number as the key and the line itself as the value. The logic for reading from the array, setting the key and value, and checking for the completion condition is written in nextKeyValue().
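Before looking at the Hadoop-specific code, the key-value logic described above can be sketched in plain Java with no Hadoop or PDFBox dependency. This is only an illustration under stated assumptions: the class name LineCursorSketch and the helper readAll are hypothetical, and the parsedText argument stands in for the text a PDF parser would return.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free sketch of the record reader's iteration logic.
public class LineCursorSketch {

    private final String[] lines;
    private long key = 0;        // 1-based number of the current line; 0 = not started
    private String value = null; // text of the current line

    // parsedText stands in for the output of the PDF parser.
    public LineCursorSketch(String parsedText) {
        this.lines = parsedText.split("\n");
    }

    // Advance to the next line; returns false once every line has been consumed.
    public boolean nextKeyValue() {
        if (key >= lines.length) {
            return false;
        }
        value = lines[(int) key];
        key = key + 1;
        return true;
    }

    public long getCurrentKey() {
        return key;
    }

    public String getCurrentValue() {
        return value;
    }

    // Drain the cursor into "key<TAB>value" strings, the way a map task
    // loops over the records of a split.
    public static List<String> readAll(String parsedText) {
        LineCursorSketch cursor = new LineCursorSketch(parsedText);
        List<String> records = new ArrayList<>();
        while (cursor.nextKeyValue()) {
            records.add(cursor.getCurrentKey() + "\t" + cursor.getCurrentValue());
        }
        return records;
    }

    public static void main(String[] args) {
        for (String record : readAll("first line\nsecond line\nthird line")) {
            System.out.println(record);
        }
    }
}
```

Note that the completion check compares against lines.length, not lines.length - 1, so the last line of the document is also emitted.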

The PdfRecordReader code is given below.

package com.amal.pdf;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

	private String[] lines = null;
	private LongWritable key = null;
	private Text value = null;

	@Override
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {

		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		final Path file = split.getPath();

		/*
		 * Open the file behind the split and run the PDF parser over it.
		 * The whole PDF is parsed in one go and the extracted text is cut
		 * into lines. The document and stream are closed afterwards.
		 */

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		PDDocument pdf = null;
		try {
			pdf = PDDocument.load(fileIn);
			PDFTextStripper stripper = new PDFTextStripper();
			String parsedText = stripper.getText(pdf);
			this.lines = parsedText.split("\n");
		} finally {
			if (pdf != null) {
				pdf.close();
			}
			fileIn.close();
		}

	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {

		// The key is the 1-based number of the line most recently emitted;
		// a null key means nothing has been emitted yet.
		int emitted = (key == null) ? 0 : (int) key.get();
		if (lines == null || emitted >= lines.length) {
			// Every line, including the last one, has been emitted.
			return false;
		}
		key = new LongWritable(emitted + 1);
		value = new Text(lines[emitted]);
		return true;
	}

	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {

		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {

		return value;
	}

	@Override
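	public float getProgress() throws IOException, InterruptedException {

		// Report the fraction of lines emitted so far.
		if (key == null || lines == null || lines.length == 0) {
			return 0;
		}
		return Math.min(1.0f, (float) key.get() / lines.length);
	}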

	@Override
	public void close() throws IOException {

	}

}

To use this in a program, you need to specify it in the driver class by setting our custom input format class with job.setInputFormatClass().
To give a complete picture, I am using it with the basic word count MapReduce program.

Driver Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);
		args = parser.getRemainingArgs();
		Job job = new Job(conf, "Pdfwordcount");
		job.setJarByClass(PdfInputDriver.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		job.setInputFormatClass(PdfInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// Exit with a non-zero status if the job fails.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Mapper Class

package com.amal.pdf;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

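public class WordCountMapper extends
		Mapper<LongWritable, Text, Text, LongWritable> {
	private Text word = new Text();
	private final static LongWritable one = new LongWritable(1);

	// Input: (line number, line text). Output: (word, 1) for every token.
	@Override
	protected void map(LongWritable key, Text value, Context context)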
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.progress();
			context.write(word, one);
		}
	}
}

Reducer Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer<Text, LongWritable, Text, LongWritable> {
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		// Sum the counts emitted by the mappers for this word.
		long sum = 0;
		for (LongWritable value : values) {
			sum += value.get();
		}
		context.write(key, new LongWritable(sum));
	}
}
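
To check the word count logic without a cluster, the mapper's tokenizing and the reducer's summing can be simulated in plain Java. This is a rough sketch, not part of the job itself: the class name WordCountSketch and the method countWords are hypothetical, and the input array stands in for the lines produced by the record reader.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of the word count mapper and reducer.
public class WordCountSketch {

    // Tokenize each line on whitespace (the map step) and sum the
    // occurrences of each word (the reduce step).
    public static Map<String, Long> countWords(String[] lines) {
        // A TreeMap keeps the printed order deterministic.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                counts.merge(tokenizer.nextToken(), 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = { "hadoop reads pdf", "pdf input for hadoop" };
        for (Map.Entry<String, Long> entry : countWords(lines).entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

Like the mapper above, this splits on whitespace only, so punctuation stays attached to words and counting is case-sensitive.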

In a similar way, you can write any custom input format as you wish. 🙂 🙂


About amalgjose
I am an Electrical Engineer by qualification and now work as a Software Engineer. I am very interested in the electrical, electronics, and mechanical fields, and now in software. I like exploring things in these fields. I like travelling and long drives, and I am very much addicted to music.

22 Responses to Pdf Input Format implementation for Hadoop Mapreduce

  1. vigneshwaran says:

    Thanks for that.very nice article.

  2. AK says:

    Thank you for this write up!

    However I do not get how to add these classes to my Hadoop environment (I have a Hortonworks 2.1 Sandbox on my Windows machine) so that I could use this just like a normal Map Reduce program that comes with the Sandbox.

    My ultimately goal will be to try and execute it using a high level language like Pig Latin, since my main line of work is with the Data warehouses, databases and reporting.

  3. Revanth says:

    Hi Boss it is an excellent article . I am tried to implement tha same and i had exported this as a jar file but i am getting the below error while excuting .

    I had downoaded the pdfbox-app-1.8.9.jar and configured in my buildpath , even though after exporting it as a jaj file and while executing in hadoop i am facing the below error , could to please help me on this.

    [cloudera@localhost ~]$ hadoop jar /home/cloudera/Downloads/PDFWordCountApp.jar com.hadoop.mapReduce.PDF.PdfInputDriver /user/cloudera/PDF_input/RevanthCV.pdf /user/cloudera/VotesCountApp_output

    15/05/04 10:14:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/05/04 10:14:19 INFO input.FileInputFormat: Total input paths to process : 1
    15/05/04 10:14:20 INFO mapred.JobClient: Running job: job_201505020109_0004
    15/05/04 10:14:21 INFO mapred.JobClient: map 0% reduce 0%
    15/05/04 10:14:40 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_0, Status : FAILED
    Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.ha
    15/05/04 10:14:53 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_1, Status : FAILED
    Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)

    Thanks
    Revanth.

    • test says:

      Hi Revanth,

      Please let us know if find any solution for this issue. Even i am stuck with the same issue

    • saket says:

      Did you find a solution to this?

      • amalgjose says:

        The class not found issue is because of missing jar. First get a normal pdf parser working, then implement this program. I tested this program and it is working fine.

      • saket says:

        @amalgjose–the PDFBof.1.8.11 app jar is already added in the build path..getting the error while running the jar in hdfs..

      • saket says:

        The line throwing error is “pdf = PDDocument.load(fileIn);” under PdfRecordReader class. Don’t understand why..

      • amalgjose says:

        Did u added fontbox jar ?

      • saket says:

        Yes, I did…still the same issue

      • saket says:

        Everything is working fine now. But, I have an issue which I don’t understand. The Last Line of the input PDF is being missed out somehow. It is not being processed. Do you have any idea about the same?

      • saket says:

        I changed this line if (temp < (lines.length – 1)) in your PDF Record Code to if (temp < (lines.length)) and it worked fine. Why were you subtracting -1 from it exclusively? Any particular reasons of skipping the last split?

    • saket says:

      Did you find out the fix to the error?

    • Jose says:

      I am facing the same issue . Did anybody found the solution

      • Jose says:

        1) Place the jar file of pdfbox in hadoop lib folder too.(make library jar available to hadoop at runtime).

        2) Restart hadoop cluster.

        Or

        1) Make sure that your pdfbox library is available to hadoop by placing it in distributed cache.

  4. Isaiah Babu says:

    Thankyou Very much brother.. you have given us a root map to understand the way to read the data into Mapreduce as per our requirement.

  5. Varun Kulkarni says:

    Thank you brother…It is really helpful

  6. SHANDRY K K says:

    Nice article. I need to search for a word in pdf document. Can you share the code for the same

  7. Jigar Shah says:

    Thank you Amal for this Article!

    I am using Oracle VM VirtualBox Manager. BigDataLite-4.2.1. However I do not get how to add these classes to my Hadoop environment. Can you help me for this?

    My ultimately goal will be to try and execute it using a high level language like Pig Latin, since my main line of work is with the Data warehouses, databases and reporting.
