Pdf Input Format implementation for Hadoop Mapreduce

In my opinion Hadoop is not a cooked tool or framework with readymade features, but it is an efficient framework which allows a lot of customizations  based on our usecases. It is our choice to modify it. By modification, I am not meaning about the modification of the architecture or working, but the modification of its functionality and features.

By default hadoop accepts text files.  But in practical scenarios, our input files may not be text files. It can be pdf, ppt, pst, image or anything. So we need to make hadoop compatible with this various types of input formats.

Here I am explaining about the creation of a custom input format for hadoop. I am explain the code for implementing pdf reader logic inside hadoop. Similarly  you can create any custom reader of your choice. The fundamental is same.

A simple pdf to text conversion program using java is  explained in my previous post PDF to Text. This is a simple pdf parser which converts the text content of the pdf only. If you want more features, you can modify it accordingly. My intention here is to explain about the creation of a custom input format reader for hadoop.

For doing this logic, we need to write or modify two classes.

One is we need a similar class like the default TextInputFormat.class for pdf. We can call it as PdfInputFormat.class.

Second one is we need a similar class like the default LineRecordReader for handling pdf. We can call it as PdfRecordReader.class

If you examine the source code of hadoop, you can see that the default TextInputFormat class is extended from a parent class called FileInputFormat.

So in our case, we can also create a PdfinputFormat class extending the FileInputFormat class.

This will contain a method called createRecordReader which it got from the parent class.

We are calling our custom PdfRecordReader class from this createRecordReader method.

The code for PdfInputFormat is given below.

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat {

	@Override
	public RecordReader createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {

		return new PdfRecordReader();
	}

}

The PdfRecordReader is a custom class created by us extending the RecordReader.

This mainly contains  five methods which is inherited from the parent RecordReader class.

Initialize(), nextKeyValue(),getCurrentKey(),getCurrentValue(), getProgress(), close().

The logic I am using is explained below.

We are applying our pdf parsing logic in this method. This method will get the input split and we parses the input split using our pdf parser logic. The output of the pdf parser will be a text which will be stored in a variable. Then we splits the text into multiple lines by using ‘/n’ as the splitter and we will store this lines in  an array.

We need to send this as a key-value pair. So I am planning to send line number as the key and each line as value. So the logic for checking  getting from the array, setting it as key and value , checking for the completion condition etc are written in the code.

The PdfRecordReader  code is written below.

