Heterogeneous storages in HDFS

From Hadoop 2.3.0 onwards, HDFS supports heterogeneous storage. What is heterogeneous storage, and what are the advantages of using it?

Hadoop started out as a scalable batch system for storing and processing huge volumes of data. It has since become the data lake platform for many enterprises. Large enterprises need to store and process many kinds of data for advanced analytics. Some of this data is needed frequently, some only occasionally, and some very rarely. Storing all of it on the same class of hardware costs more than necessary.

Take a cluster running in AWS as an example. The cluster nodes are EC2 instances, which use EBS and ephemeral storage, and the cost varies with the type of storage. S3 is cheaper than EBS, but slower to access. Glacier is cheaper again than S3, but retrieving data from it takes time.

In the same way, this feature lets us keep data on different storage types in Hadoop, depending on its priority and how often it is needed. It was not available in earlier versions of Hadoop. From version 2.3.0 onwards, a datanode is defined as a collection of storages, each with its own storage type (the storage policies built on top of this arrived in a later 2.x release, 2.6.0 as far as I know). The storage policies available in Hadoop are Hot, Warm, Cold, All_SSD, One_SSD and Lazy_Persist.

  • Hot – for both storage and compute. The data that is popular and still being used for processing will stay in this policy. When a block is hot, all replicas are stored in DISK.
  • Cold – only for storage with limited compute. The data that is no longer being used, or data that needs to be archived is moved from hot storage to cold storage. When a block is cold, all replicas are stored in ARCHIVE.
  • Warm – partially hot and partially cold. When a block is warm, some of its replicas are stored in DISK and the remaining replicas are stored in ARCHIVE.
  • All_SSD – for storing all replicas in SSD.
  • One_SSD – for storing one of the replicas in SSD. The remaining replicas are stored in DISK.
  • Lazy_Persist – for writing blocks with single replica in memory. The replica is first written in RAM_DISK and then it is lazily persisted in DISK.
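
To make the "datanode as a collection of storages" idea concrete, here is a minimal sketch of how a datanode's directories could be tagged with storage types in hdfs-site.xml. The mount points are hypothetical; only the property names and the [SSD]/[DISK]/[ARCHIVE] tags come from HDFS itself. A directory without a tag is treated as DISK.

```xml
<!-- hdfs-site.xml on a datanode. The paths are hypothetical examples. -->
<property>
  <name>dfs.datanode.data.dir</name>
  <value>[SSD]file:///mnt/ssd0/hdfs/data,[DISK]file:///mnt/disk0/hdfs/data,[ARCHIVE]file:///mnt/archive0/hdfs/data</value>
</property>

<!-- Storage policies are on by default; shown here only for completeness. -->
<property>
  <name>dfs.storage.policy.enabled</name>
  <value>true</value>
</property>
```

With storages tagged like this, a policy such as Warm has both a DISK and an ARCHIVE location to place replicas on.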

Unauthorized connection for super-user: hue from IP “x.x.x.x”

If you are getting the following error in hue,

Unauthorized connection for super-user: hue from IP “x.x.x.x”

Add the following properties to the core-site.xml of your Hadoop cluster and restart the cluster.

<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>

You may face a similar error with Oozie as well. In that case, add a similar configuration for the oozie user in core-site.xml.

<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
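
The wildcard values above let the proxy user impersonate any user from any host, which is convenient but wide open. A tighter variant might look like the sketch below. The hostname and group name are hypothetical placeholders; both properties accept a comma-separated list.

```xml
<!-- core-site.xml. hue-server.example.com and hadoop-users are hypothetical. -->
<property>
  <name>hadoop.proxyuser.hue.hosts</name>
  <value>hue-server.example.com</value>
</property>

<property>
  <name>hadoop.proxyuser.hue.groups</name>
  <value>hadoop-users</value>
</property>
```

With this, hue can only impersonate members of the listed group, and only when connecting from the listed host.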

Configuring Fair Scheduler in Hadoop Cluster

Hadoop comes with various scheduling algorithms such as FIFO, Capacity, Fair, DRF etc. Here I briefly explain how to set up the Fair Scheduler in Hadoop. This can be done in any distribution of Hadoop. By default Hadoop comes with the FIFO scheduler, while some distributions ship with the Capacity Scheduler as the default. In multiuser environments, a scheduler other than FIFO is definitely required, because FIFO makes everyone wait in a single queue in the order of job submission. Creating multiple job queues, assigning each a portion of the cluster capacity, and adding users to these queues helps us manage and utilize the cluster resources properly.
For setting up the Fair Scheduler manually, we have to make some changes on the resource manager node. One is a change in yarn-site.xml and the other is the addition of a new configuration file, fair-scheduler.xml.
The configurations for a basic setup are given below.

Step 1:
Specify the scheduler class in yarn-site.xml. If this property already exists, replace its value with the one below; otherwise add the property to yarn-site.xml.

  
<property>
   <name>yarn.resourcemanager.scheduler.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Step 2:
Specify the Fair Scheduler allocation file. This property has to be set in yarn-site.xml. The value should be the absolute path of the fair-scheduler.xml file, which must be present on the local filesystem of the resource manager node.

 
<property>
  <name>yarn.scheduler.fair.allocation.file</name>
  <value>/etc/hadoop/conf/fair-scheduler.xml</value>
</property>
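
While editing yarn-site.xml, one more property is worth knowing about. The preemption timeouts described in Step 3 only take effect when preemption is switched on, and as far as I know it is off by default. A minimal sketch, assuming you want preemption enabled:

```xml
<!-- yarn-site.xml. Enables Fair Scheduler preemption (assumed to default to false). -->
<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
```

If you leave this out, the queues still work; the scheduler just will not take containers back from queues that are over their share.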

Step 3:
Create the allocation configuration file.
A sample allocation file is given below. The allocation file supports more advanced configurations; this one uses only a basic set.
There are five types of elements that can be set in an allocation file:

Queue element :– Representing queues. It has the following properties:

  • minResources – Setting the minimum resources of a queue
  • maxResources – Setting the maximum resources of a queue
  • maxRunningApps – Setting the maximum number of apps from a queue to run at once
  • weight – Sharing the cluster non-proportionally with other queues. Defaults to 1
  • schedulingPolicy – Values are “fair”/“fifo”/“drf” or any class that extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy
  • aclSubmitApps – Listing the users who can submit apps to the queue. If specified, other users will not be able to submit apps to the queue.
  • minSharePreemptionTimeout – Specifying the number of seconds the queue is under its minimum share before it tries to preempt containers to take resources from other queues.

User elements :– Representing user behaviors. Each can contain a single property that sets the maximum number of running apps for a particular user.

userMaxAppsDefault element :– Setting the default running app limit for users if the limit is not otherwise specified.

fairSharePreemptionTimeout element :– Setting the number of seconds a queue is under its fair share before it tries to preempt containers to take resources from other queues.

defaultQueueSchedulingPolicy element :– Specifying the default scheduling policy for queues; overridden by the schedulingPolicy element in each queue if specified.

<?xml version="1.0"?>
<allocations>
 
 <queue name="queueA">
 <minResources>1000 mb, 1 vcores</minResources>
 <maxResources>5000 mb, 1 vcores</maxResources>
 <maxRunningApps>10</maxRunningApps>
 <aclSubmitApps>hdfs,amal</aclSubmitApps>
 <weight>2.0</weight>
 <schedulingPolicy>fair</schedulingPolicy>
 </queue>
 
 <queue name="queueB">
 <minResources>1000 mb, 1 vcores</minResources>
 <maxResources>2500 mb, 1 vcores</maxResources>
 <maxRunningApps>10</maxRunningApps>
 <aclSubmitApps>hdfs,sahad,amal</aclSubmitApps>
 <weight>1.0</weight>
 <schedulingPolicy>fair</schedulingPolicy>
 </queue>
 
 <queue name="queueC">
 <minResources>1000 mb, 1 vcores</minResources>
 <maxResources>2500 mb, 1 vcores</maxResources>
 <maxRunningApps>10</maxRunningApps>
 <aclSubmitApps>hdfs,sree</aclSubmitApps>
 <weight>1.0</weight>
 <schedulingPolicy>fair</schedulingPolicy>
 </queue>
 
 <user name="amal">
 <maxRunningApps>10</maxRunningApps>
 </user>
 
 <user name="hdfs">
 <maxRunningApps>5</maxRunningApps>
 </user>
 
 <user name="sree">
 <maxRunningApps>8</maxRunningApps>
 </user>
 
 <user name="sahad">
 <maxRunningApps>2</maxRunningApps>
 </user>
 
 <userMaxAppsDefault>5</userMaxAppsDefault>
 <fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>
 </allocations>

Here we created three queues, queueA, queueB and queueC, and mapped users to them. When submitting a job, the user should specify the queue name. Only users who have access to a queue can submit jobs to it; this is defined by the ACLs (aclSubmitApps). Another topic is scheduling rules: if we specify them, jobs from a particular user are directed automatically to a particular queue based on the rule. I am not covering scheduling rules here.
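
The sample above does not use two of the elements listed earlier, minSharePreemptionTimeout and defaultQueueSchedulingPolicy. A minimal sketch of how they could appear in an allocation file is given below. The queue name queueD and the values are made up for illustration.

```xml
<?xml version="1.0"?>
<allocations>

  <!-- queueD is a hypothetical queue used only for this illustration -->
  <queue name="queueD">
    <minResources>1000 mb, 1 vcores</minResources>
    <!-- seconds below minimum share before the queue may preempt -->
    <minSharePreemptionTimeout>60</minSharePreemptionTimeout>
    <!-- overrides the default policy for this queue only -->
    <schedulingPolicy>drf</schedulingPolicy>
  </queue>

  <!-- applies to every queue that does not set its own schedulingPolicy -->
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

</allocations>
```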

After making these changes, restart the resource manager. 

Now go to the resource manager web UI. On the left side of the UI there is a section named Scheduler. Click on it and you will see the newly created queues.

Now submit a job by specifying a queue name, using the option below, which submits the job to queueA. All the queues that we created are sub-queues of the root queue, so we have to specify the queue name in the format parentQueue.subQueue.

-Dmapred.job.queue.name=root.queueA

Eg:  hadoop jar hadoop-examples.jar wordcount -Dmapred.job.queue.name=root.queueA  <input-location>  <output-location>

If you are running a Hive query, you can set this property in the format below. It should be set at the top of the script, before the queries.

set mapred.job.queue.name=root.queueA;
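
If most jobs from a client machine should land in the same queue, passing the option every time gets tedious. One possible approach, sketched here as an assumption rather than something I have covered above, is to set a default queue in the client's mapred-site.xml. Note that mapreduce.job.queuename is the newer name of the mapred.job.queue.name property used in the commands above.

```xml
<!-- mapred-site.xml on the client machine; root.queueA is the queue from the sample -->
<property>
  <name>mapreduce.job.queuename</name>
  <value>root.queueA</value>
</property>
```

A queue name given on the command line or in a Hive session still takes precedence over this default.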

Hadoop Cluster Migrator

The Hadoop Cluster Migrator tool provides a unified interface for copying data from one cluster to another. Traditionally, the DistCp tool provided by Hadoop is used to migrate and copy data from one Hadoop cluster to another. However, DistCp works only when connectivity between the source and target clusters is well established, without any firewall rules blocking it. In production scenarios, the edge nodes isolate the clusters from each other, so DistCp can’t be used for data transfer or cluster backup. This is where Hadoop Cluster Migrator from Knowledge Lens can be very handy.

Hadoop Cluster Migrator is a cluster-agnostic tool that supports migration across different distributions and different versions of Hadoop. Currently we support the MapR, Cloudera, Hortonworks, EMC Pivotal and Apache distributions of Hadoop, in both Kerberos-enabled and Kerberos-disabled mode.

This completely Java-based tool provides large-scale data transfer between clusters, with a transfer rate in the range of 10GB/s depending upon the bandwidth available. The tool is completely restartable and resumes from the point where the last transfer stopped.

For more details, refer to: Hadoop Cluster Migrator


Hadoop Mapreduce- A Good Presentation…

This is a video I found on YouTube that explains Hadoop MapReduce very clearly using the wordcount example.

Deployment and Management of Hadoop Clusters

Back Up Mechanism for Namenode

The namenode is the single point of failure in a Hadoop cluster, because it stores the metadata of the entire Hadoop file system.
So extra care should be taken in maintaining it. We use the best hardware for namenode machines.
Even with the best hardware, complete protection cannot be guaranteed, because hardware issues can happen at any time. So a backup for the namenode is essential.
One method is to create a simple backup storage by mounting a partition of another machine, located in a different place, on the namenode machine.
The backup machine should have the same hardware/software specifications as the namenode machine and have Hadoop installed in the same way, but the Hadoop services are not started on it.
In case of failure, we can start the namenode on this backup machine and it runs like a normal namenode. The only thing we need to do is assign the IP address/hostname of the actual namenode to the backup machine.

In hdfs-site.xml, we give an additional value to the dfs.name.dir property,
i.e. actual location, backup location.

Eg:

<property>
<name>dfs.name.dir</name>
<value>/app/hadoop/name,/app/hadoop/backup</value>
<description>
Determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy. 
</description>
</property>

Here /app/hadoop/name is the actual namenode storage location and /app/hadoop/backup is the location where the remote partition is mounted for storing the namenode backup.
If the first namenode machine fails, the namenode data is still safe on the second (backup) machine, so we can start the namenode there.
The second machine is placed in a different location and has a different power supply, so the two machines do not share the same dependencies, which makes for an effective backup.
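
A note on naming: as far as I know, Hadoop 2.x and later call this property dfs.namenode.name.dir and keep dfs.name.dir only as a deprecated alias. On a newer cluster the same setup would look like the sketch below, reusing the paths from the example above.

```xml
<!-- hdfs-site.xml on Hadoop 2.x or later; same directories as the example above -->
<property>
  <name>dfs.namenode.name.dir</name>
  <value>/app/hadoop/name,/app/hadoop/backup</value>
</property>
```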

Recovery of deleted files in Hadoop

There may be incidents in which we accidentally delete necessary files from Hadoop. Sometimes the entire file system may get deleted. The steps below may help you with recovery.

For this recovery method, trash must be enabled in HDFS. Trash is enabled by setting the property fs.trash.interval to a value greater than 0. Its value is the number of minutes after which a checkpoint gets deleted. The default value is zero, which means the trash feature is disabled. We have to set this property in core-site.xml.

<property>
  <name>fs.trash.interval</name>
  <value>30</value>
  <description>Number of minutes after which the checkpoint
  gets deleted.
  If zero, the trash feature is disabled.
  </description>
</property>

There is one more property related to the one above, called fs.trash.checkpoint.interval. It is the number of minutes between trash checkpoints, and it should be smaller than or equal to fs.trash.interval. Every time the checkpointer runs, it creates a new checkpoint out of the current trash contents and removes checkpoints created more than fs.trash.interval minutes ago. The default value of this property is zero, in which case the value of fs.trash.interval is used.

<property>
  <name>fs.trash.checkpoint.interval</name>
  <value>15</value>
  <description>Number of minutes between trash checkpoints.
  Should be smaller or equal to fs.trash.interval.
  Every time the checkpointer runs it creates a new checkpoint 
  out of current and removes checkpoints created more than 
  fs.trash.interval minutes ago.
  </description>
</property>

If the above properties are set in your cluster, deleted files are moved to the .Trash directory in HDFS, under the home directory of the user who deleted them (/user/<username>/.Trash). They stay recoverable until the checkpoint that contains them is more than fs.trash.interval minutes old; after that the checkpointer removes them for good. So move the files back out of .Trash before then. If trash is not enabled in your cluster, you can enable it for future recovery. 🙂