Configuring Fair Scheduler in Hadoop Cluster

Hadoop ships with several scheduling algorithms such as FIFO, Capacity, Fair and DRF. Here I am briefly explaining how to set up the Fair Scheduler; this can be done in any distribution of hadoop. By default hadoop comes with the FIFO scheduler, and some distributions ship with the Capacity Scheduler as the default. In multiuser environments, a scheduler other than the default FIFO is definitely required, because FIFO forces everyone to wait in a single queue in the order of job submission. Creating multiple job queues, assigning each a portion of the cluster capacity and adding users to these queues helps us manage and utilize the cluster resources properly.
To set up the Fair Scheduler manually, we have to make two changes on the resource manager node: one change in yarn-site.xml, and the addition of a new configuration file, fair-scheduler.xml. The configuration for a basic setup is given below.

Step 1:
Specify the scheduler class in yarn-site.xml. If this property already exists, replace its value with the one below; otherwise add the property to yarn-site.xml.

<property>
   <name>yarn.resourcemanager.scheduler.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Step 2:
Specify the Fair Scheduler allocation file. This property also goes in yarn-site.xml, and its value should be the absolute path of the fair-scheduler.xml file on the local filesystem of the resource manager host. The Fair Scheduler reloads this allocation file periodically, so later edits to the queues take effect without a restart.

<property>
  <name>yarn.scheduler.fair.allocation.file</name>
  <value>/etc/hadoop/conf/fair-scheduler.xml</value>
</property>

Step 3:
Create the allocation configuration file
A sample allocation file with a basic set of configurations is given below; more advanced configurations are also possible in this file. There are five types of elements that can be set up in an allocation file:

Queue element :– Represents a queue. It has the following properties:

  • minResources — Setting the minimum resources of a queue
  • maxResources — Setting the maximum resources of a queue
  • maxRunningApps — Setting the maximum number of apps from a queue to run at once
  • weight — Shares the cluster non-proportionally with other queues. Defaults to 1
  • schedulingPolicy — Values are “fair”/”fifo”/”drf” or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy
  • aclSubmitApps — Listing the users who can submit apps to the queue. If specified, other users will not be able to submit apps to the queue.
  • minSharePreemptionTimeout — Specifying the number of seconds the queue is under its minimum share before it tries to preempt containers to take resources from other queues.

User elements :– Representing individual users. A user element can contain a single property, maxRunningApps, to set the maximum number of running apps for that user.

userMaxAppsDefault element :– Setting the default running app limit for users if the limit is not otherwise specified.

fairSharePreemptionTimeout element :– Setting the number of seconds a queue is under its fair share before it tries to preempt containers to take resources from other queues.

defaultQueueSchedulingPolicy element :– Specifying the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified.

<?xml version="1.0"?>
<allocations>

  <queue name="queueA">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>5000 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,amal</aclSubmitApps>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueB">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sahad,amal</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueC">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sree</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <user name="amal">
    <maxRunningApps>10</maxRunningApps>
  </user>

  <user name="hdfs">
    <maxRunningApps>5</maxRunningApps>
  </user>

  <user name="sree">
    <maxRunningApps>8</maxRunningApps>
  </user>

  <user name="sahad">
    <maxRunningApps>2</maxRunningApps>
  </user>

  <userMaxAppsDefault>5</userMaxAppsDefault>
  <fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>
</allocations>

Here we created three queues, queueA, queueB and queueC, and mapped users to them. While submitting a job, the user should specify the queue name, and only users who have access to a queue (as defined in its ACLs) can submit jobs to it. Another option is queue placement rules: if we specify placement rules, jobs from a particular user are directed automatically to a particular queue based on the rule. I am not covering the placement rule part here.

After making these changes, restart the resource manager. 
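On a package-based installation with init scripts, the restart may look like the following; the service name is an assumption and varies by distribution:

sudo service hadoop-yarn-resourcemanager restart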

Now go to the resource manager web UI. On the left side of the UI you can see a section named Scheduler; click on it and you will see the newly created queues.

Now submit a job by specifying a queue name, using the option shown below (this example submits the job to queueA). All the queues that we created are sub-pools of the root queue, so we have to specify the queue name in the format parentQueue.subQueue.

-Dmapred.job.queue.name=root.queueA

Eg:  hadoop jar hadoop-examples.jar wordcount -Dmapred.job.queue.name=root.queueA  <input-location>  <output-location>

If you are running a hive query, you can set this property in the format below. It should be set at the top of the script, before the query.

set mapred.job.queue.name=root.queueA
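
Note that mapred.job.queue.name is the old MR1 property name; on Hadoop 2 / YARN the equivalent is mapreduce.job.queuename, and the old name still works through Hadoop's deprecated-property mapping:

set mapreduce.job.queuename=root.queueA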

Installing Cloudera Manager in an existing hadoop cluster

Cloudera Manager is an infrastructure management and monitoring tool provided by Cloudera. It has become an excellent tool for managing big data infrastructure, and it removes a large part of an administrator's pain: almost everything an administrator needs is integrated into it, and it is very user friendly. Cloudera Manager became this powerful only recently, so a lot of existing clusters are still running without it. If you want to manage an existing cluster using Cloudera Manager, the following steps may help you. For this you have to completely uninstall the existing hadoop setup. No data loss will happen, because we are not touching any data, and the configurations will remain the same. These are just pointers.

1) Stop all the services
2) Back up the hive metastore, the namenode metadata and all the other required metastores (e.g. Hue, Oozie); a sketch of this step is given after the list
3) Back up all the configurations
4) Note down the existing storage directories
5) Uninstall all the hadoop services (Never touch the data)
6) Install Cloudera Manager Server and Agent
7) Install all the services (use the same versions as before to make the installation smoother)
8) Add the configurations (use the same configurations as before; there is an option to add XML configs in CM)
9) Point the Cloudera Manager configurations to the existing storage directories
10) Point the new installation to the existing metastore (hive, oozie, hue etc)
11) Start all the services (Don’t format the namenode)
12) Test the cluster
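
A minimal sketch of the backup step (2) above, assuming a MySQL-backed hive metastore database named metastore and /data/dfs/nn as the namenode metadata directory (both names are illustrative; adjust them to your cluster):

mysqldump -u root -p metastore > metastore_backup.sql
tar -czpf namenode-metadata-backup.tar.gz -C /data/dfs nn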

Notification on completion of Mapreduce jobs

Heavy mapreduce jobs may run for several hours, and there can be several of them; checking the status of mapreduce jobs manually is a boring task that I don't like. Managing Java programs with a polling script is not a clean approach either; I consider such designs bad designs.

My requirement was to get a notification on completion of mapreduce jobs. These are critical mapreduce jobs, and I don't want to keep checking their status and waiting for their completion.

Hadoop provides a useful configuration property that solves this problem, and it is very easy to use: just add these three lines to the driver class.

conf.set("job.end.notification.url", "http://myserverhost:8888/notification/jobId=$jobId?status=$jobStatus");
conf.setInt("job.end.retry.attempts", 3);
conf.setInt("job.end.retry.interval", 1000);

By setting these properties, hadoop sends an HTTP request on completion of the job: the variables $jobId and $jobStatus in the URL given by job.end.notification.url are replaced with the actual values. (On Hadoop 2, the equivalent property name is mapreduce.job.end-notification.url.) We need a small piece of code for a webservice that accepts this HTTP request and sends an email; I used Python for the webservice and email utility because of its simplicity. Once a request comes to the webservice, it parses the arguments and calls the email-sending module. This is a very simple example: instead of email, we could send other kinds of notifications such as SMS or a phone call, or trigger some other application. The property job.end.notification.url is very helpful for tracking asynchronous mapreduce jobs, and this is a clean approach because we are not running any extra script to poll the status of the job; the job itself reports its status, and the Python program merely collects it and sends the notification.

The Python code for the webservice and email notification is given below.
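
What follows is a minimal sketch of such a webservice written with only the Python standard library; the SMTP relay host, the sender and recipient addresses, and the listening port are illustrative assumptions.

# notification_server.py - minimal sketch; SMTP host, addresses and port are assumptions
import smtplib
from email.mime.text import MIMEText
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs

SMTP_HOST = "localhost"            # assumed local SMTP relay
FROM_ADDR = "hadoop@myserverhost"  # hypothetical sender
TO_ADDR = "admin@example.com"      # hypothetical recipient

def send_email(job_id, status):
    # build and send a plain-text notification mail
    msg = MIMEText("Job %s finished with status %s" % (job_id, status))
    msg["Subject"] = "MapReduce job %s: %s" % (job_id, status)
    msg["From"] = FROM_ADDR
    msg["To"] = TO_ADDR
    with smtplib.SMTP(SMTP_HOST) as server:
        server.sendmail(FROM_ADDR, TO_ADDR, msg.as_string())

class NotificationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Hadoop has already substituted $jobId and $jobStatus, so the request
        # looks like /notification/jobId=job_1234_0001?status=SUCCEEDED
        parsed = urlparse(self.path)
        job_id = parsed.path.rsplit("jobId=", 1)[-1]
        status = parse_qs(parsed.query).get("status", ["UNKNOWN"])[0]
        send_email(job_id, status)
        self.send_response(200)    # tell hadoop the notification was received
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8888), NotificationHandler).serve_forever()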

Migrating Namenode from one host to another host

The namenode is the heart of a hadoop cluster, so it is usually installed on a better-quality machine than the other nodes. If we want to migrate the namenode from one host to another, the following steps are required. This is a rare scenario.

Manual Approach

Method 1: (By migrating the harddrive)

  • Stop all the running jobs in the cluster
  • Put the namenode into safemode
    • hdfs dfsadmin -safemode enter
  • Execute the following command to save the current namespace to the storage directories and reset the edit logs:
    • hdfs dfsadmin -saveNamespace
  • Stop the entire cluster
  • Remove the hard disk from the old namenode host and attach it to the new namenode host
  • Release the IP address from the old namenode host and assign it to the new namenode host
  • Start the new namenode (DO NOT PERFORM FORMAT)
  • Start all the services

Method 2: (New Harddrive)

  • Stop all the running jobs in the cluster
  • Put the namenode into safemode
    • hdfs dfsadmin -safemode enter
  • Execute the following command to save the current namespace to the storage directories and reset the edit logs:
    • hdfs dfsadmin -saveNamespace
  • Stop the entire cluster
  • Login to the namenode host.
  • Navigate to the namenode storage directories.
  • Copy the namenode metadata. It is always better to keep this as a compressed file (a sketch is given after this list). Note down the folder and file permissions & ownership.
  • Take a back up of the configuration files.
  • Install the same version of the namenode as the existing system on the new machine.
  • Ensure that the IP address of the old host is released and assigned to the new host.
  • Copy the configuration files and metadata to the new namenode host
  • Create namenode storage directory structure in the new host.
  • Maintain the same folder permissions and ownership in the new host also.
  • If there are any changes in namenode directory structure, make the corresponding changes in config files.
  • In case of a kerberised cluster, create the appropriate principals for the new host and place the proper keytabs.
  • Start the new namenode. (DO NOT PERFORM FORMAT)
  • Start the remaining services.
  • Test the working of the cluster by executing file system operations as well as MR operations.
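
A minimal sketch of the metadata copy step above, assuming /data/dfs/nn is the namenode storage directory (dfs.namenode.name.dir); the -p flag preserves permissions inside the archive:

tar -czpf namenode-metadata.tar.gz -C /data/dfs nn
ls -ld /data/dfs/nn        # note down the owner, group and mode separately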

Automated Approach in a cluster managed using Cloudera Manager (CM above 5.4)

If you are using cloudera manager 5.4 or above, there is a new feature known as Namenode Role Migration that helps us to migrate namenode from one host to another. This requires HDFS HA to be enabled.

Find the file name corresponding to a record in hive

Every table in hive has two virtual columns. They are

  • INPUT__FILE__NAME
  • BLOCK__OFFSET__INSIDE__FILE

INPUT__FILE__NAME gives the name of the input file for the record.

BLOCK__OFFSET__INSIDE__FILE is the current global file position.

Suppose we want to find the name of the file corresponding to each record in a table: we can use the INPUT__FILE__NAME column. This feature is available from hive version 0.8 onwards. A small example is given below.

Table Customer Details

create table customer_details ( name string, phone_number string) row format delimited fields terminated by ',';

Sample data set

datafile1.csv

amal,9876543210
sreesankar,8976543210
sahad,7896543210

datafile2.csv

sivaram,76896543210
rupak,7568943210
venkatesh,9087654321
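
To reproduce the example, the two files can be loaded into the table first (the local paths are illustrative):

load data local inpath '/tmp/datafile1.csv' into table customer_details;
load data local inpath '/tmp/datafile2.csv' into table customer_details;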

Query

select INPUT__FILE__NAME, name from customer_details;

This gives us the file name corresponding to each record. If you just want the distinct file names backing a hive table, the query below will help you.

select distinct(INPUT__FILE__NAME) from customer_details;
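
Both virtual columns can be used together as well; for example, the query below returns each record along with its file name and byte offset:

select INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, name from customer_details;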

Load Balancers – HA Proxy and ELB

Earlier I wondered how sites like Google handle the huge number of requests reaching them. Later I came to know about the concept of load balancing: using a load balancer, we can keep multiple servers in the back end and route the incoming requests to them. This ensures faster response as well as high availability, so load balancers play a very important role. There are a lot of open source load balancers as well as paid services. HAProxy is one of the open source load balancers, and Amazon provides a load balancer as a service known as Elastic Load Balancer (ELB).

Using a load balancer, we can handle a very large number of requests in a reliable and optimal way. For example, we can use a load balancer in front of Impala to balance the requests hitting the impala servers. For on-premise environments we can configure HAProxy, and for cloud environments we can use ELB. ELB is a ready-to-use service; we just have to add the details of the ports to be forwarded and the listener machines. HAProxy is a very simple application available in the Linux repositories, and it is very easy to configure as well.

Hive error in a sentry enabled cluster – “add jar” command throws “Insufficient privileges to execute add”

Apache Sentry is a system for enforcing fine grained role based authorization to data and metadata stored on a Hadoop cluster, and it is very useful for securing a cluster. Using sentry we can configure fine grained access to databases, directories and tables in hive and impala. Before sentry, the only way to limit access was through hdfs directory permissions, which is not very effective.

In a sentry enabled cluster, while adding jars using the command “add jar”, you will face an exception as below.

"Insufficient privileges to execute add"

You will not be able to run the add jar command even as an admin user: sentry restricts the privilege to add jars from hue or beeline. The solution is to have an admin add the jar globally using the hive.aux.jars.path property.
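
For example, in hive-site.xml (the jar path below is a hypothetical one):

<property>
  <name>hive.aux.jars.path</name>
  <value>file:///usr/lib/hive/auxlib/my-udfs.jar</value>
</property>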

Impala Scratch directory issue

In impala while running queries over large data, sometimes we may get an error like this.

WARNINGS: Create file /tmp/impala-scratch/94869b99d0d6457:765d2bc009a914ad_94869b99d0d6457:765d2bc009a914af_516bff1b-7342-434e-8c95-c777bb7f237e failed with errno=2 description=Error(2): No such file or directory

Backend 1:Create file /tmp/impala-scratch/94869b99d0d6457:765d2bc009a914ad_94869b99d0d6457:765d2bc009a914af_516bff1b-7342-434e-8c95-c777bb7f237e failed with errno=2 description=Error(2): No such file or directory

One of my friends faced this issue, and on investigation I found that it is caused by the unexpected deletion of files inside the impala scratch directory. The intermediate files used during large sort, join, aggregation or analytic function operations are stored in this scratch directory, which by default is /tmp/impala-scratch; these directories are deleted after the query execution. The best solution for this problem is to change the scratch directory to some other location, which can be done by starting the impala daemon with the option -scratch_dirs="path_to_directory". This directory is on the local Linux file system, not in hdfs, and impala will not start if it does not have proper read/write access to the files in the scratch directory.
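
On a package-based installation, one common place to add this flag is the daemon's startup arguments; a sketch assuming an /etc/default/impala defaults file and an illustrative scratch path:

IMPALA_SERVER_ARGS="${IMPALA_SERVER_ARGS} -scratch_dirs=/data/impala/scratch"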

If you are using impala in an EMR cluster, modify the startup options in the bootstrap action. If you want to modify this configuration in an existing EMR cluster, stop the service nanny on all the nodes and restart impala with the new scratch directory property; if service nanny is running, you will not be able to restart impala with the new argument, because service nanny will restart the service before you do. 🙂

Introduction to Apache Spark

Big data is very hot in the market, and hadoop is one of the top rated technologies in big data. Hadoop became very popular because of its elegant design, its capability to handle large structured/unstructured/semi-structured data, and its strong community support. Hadoop is a batch processing framework that can process data of any size; the main thing hadoop guarantees is that it will not fail because of load. Initially the requirement was just to handle large data without failure, and this led to the design and development of frameworks such as hadoop. After that, people started thinking about improving the performance of this processing, which led to the development of a technology called spark.

Spark is an open source technology for processing large data in a distributed manner, with some extra features compared to mapreduce; its processing speed is higher than that of mapreduce. Most current cluster programming models are based on acyclic data flow from stable storage to stable storage, which is inefficient for applications that repeatedly reuse a working set of data. The main motivation behind the development of spark was the inefficient handling of two types of applications in existing computing frameworks: iterative algorithms and interactive data mining tools. With those frameworks, an application reloads its data from stable storage on each query, and if the same data is reloaded many times, a lot of time is lost; for large data, this time loss is high. If we instead keep the intermediate results of a process in memory and share that in-memory copy across the cluster, the delay is much smaller, which results in a large performance improvement. The inability to keep intermediate results in memory is one of the major drawbacks of most of the popular distributed data processing technologies, and the need for in-memory computation arises mainly in iterative algorithms and data mining applications. Spark achieves this in-memory computation with RDDs (Resilient Distributed Datasets), which form the backbone of spark.

RDD is a distributed memory abstraction that helps programmers to perform in-memory computation on very large clusters in an error free manner. An RDD is a read-only, partitioned collection of records. An RDD has enough information about how it was derived from other datasets. RDDs are immutable collections of objects spread across a cluster.
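
A minimal PySpark sketch of the in-memory reuse described above (the local master and the input path are illustrative assumptions):

from pyspark import SparkContext

sc = SparkContext("local", "rdd-demo")
# build the RDD once and keep its partitions in memory
logs = sc.textFile("hdfs:///tmp/sample.log").cache()
# the first action reads from disk and populates the cache
print(logs.filter(lambda line: "ERROR" in line).count())
# later actions over the same RDD reuse the cached partitions
print(logs.filter(lambda line: "WARN" in line).count())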


Spark is rich with several features because of the modules built on top of it:

  • Spark Streaming: processing real-time data streams
  • Spark SQL and DataFrames: support for structured data and relational queries
  • MLlib: built-in machine learning library
  • GraphX: Spark’s new API for graph processing
  • Bagel (Pregel on Spark): older, simple graph processing model

Is spark a replacement for hadoop?

Spark is not a replacement for hadoop; it can work along with hadoop, use hadoop's file system (HDFS) as its storage layer, and run on an existing hadoop cluster. Spark has become one of the most active projects in the hadoop ecosystem. The comparison is only with the processing layer, mapreduce, and as per current test results spark performs much better than mapreduce. Spark has several advantages over mapreduce, it is still under active development, and more features are coming up. Realtime stream processing is also better in spark compared to the other ecosystem components in hadoop. A detailed performance report of spark is available at the following url.

http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html

Is Spark free or Licensed?

Spark is a 100% open source project. It is now an apache project with several committers and contributors across the world.

What are the attractive features of spark in comparison with mapreduce?

  • Spark has Java, Scala and Python APIs.
  • Programming in spark is simpler than programming mapreduce, which reduces development time.
  • The performance of spark is better than that of mapreduce, and it is best suited for computations such as realtime processing and iterative computations over the same data.
  • Caching is one of the main features of spark: it stores intermediate results in memory across its distributed workers, while mapreduce stores them on disk. This in-memory caching is what makes spark faster. Spark Streaming additionally provides realtime processing of data as it flows in, which is missing in mapreduce.
  • With spark, it is possible to do batch processing, stream processing, graph processing and machine learning in the same cluster, which gives better resource utilization and easier resource management.
  • Spark has an excellent feature of spilling data partitions to disk if a node does not have sufficient RAM for storing them.
  • All these features make spark a very powerful member of the bigdata technology stack, and it will be one of the hottest technologies in the market.