Amazon EMR is one of the easiest ways to launch a Hadoop cluster. Amazon provides both a web console and an API for launching clusters. Boto is a Python library for working with Amazon Web Services; it covers not only EMR but most of the other AWS services as well. Here I am sharing a small program that launches an EMR cluster using Python boto. It is useful in situations where the launch has to be automated.
In this program, the Hadoop cluster is launched with services such as Pig, Hive, Impala and Ganglia, plus some user-defined installations.
Bootstrapping lets us add our own custom installations while the cluster is being launched.
Suppose we want to install our own custom software on all the nodes of an EMR cluster. Doing this manually on every node would be difficult. The bootstrap option solves this problem in a very simple way: write a shell script containing the custom steps, upload it to an S3 bucket, and specify that location when launching the cluster.
For writing Amazon-related programs, refer to the Python boto API. All the methods and coding conventions used here are taken from boto. Boto is an open-source Python library, and its source code is a good place to learn Python coding conventions. I wrote this code by referring to the boto documentation, and the coding standards used in it are similar to those in boto.
In this program, if you don’t have a custom bootstrap step, you can set it to None.
The code is given below. You can also get it from GitHub.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = 'Amal G Jose' | |
import time | |
import logging | |
from boto.emr.connection import EmrConnection | |
from boto.emr.bootstrap_action import BootstrapAction | |
from boto.emr.step import InstallHiveStep | |
from boto.emr.step import InstallPigStep | |
from boto.regioninfo import RegionInfo | |
class EmrLauncher(object):
    """Launch an Amazon EMR cluster with the boto 2 EMR API.

    The cluster is started with Hive and Pig install steps, plus bootstrap
    actions for Hadoop configuration, Ganglia, Impala and an optional custom
    bootstrap script stored in S3.

    Every constructor argument is optional and defaults to the placeholder
    value used by the original program, so ``EmrLauncher()`` still works;
    supply real credentials and bucket names before running it.
    """

    # Job-flow states after which polling stops. TERMINATED is included so
    # that a cluster that dies during startup cannot make the poll loop spin
    # forever.
    TERMINAL_STATES = frozenset([u'COMPLETED', u'SHUTTING_DOWN', u'FAILED',
                                 u'TERMINATED', u'WAITING'])

    # Arguments for s3://elasticmapreduce/libs/impala/setup-impala. Options
    # must use ASCII double hyphens; en dashes ("\u2013") would not be parsed
    # as option flags by the setup script.
    IMPALA_INSTALL_ARGS = ('--install-impala', '--base-path', 's3://elasticmapreduce',
                           '--impala-version', 'latest')

    def __init__(self, zone_name="ap-southeast-1", access_key="xxxxxx",
                 private_key="xxxxxxx", ec2_keyname="xxxxxxxx",
                 base_bucket="s3://emr-bucket/",
                 bootstrap_script="custom-bootstrap.sh", log_dir="Logs",
                 emr_status_wait=20, cluster_name="MyFirstEmrCluster"):
        """Store the launch settings and open an EMR connection.

        :param zone_name: AWS region name; also used to build the EMR endpoint.
        :param access_key: AWS access key id.
        :param private_key: AWS secret access key.
        :param ec2_keyname: EC2 key pair name for SSH access to the nodes.
        :param base_bucket: S3 prefix (ending in "/") for logs and the script.
        :param bootstrap_script: key of the custom bootstrap shell script under
            ``base_bucket``, or None to launch without a custom bootstrap step.
        :param log_dir: key under ``base_bucket`` where EMR writes its logs.
        :param emr_status_wait: seconds to sleep between status polls.
        :param cluster_name: name given to the job flow.
        """
        self.zone_name = zone_name
        self.access_key = access_key
        self.private_key = private_key
        self.ec2_keyname = ec2_keyname
        self.base_bucket = base_bucket
        self.bootstrap_script = bootstrap_script
        self.log_dir = log_dir
        self.emr_status_wait = emr_status_wait
        self.cluster_name = cluster_name
        # Connect to the regional EMR endpoint.
        self.conn = EmrConnection(
            self.access_key, self.private_key,
            region=RegionInfo(name=self.zone_name,
                              endpoint=self.zone_name + '.elasticmapreduce.amazonaws.com'))
        self.log_bucket_name = self.base_bucket + self.log_dir
        if self.bootstrap_script is None:
            self.bootstrap_script_name = None
        else:
            self.bootstrap_script_name = self.base_bucket + self.bootstrap_script

    @staticmethod
    def _hadoop_config_args(block_size_mb=256):
        """Return configure-hadoop arguments that set the HDFS block size.

        ``dfs.block.size`` is expressed in bytes, so the size in megabytes is
        converted. Each ``-h`` flag must be followed by a ``key=value`` pair,
        so no dangling flag is emitted.
        """
        return ['-h', 'dfs.block.size=%d' % (block_size_mb * 1024 * 1024)]

    def _build_bootstrap_actions(self):
        """Return the bootstrap actions in the order they are passed to EMR."""
        actions = [
            # Set the HDFS block size to 256 MB.
            BootstrapAction('hadoop-config',
                            's3://elasticmapreduce/bootstrap-actions/configure-hadoop',
                            self._hadoop_config_args()),
            # Install Ganglia monitoring; the action takes no arguments.
            BootstrapAction('ganglia-config',
                            's3://elasticmapreduce/bootstrap-actions/install-ganglia',
                            None),
        ]
        if self.bootstrap_script_name is not None:
            # User-supplied shell script from S3.
            actions.append(BootstrapAction("CustomBootStrap", self.bootstrap_script_name, None))
        actions.append(BootstrapAction("ImpalaInstall",
                                       "s3://elasticmapreduce/libs/impala/setup-impala",
                                       list(self.IMPALA_INSTALL_ARGS)))
        return actions

    def _wait_for_cluster(self, jobid):
        """Poll job flow *jobid* until it reaches a terminal state; return it."""
        jobflow = self.conn.describe_jobflow(jobid)
        while jobflow.state not in self.TERMINAL_STATES:
            time.sleep(int(self.emr_status_wait))
            jobflow = self.conn.describe_jobflow(jobid)
        return jobflow

    def launch_emr_cluster(self, master_type, slave_type, num_instance, ami_version):
        """Launch the cluster and block until it is ready or has stopped.

        :param master_type: EC2 instance type of the master node.
        :param slave_type: EC2 instance type of the slave nodes.
        :param num_instance: passed to ``run_jobflow`` as ``num_instances``.
        :param ami_version: EMR AMI version string, e.g. '2.4.8'.
        :returns: "SUCCESS" when the cluster reaches WAITING; "ERROR" when it
            ends in any other terminal state; "FAILED" when an exception is
            raised while launching or polling.
        """
        try:
            jobid = self.conn.run_jobflow(
                self.cluster_name,
                self.log_bucket_name,
                bootstrap_actions=self._build_bootstrap_actions(),
                ec2_keyname=self.ec2_keyname,
                steps=[InstallHiveStep(), InstallPigStep()],
                # Keep the cluster running after its steps finish.
                keep_alive=True,
                action_on_failure='CANCEL_AND_WAIT',
                master_instance_type=master_type,
                slave_instance_type=slave_type,
                num_instances=num_instance,
                ami_version=ami_version)
            # Guard the long-running cluster against accidental termination.
            self.conn.set_termination_protection(jobid, True)
            jobflow = self._wait_for_cluster(jobid)
        except Exception:
            logging.exception("Launching EMR cluster failed")
            return "FAILED"

        if jobflow.state != u'WAITING':
            logging.error("Launching EMR cluster failed, state: %s", jobflow.state)
            return "ERROR"
        logging.info("Launched EMR Cluster Successfully")
        # Lazy %-formatting: a missing DNS name must not turn success into failure.
        logging.info("Master node DNS of EMR %s", jobflow.masterpublicdnsname)
        return "SUCCESS"

    def main(self):
        """Launch a 3-instance m3.xlarge cluster on AMI 2.4.8 and log the result."""
        master_type = 'm3.xlarge'
        slave_type = 'm3.xlarge'
        num_instance = 3
        ami_version = '2.4.8'
        try:
            emr_status = self.launch_emr_cluster(master_type, slave_type,
                                                 num_instance, ami_version)
        except Exception:
            logging.exception("Emr launching failed")
            return
        if emr_status == 'SUCCESS':
            logging.info("Emr cluster launched successfully")
        else:
            logging.error("Emr launching failed")
if __name__ == '__main__':
    # The root logger defaults to WARNING with no handler configured, so the
    # launcher's logging.info messages (success and master DNS) would be
    # dropped without this.
    logging.basicConfig(level=logging.INFO)
    launcher = EmrLauncher()
    launcher.main()