Enabling Log Aggregation in YARN

If you see a message similar to “Log Aggregation not enabled” while checking the details of a YARN application, you can follow the steps below to enable it. This issue occurs in EMR because log aggregation is not enabled by default in most of the AMIs. Enabling it is very simple: add the following configuration to the yarn-site.xml of all the YARN hosts and restart the NodeManagers. (A full cluster restart is not required; restarting all the NodeManagers is enough.)


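    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <property>
        <description>Where to aggregate logs to.</description>
        <name>yarn.nodemanager.remote-app-log-dir</name>
        <!-- /tmp/logs is the Hadoop default; change it to suit your cluster -->
        <value>/tmp/logs</value>
    </property>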



Installing Cloudera Manager in an existing hadoop cluster

Cloudera Manager is an infrastructure management and monitoring tool provided by Cloudera. It has become an excellent tool for managing big data infrastructure, and it has reduced the pain of administrators by about 80%. Almost everything an administrator needs is integrated into it, and it is very user friendly. Cloudera Manager became this powerful only recently, so a lot of existing clusters are still running without it. If you want to manage an existing cluster using Cloudera Manager, the following steps may help you. For this you have to completely uninstall the existing Hadoop setup. No data loss will happen because we are not touching any data. The configurations will also remain the same. These are just pointers.

1) Stop all the services
2) Back up the Hive metastore, the Namenode metadata and all the other required metastores (e.g. Hue, Oozie)
3) Back up all the configurations
4) Note down the existing storage directories
5) Uninstall all the hadoop services (Never touch the data)
6) Install Cloudera Manager Server and Agent
7) Install all the services (use the same versions as before to make the installation smoother)
8) Add the configurations (use the same configurations as before; CM has an option to add XML configs)
9) Point the storage directory settings in Cloudera Manager to the existing storage directories
10) Point the new installation to the existing metastores (Hive, Oozie, Hue etc.)
11) Start all the services (Don’t format the namenode)
12) Test the cluster

Migrating Namenode from one host to another host

The Namenode is the heart of a Hadoop cluster, so it is usually installed on a better-quality machine than the other nodes. Migrating the Namenode from one host to another is a rare scenario, but if it is needed, the following steps are required.

Manual Approach

Method 1: (By migrating the harddrive)

  • Stop all the running jobs in the cluster
  • Put the Namenode into safe mode
    • hdfs dfsadmin -safemode enter
  • Execute the following command to save the current namespace to the storage directories and reset the edit logs
    • hdfs dfsadmin -saveNamespace
  • Stop the entire cluster
  • Remove the hard disk from the old namenode host and attach it to the new namenode host
  • Release the ipaddress from the old namenode host and assign it to the new namenode host
  • Start the new namenode (DO NOT PERFORM FORMAT)
  • Start all the services

Method 2: (New Harddrive)

  • Stop all the running jobs in the cluster
  • Put the Namenode into safe mode
    • hdfs dfsadmin -safemode enter
  • Execute the following command to save the current namespace to the storage directories and reset the edit logs
    • hdfs dfsadmin -saveNamespace
  • Stop the entire cluster
  • Login to the namenode host.
  • Navigate to the namenode storage directories.
  • Copy the namenode metadata. It is always better to keep this as a compressed file. Note down the folder and file permissions and ownership.
  • Take a back up of the configuration files.
  • Install namenode of the same version as that of the existing system to the new machine.
  • Ensure that the ipaddress of the old host is taken and assigned to the new host.
  • Copy the configuration files and metadata to the new namenode host
  • Create namenode storage directory structure in the new host.
  • Maintain the same folder permissions and ownership in the new host also.
  • If there are any changes in namenode directory structure, make the corresponding changes in config files.
  • In case of a Kerberized cluster, create the appropriate principals for the new host and place the proper keytabs.
  • Start the new namenode. (DO NOT PERFORM FORMAT)
  • Start the remaining services.
  • Test the working of the cluster by executing file system operations as well as MR operations.

Automated Approach in a cluster managed using Cloudera Manager (CM 5.4 or above)

If you are using Cloudera Manager 5.4 or above, there is a feature known as Namenode Role Migration that helps to migrate the Namenode from one host to another. It requires HDFS HA to be enabled.

HadoopLens Toolkit – An end to end solution for fitting hadoop in an enterprise

HadoopLens Toolkit is a product from Knowledge Lens Pvt Limited that provides an end to end solution for Hadoop cluster and data center migration. It covers Hadoop cluster backup, data compression, migration and recovery, and it aims to address the problems and drawbacks that show up when implementing Hadoop in an enterprise.

Pdf Input Format implementation for Hadoop Mapreduce

In my opinion, Hadoop is not a ready-made tool with fixed features, but an efficient framework that allows a lot of customization based on our use cases. It is our choice to modify it. By modification, I do not mean changing its architecture or the way it works, but extending its functionality and features.

By default, Hadoop accepts text files. But in practical scenarios, our input files may not be text files. They can be PDF, PPT, PST, images or anything else. So we need to make Hadoop compatible with these various input formats.

Here I explain how to create a custom input format for Hadoop, using the implementation of PDF reader logic inside Hadoop as the example. You can create any custom reader of your choice in a similar way. The fundamentals are the same.

A simple PDF to text conversion program in Java is explained in my previous post PDF to Text. It is a simple PDF parser that extracts only the text content of the PDF. If you want more features, you can modify it accordingly. My intention here is to explain the creation of a custom input format reader for Hadoop.

To implement this, we need to write two classes.

The first is a class similar to the default TextInputFormat, but for PDF. We can call it PdfInputFormat.

The second is a class similar to the default LineRecordReader, for handling PDF. We can call it PdfRecordReader.

If you examine the source code of Hadoop, you can see that the default TextInputFormat class extends a parent class called FileInputFormat.

So in our case, we can also create a PdfInputFormat class extending FileInputFormat.

It overrides a method called createRecordReader, which is declared in the parent class.

From this createRecordReader method, we return an instance of our custom PdfRecordReader class.

The code for PdfInputFormat is given below.

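package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		// Hand every split to our custom PDF reader.
		return new PdfRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		// A PDF can only be parsed as a whole file, so never split it.
		return false;
	}
}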

The PdfRecordReader is a custom class that we create by extending RecordReader.

It mainly contains six methods, which are inherited from the parent RecordReader class.

initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), close().

The logic I am using is explained below.

We apply our PDF parsing logic in the initialize() method. This method receives the input split, and we parse the file behind it using our PDF parser logic. The output of the PDF parser is text, which is stored in a variable. We then split the text into multiple lines using '\n' as the separator and store these lines in an array.

We need to send this as key-value pairs, so I send the line number as the key and the line itself as the value. The logic for reading each line from the array, setting the key and value, and checking for the completion condition is written in nextKeyValue().
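The iteration described above can be tried out without Hadoop. The following is only a minimal plain-Java sketch: the class LineRecordSketch and its Record type are hypothetical stand-ins for the record reader and the (LongWritable, Text) pair, and the choice of 1-based line numbers is my assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class LineRecordSketch {

    /** One (line number, line text) pair, standing in for (LongWritable, Text). */
    public static final class Record {
        public final long key;
        public final String value;

        Record(long key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final String[] lines;
    private int next = 0; // array index of the next line to emit

    public LineRecordSketch(String parsedText) {
        // Same split as described above: one array entry per line.
        this.lines = parsedText.split("\n");
    }

    /** Returns the next record, or null once every line has been emitted. */
    public Record nextRecord() {
        if (next >= lines.length) {
            return null; // completion condition
        }
        Record record = new Record(next + 1, lines[next]); // assumed 1-based keys
        next++;
        return record;
    }

    /** Convenience helper: drains the reader into a list. */
    public static List<Record> readAll(String parsedText) {
        LineRecordSketch reader = new LineRecordSketch(parsedText);
        List<Record> records = new ArrayList<>();
        for (Record r = reader.nextRecord(); r != null; r = reader.nextRecord()) {
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) {
        for (Record r : readAll("first line\nsecond line\nthird line")) {
            System.out.println(r.key + "\t" + r.value);
        }
    }
}
```

In the real reader the same bookkeeping is spread over nextKeyValue(), getCurrentKey() and getCurrentValue(), because Hadoop asks for the key and the value separately.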

The PdfRecordReader code is given below.

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

	private String[] lines = null;
	private LongWritable key = null;
	private Text value = null;

	@Override
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {

		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		final Path file = split.getPath();

		/*
		 * The below code contains the logic for opening the file.
		 * Here we are applying the Pdf Parsing logic
		 */
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		PDDocument pdf = null;
		try {
			pdf = PDDocument.load(fileIn);
			PDFTextStripper stripper = new PDFTextStripper();
			String parsedText = stripper.getText(pdf);
			this.lines = parsedText.split("\n");
		} finally {
			// Release the document and the underlying stream.
			if (pdf != null) {
				pdf.close();
			}
			fileIn.close();
		}
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {

		// The key is the 1-based number of the line emitted last, so it
		// is also the array index of the next line to emit.
		int next = (key == null) ? 0 : (int) key.get();
		if (next >= lines.length) {
			return false;
		}
		key = new LongWritable(next + 1);
		value = new Text(lines[next]);
		return true;
	}

	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// Progress reporting is not implemented.
		return 0;
	}

	@Override
	public void close() throws IOException {
		// Nothing to do: the file is closed in initialize().
	}
}

To use this in a program, you need to specify it in the driver class: our custom input format class has to be set as the input format of the MapReduce job through the job's setInputFormatClass method.
To give a complete picture, I am combining this with the basic word count MapReduce program.

Driver Class

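package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);
		args = parser.getRemainingArgs();
		Job job = new Job(conf, "Pdfwordcount");
		job.setJarByClass(PdfInputDriver.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		// Plug in the custom PDF input format.
		job.setInputFormatClass(PdfInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}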

Mapper Class

package com.amal.pdf;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
		Mapper<LongWritable, Text, Text, LongWritable> {
	private Text word = new Text();
	private final static LongWritable one = new LongWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			// Emit (word, 1) for every token in the line.
			word.set(tokenizer.nextToken());
			context.write(word, one);
		}
	}
}

Reducer Class

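package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer<Text, LongWritable, Text, LongWritable> {
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		// Add up the counts emitted by the mappers for this word.
		long sum = 0;
		for (LongWritable value : values) {
			sum += value.get();
		}
		context.write(key, new LongWritable(sum));
	}
}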

In a similar way, you can write any custom input format as you wish. 🙂 🙂

Namenode High Availability

Last week I tried Namenode HA using the Cloudera Distribution of Hadoop (CDH 4.5.0).

It is very easy to configure, and automatic failover happens smoothly. I tested the redundancy by pulling the power cable of one of the namenodes, and the failover was successful. For more details, visit the official website of Cloudera.

I tried this HA setup in a 64 TB Hadoop cluster.

Decommissioning a Datanode in a Hadoop cluster

Sometimes we may need to remove a node from a Hadoop cluster without losing data.
For this we have to follow the decommissioning procedure.
Decommissioning excludes a node from the cluster after replicating the data present on that node to the other active nodes.

Decommissioning is very simple. The steps are explained below.
First stop the tasktracker on the node to be decommissioned.
On the namenode machine, add the below property to hdfs-site.xml

    <property>
        <name>dfs.hosts.exclude</name>
        <value>/etc/hadoop/conf/dfs.exclude</value>
    </property>


where dfs.exclude is a file that we have to create and place in a safe location. It is better to keep it in HADOOP_CONF_DIR (/etc/hadoop/conf).

Create a file named dfs.exclude and add the hostnames of the machines that need to be decommissioned, one per line.

Eg: dfs.exclude (the hostnames below are placeholders)

    datanode5.example.com
    datanode6.example.com


After doing this, execute the following command as the HDFS superuser on the namenode machine.

hadoop dfsadmin -refreshNodes

After this, check the namenode UI, i.e. http://namenode:50070
You will be able to see the machines under decommissioning nodes.
The decommissioning process will take some time.
After the re-replication is completed, the machine will be added to the decommissioned nodes list.
After this, the decommissioned node can be safely removed from the cluster. 🙂

An Introduction to Apache Hive

Hive is an important member of the Hadoop ecosystem. It runs on top of Hadoop and uses a SQL-like query language to process the data in HDFS. Writing a Hive query is much simpler than writing several lines of MapReduce code in a programming language such as Java. Hive was developed by Facebook with the vision of letting their SQL experts handle big data without much difficulty. Hive queries are easy to learn for people who don't know any programming language, and people with SQL experience can start using Hive queries straight away. The queries fired into Hive ultimately run as MapReduce jobs.

Hive runs in two execution modes: local and distributed.

In local mode, Hive queries run as a single process and use the local file system. In distributed mode, the mappers and reducers run as separate processes and use the Hadoop distributed file system.

The installation of hive was explained well in my previous post Hive Installation.

Hive stores its data in HDFS and the table details (metadata) in a database. By default the metadata is stored in a Derby database, but this is suitable only for playing around and cannot be used in multi-user environments. For multi-user environments, we can use databases such as MySQL, PostgreSQL or Oracle for storing the Hive metadata. The data is stored in HDFS in a location called the Hive warehouse directory, which is defined by the property hive.metastore.warehouse.dir. By default this is /user/hive/warehouse.

We can fire queries into Hive using the command line interface or using clients written in different programming languages. The Hive server exposes a Thrift service, making Hive accessible from various programming languages.

The simplicity and power of Hive can be seen by comparing the word count program written in Java with the same program written as a Hive query.

The word count program in Java is explained in my previous post A Simple Mapreduce Program – Wordcount. It takes a lot of lines of code, takes time to write and needs some good programming knowledge.

The same word count can be done in a few lines of Hive query.

CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;
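To make it clearer what this query computes, here is a rough plain-Java sketch of the same steps: split each line on whitespace, count the occurrences of each word and return the words in sorted order. The class WordCountSketch and the in-memory list standing in for the docs table are hypothetical, and edge cases such as empty tokens or the exact sort order may differ from what Hive does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    /** Counts words across all lines; the TreeMap keeps the words sorted. */
    public static Map<String, Long> countWords(List<String> docs) {
        Map<String, Long> counts = new TreeMap<>();    // sorted keys, like ORDER BY word
        for (String line : docs) {
            for (String word : line.split("\\s")) {    // split on whitespace, one row per word
                counts.merge(word, 1L, Long::sum);     // grouping by word and counting
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("hive on hadoop", "hadoop on hdfs");
        for (Map.Entry<String, Long> e : countWords(docs).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Even this small in-memory version needs more code than the query, and the MapReduce version needs a driver, a mapper and a reducer on top of that.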

Hadoop Mapreduce- A Good Presentation…

This is a video I found on YouTube that explains Hadoop MapReduce very clearly using the word count example.

Migrating hive from one hadoop cluster to another cluster

Recently I migrated a Hive installation from one cluster to another. I couldn't find any document covering this migration, so I did it based on my experience and knowledge.
Hive stores its metadata, i.e. the data about the tables, in a database. For development and production grade installations, we normally use MySQL, Oracle or PostgreSQL. Here I explain the migration of Hive with its metastore database in MySQL.
The metadata contains the information about the tables, while the contents of the tables are stored in HDFS. So the metadata contains the HDFS URI and other details. If we migrate Hive from one cluster to another, we have to point the metadata to the HDFS of the new cluster. If we don't, it will keep pointing to the HDFS of the old cluster.

For migrating a hive installation, we have to do the following things.

1) Install Hive in the new Hadoop cluster
2) Transfer the data present in the Hive warehouse directory (/user/hive/warehouse) to the new Hadoop cluster
3) Take a dump of the MySQL metastore
4) Install MySQL in the new Hadoop cluster
5) Open the Hive MySQL metastore dump in a text editor such as Notepad or Notepad++, search for hdfs://ip-address-old-namenode:port, replace it with hdfs://ip-address-new-namenode:port and save the file.

Here ip-address-old-namenode is the IP address of the namenode of the old Hadoop cluster and ip-address-new-namenode is the IP address of the namenode of the new Hadoop cluster.

6) After doing the above steps, restore the edited MySQL dump into the MySQL of the new Hadoop cluster.
7) Configure Hive as normal and do the Hive schema upgrades if needed.
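If the dump is too large to edit comfortably in a text editor, the search and replace in step 5 can also be scripted. The following is only a minimal sketch: the class MetastoreDumpRewriter is a hypothetical helper, it assumes the dump is UTF-8 text that fits in memory, and the old and new URIs are passed in by you.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class MetastoreDumpRewriter {

    /** Replaces every literal occurrence of oldUri with newUri in the dump text. */
    public static String rewrite(String dumpSql, String oldUri, String newUri) {
        // String.replace is a plain text replacement, not a regex,
        // so the dots and colons in the URI need no escaping.
        return dumpSql.replace(oldUri, newUri);
    }

    // Usage: java MetastoreDumpRewriter <dump-in> <dump-out> <old-uri> <new-uri>
    public static void main(String[] args) throws IOException {
        if (args.length != 4) {
            System.err.println("usage: MetastoreDumpRewriter <dump-in> <dump-out> <old-uri> <new-uri>");
            System.exit(1);
        }
        String dump = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        String rewritten = rewrite(dump, args[2], args[3]);
        Files.write(Paths.get(args[1]), rewritten.getBytes(StandardCharsets.UTF_8));
    }
}
```

Writing the result to a separate output file keeps the original dump untouched, so you can compare the two files before restoring the edited one.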

This is a solution that I discovered when I faced the migration issue. I don't know whether any other standard methods are available.
This worked perfectly for me. 🙂