In my opinion, Hadoop is not a finished tool or framework with only readymade features; it is an efficient framework that allows a lot of customization based on our use cases. It is up to us to extend it. By modification, I do not mean changing the architecture or the way it works, but extending its functionality and features.
By default, Hadoop accepts text files. But in practical scenarios, our input files may not be text files; they can be PDF, PPT, PST, images, or anything else. So we need to make Hadoop compatible with these various types of input formats.
Here I am explaining the creation of a custom input format for Hadoop, using the code for implementing PDF reader logic inside Hadoop as the example. Similarly, you can create any custom reader of your choice; the fundamentals are the same.
A simple PDF to text conversion program in Java is explained in my previous post PDF to Text. It is a simple PDF parser that extracts only the text content of the PDF. If you want more features, you can modify it accordingly. My intention here is to explain the creation of a custom input format reader for Hadoop.
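For reference, the core of that standalone parser looks roughly like the sketch below. It uses the PDFBox 1.x API; the class name PdfToText and the file name sample.pdf are only placeholders for this example.

package com.amal.pdf;

import java.io.File;
import java.io.IOException;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfToText {

    public static void main(String[] args) throws IOException {
        // "sample.pdf" is only a placeholder path for this sketch.
        PDDocument pdf = PDDocument.load(new File("sample.pdf"));
        try {
            // PDFTextStripper extracts only the plain text content of the PDF.
            String text = new PDFTextStripper().getText(pdf);
            System.out.println(text);
        } finally {
            pdf.close();
        }
    }
}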
To implement this, we need to write two classes.
First, we need a class for PDF similar to the default TextInputFormat class. We can call it PdfInputFormat.
Second, we need a class similar to the default LineRecordReader for handling PDF. We can call it PdfRecordReader.
If you examine the Hadoop source code, you can see that the default TextInputFormat class extends a parent class called FileInputFormat.
So in our case, we can also create a PdfInputFormat class extending the FileInputFormat class.
This class contains a method called createRecordReader, which it inherits from the parent class.
We return our custom PdfRecordReader class from this createRecordReader method.
The code for PdfInputFormat is given below.
package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new PdfRecordReader();
    }
}
The PdfRecordReader is a custom class that we create by extending RecordReader.
It mainly overrides six methods inherited from the parent RecordReader class:
initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), and close().
The logic I am using is explained below.
We apply our PDF parsing logic in the initialize() method. This method receives the input split, and we parse it using our PDF parser logic. The output of the PDF parser is a text string, which is stored in a variable. Then we split the text into multiple lines, using '\n' as the delimiter, and store these lines in an array.
We need to emit this as key-value pairs, so I am sending the line number as the key and each line as the value. The logic for reading from the array, setting the key and value, checking the completion condition, and so on is written in nextKeyValue().
The PdfRecordReader code is written below.
package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private String[] lines = null;
    private LongWritable key = null;
    private Text value = null;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();

        /*
         * The below code contains the logic for opening the file
         * corresponding to this split. Here we apply the PDF parsing logic.
         */
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        PDDocument pdf = null;
        String parsedText = null;
        PDFTextStripper stripper;
        pdf = PDDocument.load(fileIn);
        stripper = new PDFTextStripper();
        parsedText = stripper.getText(pdf);
        this.lines = parsedText.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
            key.set(1);
            value = new Text();
            value.set(lines[0]);
        } else {
            int temp = (int) key.get();
            if (temp < (lines.length - 1)) {
                int count = (int) key.get();
                value = new Text();
                value.set(lines[count]);
                count = count + 1;
                key = new LongWritable(count);
            } else {
                return false;
            }
        }
        if (key == null || value == null) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
To use this in a program, you need to specify it in the driver class: set our custom input format class as the input format of the MapReduce job via job.setInputFormatClass().
To give the complete picture, I am combining this with the basic word count MapReduce program.
Driver Class
package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "Pdfwordcount");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(PdfInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        System.out.println(job.waitForCompletion(true));
    }
}
Mapper Class
package com.amal.pdf;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    private Text word = new Text();
    private final static LongWritable one = new LongWritable(1);

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.progress();
            context.write(word, one);
        }
    }
}
Reducer Class
package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
In a similar way, you can write any custom input format as you wish. 🙂 🙂
Hi Amal,
Nice Blog!
I'm trying to implement the PDF stripper in a Pig UDF LoadFunc extension.
Any advice?
Antin
Thanks for that. Very nice article.
Is it working on hadoop 2.x? What jars have you added to the Hadoop lib directory?
This will work on Hadoop 2 also.
Thank you for this write up!
However, I do not get how to add these classes to my Hadoop environment (I have a Hortonworks 2.1 Sandbox on my Windows machine) so that I can use this just like a normal MapReduce program that comes with the Sandbox.
My ultimate goal is to try and execute it using a high-level language like Pig Latin, since my main line of work is with data warehouses, databases, and reporting.
Hi Boss, it is an excellent article. I tried to implement the same and exported it as a jar file, but I am getting the below error while executing.
I downloaded pdfbox-app-1.8.9.jar and configured it in my build path. Even so, after exporting it as a jar file and executing it in Hadoop, I am facing the below error. Could you please help me on this?
[cloudera@localhost ~]$ hadoop jar /home/cloudera/Downloads/PDFWordCountApp.jar com.hadoop.mapReduce.PDF.PdfInputDriver /user/cloudera/PDF_input/RevanthCV.pdf /user/cloudera/VotesCountApp_output
15/05/04 10:14:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/05/04 10:14:19 INFO input.FileInputFormat: Total input paths to process : 1
15/05/04 10:14:20 INFO mapred.JobClient: Running job: job_201505020109_0004
15/05/04 10:14:21 INFO mapred.JobClient: map 0% reduce 0%
15/05/04 10:14:40 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_0, Status : FAILED
Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.ha
15/05/04 10:14:53 INFO mapred.JobClient: Task Id : attempt_201505020109_0004_m_000000_1, Status : FAILED
Error: java.lang.ClassNotFoundException: org.apache.pdfbox.pdmodel.PDDocument
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at com.hadoop.mapReduce.PDF.PdfRecordReader.initialize(PdfRecordReader.java:41)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
Thanks
Revanth.
Hi Revanth,
Please let us know if you find any solution for this issue. Even I am stuck with the same issue.
Did you find a solution to this?
The ClassNotFoundException is because of a missing jar. First get a normal PDF parser working, then implement this program. I tested this program and it is working fine.
@amalgjose: the pdfbox 1.8.11 app jar is already added to the build path. I am getting the error while running the jar on Hadoop.
The line throwing the error is "pdf = PDDocument.load(fileIn);" in the PdfRecordReader class. I don't understand why.
Did you add the fontbox jar?
Yes, I did... still the same issue.
Everything is working fine now. But I have an issue which I don't understand: the last line of the input PDF is being missed out somehow. It is not being processed. Do you have any idea about this?
I changed the line if (temp < (lines.length - 1)) in your PdfRecordReader code to if (temp < lines.length) and it worked fine. Why were you subtracting 1 there? Any particular reason for skipping the last line?
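For anyone hitting the same issue, a minimal sketch of the amended nextKeyValue() is given below, assuming the array-based reader from the post; the only change is comparing against lines.length instead of lines.length - 1 so that the final line is also emitted.

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        // First call: emit line 1.
        key = new LongWritable(1);
        value = new Text(lines[0]);
        return true;
    }
    int next = (int) key.get();      // index of the next line to emit
    if (next < lines.length) {       // no "- 1", so the last array entry is emitted too
        value = new Text(lines[next]);
        key = new LongWritable(next + 1);
        return true;
    }
    return false;
}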
Did you find out the fix to the error?
I have the same problem. How did you find the solution? I have already added the PDFBox and FontBox jars to the build path.
I am facing the same issue. Did anybody find the solution?
1) Place the pdfbox jar file in the Hadoop lib folder too (make the library jar available to Hadoop at runtime).
2) Restart the Hadoop cluster.
Or
1) Make sure that your pdfbox library is available to Hadoop by placing it in the distributed cache, as in the sketch below.
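A minimal sketch of that second option, assuming Hadoop 2.x and that the PDFBox jar has already been uploaded to HDFS (the path /libs/pdfbox-app-1.8.9.jar is only a placeholder):

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class PdfJobLibs {

    /*
     * Ships the PDFBox jar (already copied to HDFS, e.g. with
     * "hdfs dfs -put pdfbox-app-1.8.9.jar /libs/") with the job so that
     * the map tasks can load the org.apache.pdfbox classes at runtime.
     */
    public static void addPdfLibs(Job job) throws IOException {
        job.addFileToClassPath(new Path("/libs/pdfbox-app-1.8.9.jar"));
    }
}

Calling PdfJobLibs.addPdfLibs(job) in the driver before job.waitForCompletion(true) should be enough; alternatively, the -libjars generic option can be used, since the driver already parses its arguments with GenericOptionsParser.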
Hi, nice article. Even I am getting the same error; can you please reply immediately?
Hi. This is a very useful article for me. I am also facing the same issue explained by Revanth. I added the below jars to the project:
1. commons-cli-1.2.jar
2. commons-logging-api-1.1.1
3. pdfbox-1.3.1
Could you please help me on this.
Thanks,
Sravan
Thank you very much, brother. You have given us a road map for understanding how to read data into MapReduce as per our requirements.
Thank you brother…It is really helpful
Nice article. I need to search for a word in a PDF document. Can you share the code for the same?
This is simple; you only need a map-only job. You just search for the word in the mapper class and mark it, as in the sketch below.
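A minimal sketch of such a map-only mapper, assuming the PdfInputFormat above; the configuration property name search.word and the class name WordSearchMapper are only assumptions for this example.

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordSearchMapper extends
        Mapper<LongWritable, Text, LongWritable, Text> {

    private String searchWord;

    @Override
    protected void setup(Context context) {
        // The property name "search.word" is an assumption for this sketch.
        searchWord = context.getConfiguration().get("search.word", "");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Mark a match by emitting the line number and the matching line.
        if (!searchWord.isEmpty() && value.toString().contains(searchWord)) {
            context.write(key, value);
        }
    }
}

In the driver, set conf.set("search.word", "yourword"), call job.setNumReduceTasks(0) so the matches are written directly from the mappers, and change the output key/value classes to LongWritable and Text.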
Hi mate,
Getting the below error:
16/04/16 23:48:32 INFO mapred.JobClient: Task Id : attempt_201604162323_0004_m_000000_2, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
I am getting false as output and no files are created in the output dir.
Thank you Amal for this Article!
I am using Oracle VM VirtualBox Manager with BigDataLite-4.2.1. However, I do not get how to add these classes to my Hadoop environment. Can you help me with this?
My ultimate goal is to try and execute it using a high-level language like Pig Latin, since my main line of work is with data warehouses, databases, and reporting.
Hi,
I'm trying to execute the below code in Eclipse, but it shows errors at the below steps:
1. for (LongWritable value : values) { -- type mismatch: cannot convert from element type Object to LongWritable. This one is from the reducer class.
2. this.lines = parsedText.split("\n"); -- String literal is not properly closed by a double quote. I tried to give a double quote but no luck. This one is from the PdfRecordReader class.
Can anyone help with this?
ERROR:
MAPREDUCE JOB
Which jar file do I need to download? It is not working for hadoop 2.x, but it is working for hadoop 1.x.
I have also added all the jar files to share/common/lib:
fontbox-1.5.0
fontbox-1.6.0-for-pdfbox
pdfbox-1.8.2
pdfbox-2.0.0
pdfbox-1.8.10
hadoop-ant
hadoop-core
hadoop-example
These jar files are working for hadoop 1.x.
17/04/12 19:51:04 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
17/04/12 19:51:07 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/04/12 19:51:08 INFO input.FileInputFormat: Total input paths to process : 1
17/04/12 19:51:09 INFO mapreduce.JobSubmitter: number of splits:1
17/04/12 19:51:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1492050090340_0001
17/04/12 19:51:12 INFO impl.YarnClientImpl: Submitted application application_1492050090340_0001
17/04/12 19:51:13 INFO mapreduce.Job: The url to track the job: http://ubuntu:8088/proxy/application_1492050090340_0001/
17/04/12 19:51:13 INFO mapreduce.Job: Running job: job_1492050090340_0001
17/04/12 19:52:01 INFO mapreduce.Job: Job job_1492050090340_0001 running in uber mode : false
17/04/12 19:52:01 INFO mapreduce.Job: map 0% reduce 0%
17/04/12 19:52:39 INFO mapreduce.Job: map 100% reduce 0%
17/04/12 19:52:40 INFO mapreduce.Job: map 0% reduce 0%
17/04/12 19:52:41 INFO mapreduce.Job: Task Id : attempt_1492050090340_0001_m_000000_0, Status : FAILED
Error: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
17/04/12 19:53:04 INFO mapreduce.Job: Task Id : attempt_1492050090340_0001_m_000000_1, Status : FAILED
Can anyone help with this error?
Thanks in advance.
What do you mean by "First get a normal PDF parser working"? How do I do that?
Please help. Thanks.
Hi Amal Jose, I am trying to implement this but have been unsuccessful. My problem statement: I have a TB of PDFs on my server and I don't know what each file contains. I want to create a program that extracts the title and keywords from the first page of the research papers I am parsing, create a database for further search queries, and after that create a web app through which people can search and download research papers. It has to be implemented on Hadoop, and I am stuck.