package com.amal.pdf;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader {

	private String[] lines = null;
	private LongWritable key = null;
	private Text value = null;

	@Override
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {

		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		final Path file = split.getPath();

		/*
		 * The below code contains the logic for opening the file and seek to
		 * the start of the split. Here we are applying the Pdf Parsing logic
		 */

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		PDDocument pdf = null;
		String parsedText = null;
		PDFTextStripper stripper;
		pdf = PDDocument.load(fileIn);
		stripper = new PDFTextStripper();
		parsedText = stripper.getText(pdf);
		this.lines = parsedText.split("\n");

	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {

		if (key == null) {
			key = new LongWritable();
			key.set(1);
			value = new Text();
			value.set(lines[0]);
		} else {
			int temp = (int) key.get();
			if (temp < (lines.length - 1)) {
				int count = (int) key.get();
				value = new Text();
				value.set(lines[count]);
				count = count + 1;
				key = new LongWritable(count);
			} else {
				return false;
			}

		}
		if (key == null || value == null) {
			return false;
		} else {
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {

		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {

		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {

		return 0;
	}

	@Override
	public void close() throws IOException {

	}

}

For using this in a program, you need to specify this in the driver class. We need to set our custom input format class in the “InputFormatClass” property of our mapreduce program.
For getting a complete picture, I am associating this  with the basic word count mapreduce program.

Driver Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);
		args = parser.getRemainingArgs();
		Job job = new Job(conf, "Pdfwordcount");
		job.setJarByClass(PdfInputDriver.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		job.setInputFormatClass(PdfInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		System.out.println(job.waitForCompletion(true));
	}
}

Mapper Class

package com.amal.pdf;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class WordCountMapper extends
		Mapper {
	private Text word = new Text();
	private final static LongWritable one = new LongWritable(1);

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.progress();
			context.write(word, one);
		}
	}
}

Reducer Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer {
	protected void reduce(Text key, Iterable values,
			Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (LongWritable value : values) {
			sum += value.get();

		}
		context.write(key, new LongWritable(sum));
	}
}

In a similar way, you can write any custom input format as you wish. 🙂 🙂

About amalgjose
I am an Electrical Engineer by qualification, now I am working as a Software Architect. I am very much interested in Electrical, Electronics, Mechanical and now in Software fields. I like exploring things in these fields. I love travelling, long drives and music.

32 Responses to Pdf Input Format implementation for Hadoop Mapreduce

  1. antin says:

    Hi Amal,

    Nice Blog!

    I’m trying to implement the pdf stripper into a pig UDF loadfunc extension.

    Any advice?

    Antin

  2. vigneshwaran says:

    Thanks for that.very nice article.

  3. AK says:

    Thank you for this write up!

    However I do not get how to add these classes to my Hadoop environment (I have a Hortonworks 2.1 Sandbox on my Windows machine) so that I could use this just like a normal Map Reduce program that comes with the Sandbox.

    My ultimately goal will be to try and execute it using a high level language like Pig Latin, since my main line of work is with the Data warehouses, databases and reporting.

  4. Revanth says:

    Hi Boss it is an excellent article . I am tried to implement tha same and i had exported this as a jar file but i am getting the below error while excuting .

    I had downoaded the pdfbox-app-1.8.9.jar and configured in my buildpath , even though after exporting it as a jaj file and while executing in hadoop i am facing the below error , could to please help me on this.

    [cloudera@localhost ~]$ hadoop jar /home/cloudera/Downloads/PDFWordCountApp.jar com.hadoop.mapReduce.PDF.PdfInputDriver /user/cloudera/PDF_input/RevanthCV.pdf /user/cloudera/VotesCountApp_output

    15/05/04 10:14:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/05/04 10:14:19 INFO input.FileInputFormat: Total input paths to process : 1
    15/05/04 10:14:20 INFO mapred.JobClient: Running job: job_201505020109_0004
    15/05/04 10:14:21 INFO mapred.JobClient: map 0% reduce 0%
    15/05/04 10:14:40 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_0, Status : FAILED
    Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.ha
    15/05/04 10:14:53 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_1, Status : FAILED
    Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)

    Thanks
    Revanth.

    • test says:

      Hi Revanth,

      Please let us know if find any solution for this issue. Even i am stuck with the same issue

    • saket says:

      Did you find a solution to this?

      • amalgjose says:

        The class not found issue is because of missing jar. First get a normal pdf parser working, then implement this program. I tested this program and it is working fine.

      • saket says:

        @amalgjose–the PDFBof.1.8.11 app jar is already added in the build path..getting the error while running the jar in hdfs..

      • saket says:

        The line throwing error is “pdf = PDDocument.load(fileIn);” under PdfRecordReader class. Don’t understand why..

      • amalgjose says:

        Did u added fontbox jar ?

      • saket says:

        Yes, I did…still the same issue

      • saket says:

        Everything is working fine now. But, I have an issue which I don’t understand. The Last Line of the input PDF is being missed out somehow. It is not being processed. Do you have any idea about the same?

      • saket says:

        I changed this line if (temp < (lines.length – 1)) in your PDF Record Code to if (temp < (lines.length)) and it worked fine. Why were you subtracting -1 from it exclusively? Any particular reasons of skipping the last split?

    • saket says:

      Did you find out the fix to the error?

    • Bigdata says:

      I have the same Problem. How did you find the solution? I have already added PDFBox and Fonxbox jars to the Build Path

    • Jose says:

      I am facing the same issue . Did anybody found the solution

      • Jose says:

        1) Place the jar file of pdfbox in hadoop lib folder too.(make library jar available to hadoop at runtime).

        2) Restart hadoop cluster.

        Or

        1) Make sure that your pdfbox library is available to hadoop by placing it in distributed cache.

  5. sravanthi says:

    Hi Nice article, even i am getting same error can you please reply immediately

  6. Sravan says:

    Hi. This is very useful article for me. I am also facing same issue which is explained by Revanth. I added below jars to the project

    1. commons-cli-1.2.jar
    2. commons-logging-api-1.1.1
    3. pdfbox-1.3.1

    Could you please help me on this.

    Thanks,
    Sravan

  7. Isaiah Babu says:

    Thankyou Very much brother.. you have given us a root map to understand the way to read the data into Mapreduce as per our requirement.

  8. Varun Kulkarni says:

    Thank you brother…It is really helpful

  9. SHANDRY K K says:

    Nice article. I need to search for a word in pdf document. Can you share the code for the same

  10. Himanshu says:

    HI Mate
    Getting below error :-

    16/04/16 23:48:32 INFO mapred.JobClient: Task Id : attempt_201604162323_0004_m_000000_2, Status : FAILED
    java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

  11. I am getting false as output and not files are created in output dir

  12. Jigar Shah says:

    Thank you Amal for this Article!

    I am using Oracle VM VirtualBox Manager. BigDataLite-4.2.1. However I do not get how to add these classes to my Hadoop environment. Can you help me for this?

    My ultimately goal will be to try and execute it using a high level language like Pig Latin, since my main line of work is with the Data warehouses, databases and reporting.

  13. ganesh says:

    Hi,
    Im trying to execute below code in eclipse ,but its showing errors at below steps

    1.for (LongWritable value : values) { –type mismatch: cannot convert from element type Object to LongWritable. This one is from reducer class

    2. this.lines = parsedText.split(“\n } —String literal is not properly closed by a double quote- i tried to give double quote but no luck .This one is from PdfRecordRedaer Class

    Can any help on this

  14. naveem reddy says:

    ERROR:
    MAPREDUCE JOB
    which jar file i need to download it is not working for hadoop 2.x but it is working for hadoop 1.x

    i have also added all jarfiles to share/common/lib
    fontbox-1.5.0
    fontbox-1.6.0-for-pdfbox
    pdfbox-1.8.2
    pdfbox-2.0.0
    pdfbox-1.8.10
    hadoop-ant
    hadoop-core
    hadoop-example

    this jar files working for hadoop 1.x

    17/04/12 19:51:04 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
    17/04/12 19:51:07 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/04/12 19:51:08 INFO input.FileInputFormat: Total input paths to process : 1
    17/04/12 19:51:09 INFO mapreduce.JobSubmitter: number of splits:1
    17/04/12 19:51:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1492050090340_0001
    17/04/12 19:51:12 INFO impl.YarnClientImpl: Submitted application application_1492050090340_0001
    17/04/12 19:51:13 INFO mapreduce.Job: The url to track the job: http://ubuntu:8088/proxy/application_1492050090340_0001/
    17/04/12 19:51:13 INFO mapreduce.Job: Running job: job_1492050090340_0001
    17/04/12 19:52:01 INFO mapreduce.Job: Job job_1492050090340_0001 running in uber mode : false
    17/04/12 19:52:01 INFO mapreduce.Job: map 0% reduce 0%
    17/04/12 19:52:39 INFO mapreduce.Job: map 100% reduce 0%
    17/04/12 19:52:40 INFO mapreduce.Job: map 0% reduce 0%
    17/04/12 19:52:41 INFO mapreduce.Job: Task Id : attempt_1492050090340_0001_m_000000_0, Status : FAILED
    Error: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
    Container killed by the ApplicationMaster.
    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143

    17/04/12 19:53:04 INFO mapreduce.Job: Task Id : attempt_1492050090340_0001_m_000000_1, Status : FAILED

    can anyone help for this error
    thanks in advanse.

  15. Bigdata says:

    What do you mean by ” First get a normal pdf parser working” How do I do that?
    Please help. Thanks

  16. Manu says:

    hi Amal Jose i am trying to implement this but unsuccessful , my problem statement : i have a TB of PDFs on my server and i dont know what each file contains , so i want to create a program by which i can extract title , keywords from the first page of the research papers i am parsing , create a database for further search queries , after that creating a web app by which people can search and download research papers , have to implemented on hadoop , i am stuck.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: