Enabling Log Aggregation in YARN

While checking the details of a YARN application, you may get a message similar to “Log Aggregation not enabled”. You can follow the steps below to enable it. This issue occurs in EMR because in most of the AMIs log aggregation is not enabled by default. Enabling it is very simple: add the following configuration to the yarn-site.xml of all the YARN hosts and restart the cluster (a full cluster restart is not required; restarting all the NodeManagers is enough).

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

<property>
    <description>Where to aggregate logs to.</description>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/tmp/logs</value>
</property>

<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>259200</value>
</property>

<property>
    <name>yarn.log-aggregation.retain-check-interval-seconds</name>
    <value>3600</value>
</property>
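
Once log aggregation is enabled and an application has finished, its aggregated logs can be fetched from the command line using the application id shown in the ResourceManager UI. For example:

yarn logs -applicationId <application-id>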

Configuring Fair Scheduler in Hadoop Cluster

Hadoop comes with various scheduling algorithms such as FIFO, Capacity, Fair and DRF. Here I am briefly explaining how to set up the Fair Scheduler in Hadoop. This can be done in any distribution of Hadoop. By default Hadoop comes with the FIFO scheduler; some distributions ship with the Capacity Scheduler as the default. In multi-user environments, a scheduler other than the default FIFO is definitely required, because FIFO makes every job wait in a single queue in the order of submission. Creating multiple job queues, assigning a portion of the cluster capacity to each and adding users to these queues helps us manage and utilize the cluster resources properly.
For setting up the Fair Scheduler manually, we have to make some changes on the ResourceManager node: one change in yarn-site.xml and the addition of a new configuration file, fair-scheduler.xml.
The configurations for a basic setup are given below.

Step 1:
Specify the scheduler class in yarn-site.xml. If this property already exists, replace its value with the one below; otherwise add the property to yarn-site.xml.

  
<property>
   <name>yarn.resourcemanager.scheduler.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Step 2:
Specify the Fair Scheduler allocation file. This property has to be set in yarn-site.xml, and its value should be the absolute path of the fair-scheduler.xml file. The file should be present on the local filesystem of the ResourceManager node.

 
<property>
  <name>yarn.scheduler.fair.allocation.file</name>
  <value>/etc/hadoop/conf/fair-scheduler.xml</value>
</property>

Step 3:
Create the allocation configuration file
A sample allocation file with a basic set of configurations is given below; more advanced configurations can also be used in this file.
There are five types of elements which can be set up in an allocation file:

Queue element :– Representing queues. It has the following properties:

  • minResources — Setting the minimum resources of a queue
  • maxResources — Setting the maximum resources of a queue
  • maxRunningApps — Setting the maximum number of apps from a queue to run at once
  • weight — Setting the queue’s weight relative to other queues, for non-proportional sharing of the cluster. Defaults to 1
  • schedulingPolicy — Values are “fair”/“fifo”/“drf” or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy
  • aclSubmitApps — Listing the users who can submit apps to the queue. If specified, other users will not be able to submit apps to the queue.
  • minSharePreemptionTimeout — Specifying the number of seconds the queue is under its minimum share before it tries to preempt containers to take resources from other queues.

User elements :– Representing per-user behavior. A user element can contain a single property, maxRunningApps, to set the maximum number of running apps for a particular user.

userMaxAppsDefault element :– Setting the default running app limit for users if the limit is not otherwise specified.

fairSharePreemptionTimeout element :– Setting the number of seconds a queue is under its fair share before it tries to preempt containers to take resources from other queues.

defaultQueueSchedulingPolicy element :– Specifying the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified.

<?xml version="1.0"?>
<allocations>

  <queue name="queueA">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>5000 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,amal</aclSubmitApps>
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueB">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sahad,amal</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <queue name="queueC">
    <minResources>1000 mb, 1 vcores</minResources>
    <maxResources>2500 mb, 1 vcores</maxResources>
    <maxRunningApps>10</maxRunningApps>
    <aclSubmitApps>hdfs,sree</aclSubmitApps>
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <user name="amal">
    <maxRunningApps>10</maxRunningApps>
  </user>

  <user name="hdfs">
    <maxRunningApps>5</maxRunningApps>
  </user>

  <user name="sree">
    <maxRunningApps>8</maxRunningApps>
  </user>

  <user name="sahad">
    <maxRunningApps>2</maxRunningApps>
  </user>

  <userMaxAppsDefault>5</userMaxAppsDefault>
  <fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>

</allocations>

Here we created three queues, queueA, queueB and queueC, and mapped users to these queues. While submitting a job, the user should specify the queue name, and only users who have access to a queue (as defined in its ACLs) can submit jobs to it. Another option is queue placement rules: if we specify placement rules, jobs from a particular user are directed automatically to a particular queue based on those rules. I am not covering the placement rules here.

After making these changes, restart the resource manager. 

Now go to the ResourceManager web UI. On the left side of the UI, you can see a section named Scheduler. Click on that section and you will be able to see the newly created queues.

Now submit a job by specifying a queue name. You can use the option shown below, which submits the job to queueA. All the queues that we created are sub-queues of the root queue, so we have to specify the queue name in the format parentQueue.subQueue.

-Dmapred.job.queue.name=root.queueA

Eg: hadoop jar hadoop-examples.jar wordcount -Dmapred.job.queue.name=root.queueA <input-location> <output-location>

If you are running a Hive query, you can set this property in the format below. The property should be set at the top, before the query.

set mapred.job.queue.name=root.queueA;

Impala Scratch directory issue

In Impala, while running queries over large data, we may sometimes get an error like this.

WARNINGS: Create file /tmp/impala-scratch/94869b99d0d6457:765d2bc009a914ad_94869b99d0d6457:765d2bc009a914af_516bff1b-7342-434e-8c95-c777bb7f237e failed with errno=2 description=Error(2): No such file or directory

Backend 1:Create file /tmp/impala-scratch/94869b99d0d6457:765d2bc009a914ad_94869b99d0d6457:765d2bc009a914af_516bff1b-7342-434e-8c95-c777bb7f237e failed with errno=2 description=Error(2): No such file or directory

One of my friends faced this issue, and on investigation I found that it is caused by the unexpected deletion of files inside the Impala scratch directory. The intermediate files used during large sort, join, aggregation, or analytic function operations are stored in this scratch directory. By default the Impala scratch directory is /tmp/impala-scratch, and these directories are deleted after the query execution. The best solution for this problem is to change the scratch directory to some other directory. This can be done by starting the Impala daemon with the option --scratch_dirs="path_to_directory". This directory is on the local Linux filesystem, not in HDFS. Impala will not start if it does not have proper read/write access to the files in the scratch directory.
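
For example, if you manage the daemon start-up yourself, the option is passed like this (the path here is just an example; use a local directory with enough free space):

impalad --scratch_dirs=/mnt/impala-scratch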

If you are using Impala in an EMR cluster, modify the start-up options in the bootstrap action. If you want to modify this configuration in an existing EMR cluster, stop the service nanny on all the nodes and restart Impala with the scratch directory option. If service nanny is running, you will not be able to restart Impala with the new argument, because service nanny will restart the service before you do. 🙂

Service Nanny in AWS EMR

Service nanny is a service that runs on all the nodes of an AWS EMR cluster and controls the operation of the daemons on each node. If a process gets killed, for example by the OOM killer or because of overload, service nanny restarts it immediately and ensures that the service stays alive. This protects the cluster services from unexpected exits, so even if you kill or stop a process, it will get restarted automatically.

Recently I faced an issue with Impala in AWS EMR; I was getting the error described in this post. I was using a small 3-node EMR cluster, and instead of creating a new cluster I thought of restarting the Impala daemon with additional arguments. But I was not able to do this, because service nanny was starting the daemon back up before I could start it myself. So I stopped the service nanny on all the nodes, restarted Impala with the extra arguments and then restarted the service nanny.

We can modify the service nanny behavior by editing the config files present in the /etc/service-nanny/ directory. You can see a config file for each service controlled by service nanny, and you can add, remove or modify the control actions by adding, removing or modifying these config files.

Rhipe on AWS (YARN and MRv1)

Rhipe is an R library that runs on top of Hadoop. Rhipe uses the Hadoop streaming concept for running R programs in Hadoop. To know more about Rhipe, please check my older post. My previous post on Rhipe gave a basic explanation and the installation steps for running Rhipe on CDH4 (MRv1). Now YARN has become popular and almost everyone is using it, so a lot of people have asked me for assistance with installing Rhipe on YARN. Rhipe works very well on YARN. Here I am just giving a pointer on how to install Rhipe on AWS (Amazon Web Services). I checked this script and it is working fine. It contains the bootstrap script and installables that install Rhipe automatically on AWS.

For those who are new to AWS, I will explain the basics of AWS EMR and bootstrap scripts. Amazon Web Services provides a lot of cloud services. Among them, Elastic MapReduce (EMR) is a service that provides a Hadoop cluster. It is one of the best solutions for users who don’t want to maintain a data center or take on the headaches of Hadoop administration. AWS provides a list of components that can be installed on the Hadoop cluster; we can choose these while launching the cluster through the web console. Examples of such components are Hive, Pig, Impala, Hue, HBase, Hunk etc. But in most cases, users may require some extra software as well, and this requirement depends on the user. If the user tries to install the extra software manually on the cluster, it will take a lot of time: the automated cluster launch takes less than 10 minutes (I tried with around 100 nodes), but installing software on all of these nodes manually would take several hours. For this problem, Amazon provides a solution. The user can provide custom shell scripts, and these scripts will be executed on all the nodes while installing Hadoop. Such a script is called a bootstrap script. Here we are installing Rhipe using a bootstrap script.

For users who want to install Rhipe on AWS Hadoop MRv1, you can follow this url. Please ensure that you are using the correct AMI. AMI stands for Amazon Machine Image; it is just a version of the image that they are providing. For users who want to install Rhipe on AWS Hadoop MRv2 (YARN), you can follow this url. This will work perfectly on AWS AMI 3.2.1. You can download the GitHub repo locally and put it in your S3. Then launch the cluster by specifying the details mentioned in the installation doc.

For non-AWS users

For those who want to install Rhipe on YARN on any Hadoop cluster, you can either build Rhipe for your version of Hadoop and put that jar inside the Rhipe directory, or you can directly try the ready-made Rhipe build for YARN. All the Rhipe versions are available in a common repository; you can download the installables from this location. You have to follow the steps mentioned in all the shell scripts present in the given repository. This is a manual activity and you have to do it on all the nodes of your Hadoop cluster.

List and Tag all EC2 & EBS instances

Nowadays most software engineers use cloud services, and Amazon is one of the key players among cloud service providers.
While dealing with cloud services, we have to monitor usage and billing periodically.
Otherwise, at the end of the month we may get a huge bill because of some continuously running unused instances.
Since we can launch instances in multiple regions, it may become difficult to track all the running instances every day.

Here is a small piece of code I developed in Python that lists all the running EC2 instances (including EMR) and EBS volumes in all regions or in specific regions.
The code also has a method to tag all the instances with your custom tags.
Tagging helps in identifying the machines used by different people or teams in a shared environment.
If the number of instances is small, manual tagging is not a difficult task, but tagging a large number of instances manually is tedious.
This program may help you tag any number of instances within a few seconds.
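
A minimal sketch of the idea, using boto3, is given below; the tag keys and values are just placeholders, and pagination and error handling are left out to keep it short.

import boto3

def list_and_tag(tags=None, regions=None):
    # Discover all regions unless a specific list is given
    ec2 = boto3.client("ec2")
    if regions is None:
        regions = [r["RegionName"] for r in ec2.describe_regions()["Regions"]]

    for region in regions:
        client = boto3.client("ec2", region_name=region)

        # Running EC2 instances (EMR nodes are normal EC2 instances, so they show up too)
        reservations = client.describe_instances(
            Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
        )["Reservations"]
        instance_ids = [i["InstanceId"] for r in reservations for i in r["Instances"]]

        # EBS volumes in the region
        volume_ids = [v["VolumeId"] for v in client.describe_volumes()["Volumes"]]

        print(region, instance_ids, volume_ids)

        # Apply the custom tags to everything found in this region
        if tags and (instance_ids or volume_ids):
            client.create_tags(
                Resources=instance_ids + volume_ids,
                Tags=[{"Key": k, "Value": v} for k, v in tags.items()],
            )

if __name__ == "__main__":
    list_and_tag(tags={"Owner": "amal", "Project": "demo"})  # hypothetical tags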

Launching an EMR cluster using Python program

Amazon’s EMR is a very easy way to launch a Hadoop cluster. Amazon provides a console as well as an API interface for launching clusters. Boto is a Python library for dealing with Amazon Web Services; it is not only for EMR, but covers most of the Amazon web services. Here I am sharing a small program for launching an EMR cluster using Python boto. This program helps in situations where automation is required.
In this program, the Hadoop cluster will be launched with services such as Pig, Hive, Impala and Ganglia, and with some user-defined installation.
Bootstrapping is a process by which we can add our own custom installations while launching the cluster.
Suppose we want to add our own custom installations to all the nodes of the EMR cluster; doing this manually would be difficult. The bootstrap option solves this problem in a very simple way. The only thing we need to do is write a shell script containing the custom steps, put it in an S3 bucket and specify that script while launching the cluster.

For writing Amazon-related programs, you can check the Python boto API. All the methods and coding conventions used here are taken from Python boto. Boto is an open source Python library, and the best way to learn its coding conventions is by reading the boto source code. I wrote this code by referring to the Python boto documentation, and the coding standards used in it are similar to those in boto.

In this program, if you don’t have a bootstrap step, you can keep it as None.

An outline of the code is given below. You can get the full code from GitHub also.
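
A minimal sketch of such a launch using the boto EMR API follows. It is only an outline: the region, instance types, key pair, S3 paths and bootstrap script location are placeholders, and Impala and Ganglia are added through extra bootstrap actions or API parameters that depend on the AMI version, so they are left out of the sketch.

import boto.emr
from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.step import InstallHiveStep, InstallPigStep

def launch_emr_cluster():
    conn = boto.emr.connect_to_region("us-east-1")

    # Hive and Pig are installed through the predefined install steps.
    steps = [InstallHiveStep(), InstallPigStep()]

    # User-defined installation: a shell script kept in S3 (placeholder path).
    # If you don't have a bootstrap step, this can be kept as None.
    bootstrap_actions = [
        BootstrapAction("custom-install",
                        "s3://my-bucket/bootstrap/install-extras.sh", []),
    ]

    jobflow_id = conn.run_jobflow(
        name="demo-cluster",
        log_uri="s3://my-bucket/emr-logs/",   # placeholder S3 location for logs
        ec2_keyname="my-keypair",             # placeholder EC2 key pair
        ami_version="3.2.1",
        master_instance_type="m1.large",
        slave_instance_type="m1.large",
        num_instances=3,
        keep_alive=True,
        enable_debugging=False,
        bootstrap_actions=bootstrap_actions,
        steps=steps,
    )
    return jobflow_id

if __name__ == "__main__":
    print(launch_emr_cluster())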

What is EMR?

EMR is a cloud service provided by Amazon. Its full form is Elastic MapReduce.

We can launch Hadoop clusters of our desired size in a few minutes using this service.

We can simply increase or decrease the number of nodes in the cluster while it is running, without any disturbance; that is why it is called Elastic. It is very simple to operate and doesn’t require much administration skill. We pay only for whatever we use, with no need for a server room, cooling mechanism, power backup etc., and we get everything very fast for an affordable amount. We can configure Hadoop and Hadoop ecosystem components such as Hive, Pig, Impala etc. in an EMR cluster.

Now Shark and Spark are also available with EMR. If we need any additional services installed in our cluster, we can create our own custom bootstrap script for installing those services and add the script while launching the cluster.

There are three types of nodes in an EMR cluster: Master, Core and Task.

The Master node contains the master daemons of the Hadoop cluster: the NameNode and JobTracker for MRv1, or the NameNode and ResourceManager in the case of YARN. Core nodes contain a DataNode and TaskTracker for MRv1, or a DataNode and NodeManager for YARN. Task nodes contain the processing daemons only, i.e. a TaskTracker or NodeManager. After launching a cluster we can increase the number of Core nodes and Task nodes, but we can decrease only the number of Task nodes. We can’t reduce the number of Core nodes, because Core nodes contain DataNodes which store data, and decreasing the number of DataNodes may result in data loss.

A super cool Python library called Boto is available for dealing with EMR.
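
For example, resizing a running cluster comes down to one API call on an instance group. A small illustration with boto3 follows; the cluster id and instance group id are placeholders that you can look up with list_clusters() and list_instance_groups():

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Grow (or, for task groups, shrink) an instance group to 5 nodes.
emr.modify_instance_groups(
    ClusterId="j-XXXXXXXXXXXXX",               # placeholder cluster id
    InstanceGroups=[{
        "InstanceGroupId": "ig-XXXXXXXXXXXX",  # placeholder instance group id
        "InstanceCount": 5,
    }],
)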

Why can’t EMR be launched in all types of VPCs?

For launching an EMR cluster, the VPC should have an internet gateway and a subnet. So if internet access is restricted in the VPC, EMR cannot be launched. The reason is that, while launching an EMR cluster, it contacts some remote locations for downloading the required software and installation scripts. If internet access is not available, those connections are blocked, which results in installation failure.
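
A quick way to check this requirement before launching is to look for an internet gateway attached to the VPC. A small illustration with boto3 (the VPC id is a placeholder):

import boto3

def vpc_has_internet_gateway(vpc_id, region="us-east-1"):
    """Return True if an internet gateway is attached to the given VPC."""
    ec2 = boto3.client("ec2", region_name=region)
    igws = ec2.describe_internet_gateways(
        Filters=[{"Name": "attachment.vpc-id", "Values": [vpc_id]}]
    )["InternetGateways"]
    return len(igws) > 0

print(vpc_has_internet_gateway("vpc-12345678"))  # placeholder VPC id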