Configuring Fair Scheduler in Hadoop Cluster

Hadoop ships with several scheduling algorithms such as FIFO, Capacity, Fair and DRF. Here I am briefly explaining how to set up the Fair Scheduler in hadoop. This can be done in any distribution of hadoop. By default hadoop comes with the FIFO scheduler, and some distributions ship with the Capacity Scheduler as the default. In multiuser environments, a scheduler other than the default FIFO is definitely required, because FIFO forces every job to wait in a single queue in the order of submission. Creating multiple job queues, assigning a portion of the cluster capacity to each, and adding users to these queues helps us manage and utilize the cluster resources properly.
For setting up the Fair Scheduler manually, we have to make two changes on the resource manager node: one change in yarn-site.xml and the addition of a new configuration file, fair-scheduler.xml.
The configurations for a basic set up are given below.

Step 1:
Specify the scheduler class in yarn-site.xml. If this property already exists, replace its value with the one below; otherwise add the property to yarn-site.xml.

  
<property>
   <name>yarn.resourcemanager.scheduler.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Step 2:
Specify the Fair Scheduler allocation file. This property also goes in yarn-site.xml. The value should be the absolute path of the fair-scheduler.xml file, and the file should be present on the local filesystem of the resource manager node.

 
<property>
  <name>yarn.scheduler.fair.allocation.file</name>
  <value>/etc/hadoop/conf/fair-scheduler.xml</value>
</property>

Step 3:
Create the allocation configuration file
A sample allocation file is given below. We can have advanced configurations in the allocation file, but this one contains a basic set of configurations.
There are five types of elements which can be set up in an allocation file:

Queue element :– Represents a queue. It has the following properties:

  • minResources — the minimum resources the queue is entitled to
  • maxResources — the maximum resources the queue can use
  • maxRunningApps — the maximum number of apps from the queue allowed to run at once
  • weight — shares the cluster non-proportionally with other queues; defaults to 1
  • schedulingPolicy — "fair", "fifo", "drf" or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy
  • aclSubmitApps — the list of users who can submit apps to the queue. If specified, other users will not be able to submit apps to the queue.
  • minSharePreemptionTimeout — the number of seconds the queue stays under its minimum share before it tries to preempt containers to take resources from other queues

User element :– Represents a user. It can contain a single property, maxRunningApps, to set the maximum number of running apps for a particular user.

userMaxAppsDefault element :– Sets the default running app limit for users whose limit is not otherwise specified.

fairSharePreemptionTimeout element :– Sets the number of seconds a queue stays under its fair share before it tries to preempt containers to take resources from other queues.

defaultQueueSchedulingPolicy element :– Specifies the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified.

<?xml version="1.0"?>
<allocations>

  <queue name="queueA">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>5000 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,amal</aclSubmitApps>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueB">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sahad,amal</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueC">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sree</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <user name="amal">
    <maxRunningApps>10</maxRunningApps>
  </user>

  <user name="hdfs">
    <maxRunningApps>5</maxRunningApps>
  </user>

  <user name="sree">
    <maxRunningApps>8</maxRunningApps>
  </user>

  <user name="sahad">
    <maxRunningApps>2</maxRunningApps>
  </user>

  <userMaxAppsDefault>5</userMaxAppsDefault>
  <fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>
</allocations>

Here we created three queues, queueA, queueB and queueC, and mapped users to them. While submitting a job, the user should specify the queue name, and only a user who has access to a queue (as defined in its acls) can submit jobs to it. Another feature is scheduling rules: if we specify scheduling rules, the jobs from a particular user are directed automatically to a particular queue based on the rule. I am not covering the scheduling rules here.

After making these changes, restart the resource manager. 

Now go to the resource manager web UI. On the left side of the UI, you can see a section named Scheduler. Click on it and you will be able to see the newly created queues.

Now submit a job by specifying a queue name, using the option below. The option shown will submit the job to queueA. All the queues that we created are sub-pools of the root queue, so we have to specify the queue name in the format parentQueue.subQueue

-Dmapred.job.queue.name=root.queueA

Eg:  hadoop jar hadoop-examples.jar wordcount -Dmapred.job.queue.name=root.queueA  <input-location>  <output-location>
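The same thing can be done from inside the driver code instead of on the command line. A minimal sketch is given below; the class and job names are just illustrative, and note that on newer Hadoop releases the property is named mapreduce.job.queuename:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class QueueSubmitSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Direct this job to the fair scheduler queue queueA under root.
        conf.set("mapred.job.queue.name", "root.queueA");
        Job job = new Job(conf, "wordcount");
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```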

If you are running a hive query, you can set this property in the format below. It should be set at the top of the script.

set mapred.job.queue.name=root.queueA

Deployment and Management of Hadoop Clusters

Changing the Hive Warehouse Directory

By default, the hive warehouse directory is located at the hdfs location /user/hive/warehouse

If you want to change this location, you can add the following property to hive-site.xml.

Everyone using hive should have appropriate read/write permissions to this warehouse directory.

<property>
   <name>hive.metastore.warehouse.dir</name>
   <value>/user/hivestore/warehouse</value>
   <description>location of the warehouse directory</description>
 </property>

Custom Text Input Format Record Delimiter for Hadoop

By default a mapreduce program accepts text files and reads them line by line. Technically speaking, the default input format is TextInputFormat and the default record delimiter is '\n' (newline).

In several cases, we need to override this property. For example, if you have a large text file and you want to read the contents between '.' characters in each read, using the default delimiter will be difficult.

For example, if you have a file like this:


Ice cream (derived from earlier iced cream or cream ice) is a frozen dessert usually made from dairy products, such as milk and cream and often combined with fruits or other ingredients and flavours. Most varieties contain sugar, although some are made with other sweeteners. In some cases, artificial flavourings and colourings are used in addition to, or instead of, the natural ingredients. The mixture of chosen ingredients is stirred slowly while cooling, in order to incorporate air and to prevent large ice crystals from forming. The result is a smoothly textured semi-solid foam that is malleable and can be scooped.

And we want each record to be


1) Ice cream (derived from earlier iced cream or cream ice) is a frozen dessert usually made from dairy products, such as milk and cream and often combined with fruits or other ingredients and flavours

2) Most varieties contain sugar, although some are made with other sweeteners

3) In some cases, artificial flavourings and colourings are used in addition to, or instead of, the natural ingredients

4) The mixture of chosen ingredients is stirred slowly while cooling, in order to incorporate air and to prevent large ice crystals from forming

5) The result is a smoothly textured semi-solid foam that is malleable and can be scooped

We can do this by overriding one property: textinputformat.record.delimiter

We can either set this property in the driver class or just change the value of the delimiter in the TextInputFormat class.

The first method is the easiest way.

Setting the textinputformat.record.delimiter in Driver class

The format for setting it in the program (Driver class)  is


conf.set("textinputformat.record.delimiter", "delimiter");

The value you set by this method ultimately goes into the TextInputFormat class. This is explained below.
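For instance, to get the '.' separated records described above via the first method, the driver can set the property before the job is created. This is only a sketch; the class name and the remaining job wiring are assumed:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DelimiterDriverSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Every "." now terminates a record instead of the default "\n".
        conf.set("textinputformat.record.delimiter", ".");
        Job job = new Job(conf, "sentence-records");
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```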

Editing the TextInputFormat class

Default TextInputFormat class

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
// By default, textinputformat.record.delimiter = "\n" (set in the configuration file)
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}

Edited TextInputFormat class


public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {

// Hardcoding the delimiter as "."
// You can use any delimiter as per your requirement

    String delimiter = ".";
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}

HDFS Operations Using Java Program

We are familiar with Hadoop Distributed File System operations such as copyFromLocal, copyToLocal, mv, cp, rmr etc.
Here I am explaining how to do these operations using the Java API. Currently I am covering only the copyFromLocal and copyToLocal operations.

Here I used the Eclipse IDE for programming, installed on my Windows desktop machine.
I have a hadoop cluster, and the cluster machines and my desktop machine are on the same network.

First create a java project and inside that create a folder named conf. Copy the hadoop configuration files (core-site.xml, mapred-site.xml, hdfs-site.xml) from your hadoop installation to this conf folder.

Create another folder named source, which we will use as the input location, and put a text file inside it.
One thing to remember is that the source and destination locations must have appropriate permissions; otherwise read/write will be blocked.

Copying a File from Local to HDFS

The command is
hadoop fs -copyFromLocal

package com.amal.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * @author amalgjose
 *
 */
public class CopyFromLocal {

	public static void main(String[] args) throws IOException {
		
		Configuration conf =new Configuration();
		conf.addResource(new Path("conf/core-site.xml"));
		conf.addResource(new Path("conf/mapred-site.xml"));
		conf.addResource(new Path("conf/hdfs-site.xml"));
		FileSystem fs = FileSystem.get(conf);
		Path sourcePath = new Path("source");
		Path destPath = new Path("/user/training");
		if(!(fs.exists(destPath)))
		{
			System.out.println("No Such destination exists :"+destPath);
			return;
		}
		
		fs.copyFromLocalFile(sourcePath, destPath);
		
	}
}

Copying a File from HDFS to Local

The command is
hadoop fs -copyToLocal

package com.amal.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * @author amalgjose
 *
 */
public class CopyToLocal {
public static void main(String[] args) throws IOException {
		
		Configuration conf =new Configuration();
		conf.addResource(new Path("conf/core-site.xml"));
		conf.addResource(new Path("conf/mapred-site.xml"));
		conf.addResource(new Path("conf/hdfs-site.xml"));
		FileSystem fs = FileSystem.get(conf);
		Path sourcePath = new Path("/user/training");
		Path destPath = new Path("destination");
		if(!(fs.exists(sourcePath)))
		{
			System.out.println("No Such Source exists :"+sourcePath);
			return;
		}
		
		fs.copyToLocalFile(sourcePath, destPath);
		
	}
}
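The remaining shell operations mentioned at the beginning (mv, cp, rmr) have FileSystem counterparts too. A hedged sketch along the same lines as the programs above, using the same conf folder; the HDFS paths here are only examples:

```java
package com.amal.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OtherHdfsOps {

	public static void main(String[] args) throws IOException {

		Configuration conf = new Configuration();
		conf.addResource(new Path("conf/core-site.xml"));
		conf.addResource(new Path("conf/hdfs-site.xml"));
		FileSystem fs = FileSystem.get(conf);

		// mv: rename/move a file or directory within HDFS
		fs.rename(new Path("/user/training/old"), new Path("/user/training/renamed"));

		// rmr: delete recursively (the boolean enables recursion)
		fs.delete(new Path("/user/training/tmp"), true);
	}
}
```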

Rhipe Installation

Rhipe was first developed by Saptarshi Guha.
Rhipe needs R and Hadoop, so first install R and Hadoop; both installations are well explained in my previous posts. The latest version of Rhipe as of now is Rhipe-0.73.1, and the latest available version of R is R-3.0.0. If you are using CDH4 (the Cloudera distribution of hadoop), use Rhipe-0.73 or a later version, because older versions may not work with CDH4.
Rhipe is an R and Hadoop integrated programming environment: it integrates R with Hadoop, which makes it very good for statistical and analytical calculations over very large data. Because R is integrated with hadoop, the processing runs in distributed mode, ie mapreduce.
Further explanations of Rhipe are available at http://www.datadr.org/

Prerequisites

Hadoop, R, protocol buffers and rJava should be installed before installing Rhipe.
We are installing Rhipe on a hadoop cluster, so a submitted job may execute on any of the tasktracker nodes. Therefore we have to install R and Rhipe on all the tasktracker nodes; otherwise you will face an exception such as "Cannot find R".

Installing Protocol Buffer

Download protocol buffers 2.4.1 from the link below

http://protobuf.googlecode.com/files/protobuf-2.4.1.tar.gz

tar -xzvf protobuf-2.4.1.tar.gz

cd protobuf-2.4.1

chmod -R 755 protobuf-2.4.1

./configure

make

make install

Set the environment variable PKG_CONFIG_PATH

nano /etc/bashrc

export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig

save and exit

Then execute the following commands to check the installation

pkg-config --modversion protobuf

This will show the version number 2.4.1
Then execute

pkg-config --libs protobuf

This will display the following:

-pthread -L/usr/local/lib -lprotobuf -lz -lpthread

If these two commands work fine, it means that protobuf is properly installed.

Set the environment variables for hadoop

For example

nano /etc/bashrc

export HADOOP_HOME=/usr/lib/hadoop

export HADOOP_BIN=/usr/lib/hadoop/bin

export HADOOP_CONF_DIR=/etc/hadoop/conf

save and exit

Then


cd /etc/ld.so.conf.d/

nano Protobuf-x86.conf

/usr/local/lib   # add this value as the content of Protobuf-x86.conf

Save and exit

/sbin/ldconfig

Installing rJava

Download the rJava tarball from the below link.

http://cran.r-project.org/web/packages/rJava/index.html

The latest version of rJava available as of now is rJava_0.9-4.tar.gz

Install rJava using the following command

R CMD INSTALL rJava_0.9-4.tar.gz

Installing Rhipe

Rhipe can be downloaded from the following link
https://github.com/saptarshiguha/RHIPE/blob/master/code/Rhipe_0.73.1.tar.gz

R CMD INSTALL Rhipe_0.73.1.tar.gz

This will install Rhipe

After this type R in the terminal

You will enter into R terminal

Then type

library(Rhipe)

#This will display

------------------------------------------------
| Please call rhinit() else RHIPE will not run |
------------------------------------------------

rhinit()

#This will display

Rhipe: Detected CDH4 jar files, using RhipeCDH4.jar
Initializing Rhipe v0.73
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/client-0.20/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/client/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Initializing mapfile caches

Now you can execute your Rhipe scripts.

Hadoop Installation in Isolated Environments

Introduction

Apache Hadoop is an open-source software framework that supports data-intensive distributed applications, licensed under the Apache v2 license. It supports the running of applications on large clusters of commodity hardware. Hadoop was derived from Google’s MapReduce and Google File System(GFS) papers.

Hadoop development needs a hadoop cluster, and a trial cluster can be set up in minutes. In my previous blog post, I have explained hadoop cluster setup using a tarball.

But for production environments, the tarball installation is not a good approach, because it becomes complex when installing the other hadoop ecosystem components.
If we have a high speed internet connection, we can install hadoop directly from the internet using yum install in minutes.
But most production environments are isolated, so an internet connection may not be available. There we can perform the yum install by creating a local yum repository.
Creating a local yum repository is explained well in my previous blog post "Creating A Local YUM Repository".
Yum install works with RedHat or CentOS linux distributions.
Here I am explaining the installation of the Cloudera Distribution of Hadoop.

Prerequisites

OPERATING SYSTEM

RedHat or CentOS (32 or 64 bit)

PORTS

The ports necessary for hadoop should be opened. So we need to set appropriate firewall rules. The ports used by hadoop are listed in the last part of this post.

If you are not interested in setting firewall rules, you can simply switch off the firewall.

The command for turning off the firewall is

service iptables stop

JAVA

Sun java is required.

Download java from oracle website (32 or 64 bit depending on OS)

Install the java.

Simply installing it and setting JAVA_HOME may not make the newly installed java the default one; the system may still point to openjdk if it is present.

To make the new java the default, do the following steps.

alternatives --config java

This will show the list of javas installed on the machine and the one currently being used. It will ask you to choose a java from the list; exit by pressing Ctrl+C.

To add our sun java to this list, do the following step.

/usr/sbin/alternatives --install /usr/bin/java java <JAVA_HOME>/bin/java 2

This will add our newly installed java to the list.

Then do

alternatives --config java

and choose the newly installed java. Now java -version will show sun java.

SETTING UP THE LOCAL HADOOP REPOSITORY

Download the Cloudera rpm repository from a place where you have internet access.

Download the repository corresponding to your OS version.
The repo files corresponding to the different operating systems are listed below. Copy the repo file and download the repository.

Use the repo file corresponding to your OS version:

  • Red Hat/CentOS/Oracle 5 : Red Hat/CentOS/Oracle 5 repo
  • Red Hat/CentOS 6 (32-bit) : Red Hat/CentOS/Oracle 6 repo
  • Red Hat/CentOS 6 (64-bit) : Red Hat/CentOS/Oracle 6 repo

You can download the rpms one by one or do a repo-sync.

Repo-sync is explained in my previous post Creating A Local YUM Repository.

Once this is done, create a local repository in one of the machines.

Then create a repo file corresponding to the newly created repository and add that repo file to all the cluster machines.

After this, we can do yum install just like on a machine with internet access.

Now do a yum clean all in all the cluster machines.

Do the following steps on the corresponding nodes. These steps explain the installation of MRv1 only.

Installation

NAMENODE MACHINE

yum install hadoop-hdfs-namenode

SECONDARY NAMENODE

yum install hadoop-hdfs-secondarynamenode

DATANODE

yum install hadoop-hdfs-datanode

JOBTRACKER

yum install hadoop-0.20-mapreduce-jobtracker

TASKTRACKER

yum install hadoop-0.20-mapreduce-tasktracker

IN ALL CLIENT MACHINES

yum install hadoop-client

Normally we run the datanode and tasktracker on the same node, ie they are co-located for data locality.

Now edit the core-site.xml, mapred-site.xml and hdfs-site.xml in all the machines.

Configurations

Sample configurations are given below.

But for production set up, you can set other properties.

core-site.xml


<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/app/hadoop/tmp</value>
<description>A Base dir for storing other temp directories</description>
</property>

<property>
<name>fs.default.name</name>
<value>hdfs://<namenode-hostname>:9000</value>
<description>The name of default file system</description>
</property>
</configuration>

mapred-site.xml

<configuration>
<property>
<name>mapred.job.tracker</name>
<value><jobtracker-hostname>:9001</value>
<description>Job Tracker port</description>
</property>

<property>
<name>mapred.local.dir</name>
<value>/app/hadoop/mapred_local</value>
<description>local dir for mapreduce jobs</description>
</property>


<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>6</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>
</configuration>

hdfs-site.xml

<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
<description>replication factor</description>
</property>
</configuration>

Then create hadoop.tmp.dir in all the machines. Hadoop stores its files in this folder.

Here we are using the location /app/hadoop/tmp

mkdir -p /app/hadoop/tmp

mkdir /app/hadoop/mapred_local

This type of installation automatically creates two users

1) hdfs

2) mapred

The directories should be owned by hdfs, so we need to change the ownership

chown -R hdfs:hadoop /app/hadoop/tmp

chmod -R 777 /app/hadoop/tmp

chown mapred:hadoop /app/hadoop/mapred_local

The properties

mapred.tasktracker.map.tasks.maximum :- sets the number of map slots on each node.

mapred.tasktracker.reduce.tasks.maximum :- sets the number of reduce slots on each node.

These numbers are set by a calculation based on the available RAM and the JVM size of each task slot. The default size of a task slot is 200 MB. So if you have 4 GB of RAM free after meeting the OS requirements and other processes, the node can have 4*1024 MB / 200 MB task slots:

ie 4*1024/200 = 20 (rounding down)

So we can have 20 task slots, which we can divide into map slots and reduce slots. Usually we give a higher number of map slots than reduce slots.
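The slot arithmetic above can be sketched in plain Java; the 200 MB slot size and 4 GB of free RAM are simply the example figures from the text:

```java
public class SlotCalc {

    // Total task slots a node can host, given free RAM and per-slot JVM size.
    static int totalSlots(int freeRamMb, int slotSizeMb) {
        return freeRamMb / slotSizeMb;
    }

    public static void main(String[] args) {
        // 4 GB free RAM, 200 MB per slot: 4096 / 200 = 20 (integer division)
        int slots = totalSlots(4 * 1024, 200);
        System.out.println(slots); // prints 20
        // A typical split would then favour maps, e.g. 14 map + 6 reduce slots.
    }
}
```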
In hdfs-site.xml we set the replication factor; the default value is 3.

Formatting Namenode

Now go to the namenode machine and login as root user.

Then from cli, switch to hdfs user.

su - hdfs

Then format the namenode.

hadoop namenode -format

Starting Services

STARTING NAMENODE

In the namenode machine, execute the following command as root user.

/etc/init.d/hadoop-hdfs-namenode start

You can check whether the service is running or not by using the command jps.

Jps will work only if sun java is installed and added to path.

STARTING SECONDARY NAMENODE

In the Secondary Namenode machine, execute the following command as root user.

/etc/init.d/hadoop-hdfs-secondarynamenode start

STARTING DATANODE

In the Datanode machines, execute the following command as root user.

/etc/init.d/hadoop-hdfs-datanode start

Now hdfs is started, and we will be able to execute hdfs tasks.

STARTING JOBTRACKER

In the jobtracker machine login as root user, then switch to hdfs user.

su - hdfs

Then create the following hdfs directory structure and permissions.

hadoop fs -mkdir /app

hadoop fs -mkdir /app/hadoop

hadoop fs -mkdir /app/hadoop/tmp

hadoop fs -mkdir /app/hadoop/tmp/mapred

hadoop fs -mkdir /app/hadoop/tmp/mapred/staging

hadoop fs -chmod -R 1777 /app/hadoop/tmp/mapred/staging

hadoop fs -chown mapred:hadoop /app/hadoop/tmp/mapred

After doing this start the jobtracker

/etc/init.d/hadoop-0.20-mapreduce-jobtracker start

STARTING TASKTRACKER

In the tasktracker nodes, start the tasktracker by executing the following command

/etc/init.d/hadoop-0.20-mapreduce-tasktracker start

You can check the namenode webUI using a browser.

The URL is

http://<namenode-hostname>:50070

The Jobtracker web UI is

http://<jobtracker-hostname>:50030

If hostname resolution is not working correctly, hostname:port may not work.

In such situations you can use http://ip-address:port

Now our hadoop cluster is ready for use.

With this method, we can create a hadoop cluster of any size within a short time.

Here we have the entire hadoop ecosystem repository, so installing other components such as hive, pig, Hbase, sqoop etc can be done very easily.

Ports Used By Hadoop

[Image: table of ports used by hadoop]

A Simple Mapreduce Program – Wordcount

Hello World is the trial program for almost all programming languages. Likewise, for hadoop-mapreduce the trial program is wordcount, the basic simple mapreduce program. This program gives us a good understanding of the parallel processing in hadoop.

It consists of three classes.

1) Driver class, which is the main class

2) Mapper class, which does the map functions

3) Reducer class, which does the reduce functions

 

Driver Class

 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    args = parser.getRemainingArgs();

    Job job = new Job(conf, "wordcount");

    job.setJarByClass(WordCountDriver.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    System.out.println(job.waitForCompletion(true));
  }
}

 

Mapper Class

 

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private Text word = new Text();
  private final static IntWritable one = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      context.write(word, one);
    }
  }
}

 

Reducer Class

 

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
 
public class WordCountReducer extends
    Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}

The jars needed for this code should be taken from the same version of the hadoop package that is installed on the cluster; if the versions differ, it will result in errors.
Here the mapper class reads the input file line by line. Inside the mapper we convert each line to a string and then tokenize it, ie each line is split into individual words. The output of the mapper is given to the reducer in the form of (key, value) pairs.
The context.write method actually emits a key-value pair to the reducer. Here the key is the word and the value is "one", a variable assigned the value 1.

In the reducer, we merge these words and sum the values attached to identical words.
For example, if we give an input file such as

Hi all I am fine
Hi all I am good
hello world

After Mapper class the output will be,

Hi 1
all 1
I 1
am 1
fine 1
Hi 1
all 1
I 1
am 1
good 1
hello 1
world 1

After Reducer class the output will be

Hi 2
all 2
I 2
am 2
fine 1
good 1
hello 1
world 1
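The map-then-sum behaviour shown above can be imitated in plain Java with an in-memory map, which is a handy way to sanity-check the expected reducer output. This is only an illustration of the logic, not MapReduce code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountCheck {

    // Tokenize every line (the map step) and sum the 1s per word (the reduce step).
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                counts.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> out = count(new String[] {
            "Hi all I am fine", "Hi all I am good", "hello world" });
        System.out.println(out);
        // prints {Hi=2, all=2, I=2, am=2, fine=1, good=1, hello=1, world=1}
    }
}
```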

Setting Up Multiple Users in Hadoop Clusters

 

 Need for multiple users

In hadoop we run different tasks and store data in HDFS.

If several people perform tasks using the same user account, it will be difficult to trace the jobs and track the tasks/defects done by each user.

The other issue is security: if all are given the same user account, all users have the same privileges, and everyone can access, modify, execute and even delete everyone else's data.

This is a very serious issue, so we need to create multiple user accounts.

Benefits of Creating multiple users

1) The directories/files of other users cannot be modified by a user.

2) Other users cannot add new files to a user's directory.

3) Other users cannot perform any tasks (mapreduce etc.) on a user's files.

In short data is safe and is accessible only to the assigned user and the superuser.

Steps for setting up multiple User accounts

For adding new user capable of performing hadoop operations, do the following steps.

Step 1

Creating a New User

For Ubuntu

sudo  adduser  --ingroup   <groupname>   <username>

For RedHat variants

useradd  -g <groupname>   <username>

passwd <username>

Then enter the user details and password.

Step 2

We need to change the permission of a directory in HDFS where hadoop stores its temporary data.

Open the core-site.xml file

Find the value of hadoop.tmp.dir.

In my core-site.xml, it is /app/hadoop/tmp. In the following steps, I will be using /app/hadoop/tmp as the directory for storing hadoop data (ie the value of hadoop.tmp.dir).

Then from the superuser account do the following step.

hadoop fs -chmod -R 1777 /app/hadoop/tmp/mapred/staging

Step 3

The next step is to give write permission to our user group on hadoop.tmp.dir (here /app/hadoop/tmp. Open core-site.xml to get the path for hadoop.tmp.dir). This should be done only in the machine(node) where the new user is added.

chmod 777 /app/hadoop/tmp

Step 4

The next step is to create a directory structure in HDFS for the new user.

For that from the superuser, create a directory structure.

Eg: hadoop fs -mkdir /user/username/

Step 5

With this alone we will not be able to run mapreduce programs, because the ownership of the newly created directory structure is with the superuser. So change the ownership of the newly created directory in HDFS to the new user.

hadoop fs -chown -R username:groupname <directory to access in HDFS>

Eg: hadoop fs -chown -R username:groupname /user/username/

Step 6

Login as the new user and perform hadoop jobs.

su - username

Note: Run hadoop tasks only in the assigned hdfs path, ie /user/username.
Enjoy…. 🙂