
Load Balancers – HA Proxy and ELB


I used to wonder how sites like Google handle the huge number of requests reaching them. Later I learned about the concept of load balancing. With load balancing, we can keep multiple servers in the back end and route the incoming requests to them. This ensures faster responses as well as high availability, so load balancers play a very important role. There are a lot of open source load balancers as well as paid services. HAProxy is one of the open source load balancers, and Amazon provides a load balancer as a service known as Elastic Load Balancer (ELB). Using a load balancer, we can handle a very large number of requests in a reliable and optimal way. We can use a load balancer with Impala for balancing the requests hitting the Impala servers. For on-premise environments we can configure HAProxy, and for cloud environments we can use ELB. ELB is a ready-to-use service; we just have to add the details of the ports to be forwarded and the listener machines. HAProxy is a very simple application that is available in the Linux repositories, and it is very easy to configure.
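As a rough illustration of how simple the HAProxy side is, a minimal fragment of haproxy.cfg for balancing impala-shell connections across two Impala daemons could look like the sketch below. The host names are placeholders, and 21000 is the usual impala-shell port; adjust both for your cluster.

# minimal haproxy.cfg fragment (host names are placeholders)
listen impala-shell
    bind 0.0.0.0:21000
    mode tcp
    balance leastconn
    server impalad1 impala-node1.example.com:21000 check
    server impalad2 impala-node2.example.com:21000 check

With this in place, impala-shell clients connect to the HAProxy host on port 21000 and their sessions are spread across the back-end impalad nodes.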


Introduction to Apache Spark

Big Data is very hot in the market. Hadoop is one of the top rated technologies in big data. Hadoop became very popular because of its elegant design, its capability to handle large structured, unstructured and semi-structured data, and its strong community support. Hadoop is a batch processing framework that can process data of any size. The only thing Hadoop guarantees is that it will not fail because of load. Initially the requirement was to handle large data without any failure, and this led to the design and development of frameworks such as Hadoop. After that, people started thinking about the performance improvements that could be made in this processing, which led to the development of a technology called Spark.

Spark is an open source technology for processing large data in a distributed manner, with some extra features compared to MapReduce. The processing speed of Spark is higher than that of MapReduce. Most current cluster programming models are based on directed acyclic data flow from stable storage to stable storage. Acyclic data flow is inefficient for applications that repeatedly reuse a working set of data. The main motivation behind the development of Spark was the inefficient handling of two types of applications in the existing computing frameworks: iterative algorithms and interactive data mining tools. With those frameworks, applications reload the data from stable storage on each query. If the same data is reloaded multiple times, it consumes more time and affects the processing speed, and for large data the time loss is high. If we instead store the intermediate results of a process in memory and share the in-memory copy of the results across the cluster, the time delay is smaller, which results in a performance improvement. In this way, the performance improvement is higher with in-memory computation. The inability to keep intermediate results in memory is one of the major drawbacks of most of the popular distributed data processing technologies, and the need for in-memory computation is mainly in iterative algorithms and data mining applications. Spark achieves this in-memory computation with RDDs (Resilient Distributed Datasets), which form the backbone of Spark.

RDD is a distributed memory abstraction that lets programmers perform in-memory computations on very large clusters in a fault-tolerant manner. An RDD is a read-only, partitioned collection of records. An RDD carries enough information about how it was derived from other datasets to recompute lost partitions. RDDs are immutable collections of objects spread across a cluster.
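To make the caching idea concrete, here is a minimal sketch using Spark's Java API (the features list below notes that Spark has a Java API). The input path and the filter conditions are placeholders, not anything from the original post.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class RddCacheExample {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("RddCacheExample");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// The working set is read from HDFS once and kept in memory across the cluster.
		JavaRDD<String> logs = sc.textFile("hdfs:///data/logs").cache();

		// Both actions below reuse the cached partitions instead of reloading the data.
		long errors = logs.filter(new Function<String, Boolean>() {
			public Boolean call(String line) {
				return line.contains("ERROR");
			}
		}).count();

		long warnings = logs.filter(new Function<String, Boolean>() {
			public Boolean call(String line) {
				return line.contains("WARN");
			}
		}).count();

		System.out.println("errors = " + errors + ", warnings = " + warnings);
		sc.stop();
	}
}

Without cache(), each count() would re-read the data from HDFS; with it, the second action works entirely on the in-memory copy, spilling to disk only if memory is insufficient.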


Spark is rich with several features because of the modules built on top of it.

  • Spark Streaming: processing real-time data streams
  • Spark SQL and DataFrames: support for structured data and relational queries
  • MLlib: built-in machine learning library
  • GraphX: Spark’s new API for graph processing
  • Bagel (Pregel on Spark): older, simple graph processing model

Is Spark a replacement for Hadoop?

Spark is not a replacement for Hadoop; it can work along with Hadoop. It can use Hadoop's file system, HDFS, as the storage layer and can run on an existing Hadoop cluster. Spark has now become one of the most active projects in the Hadoop ecosystem. The comparison is only with the processing layer, MapReduce. As per the current test results, Spark performs much better than MapReduce and has several advantages over it. Spark is still under active development and more features keep coming. Real-time stream processing is also better in Spark compared to the other ecosystem components in Hadoop. A detailed performance report of Spark is available at the following URL.

https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html

Is Spark free or Licensed?

Spark is a 100% open source project. It is now an Apache project with several committers and contributors across the world.

What are the attractive features of Spark in comparison with MapReduce?

  • Spark has Java, Scala and Python APIs.
  • Programming in Spark is simpler than programming in MapReduce, which reduces development time.
  • The performance of Spark is better than that of MapReduce. It is best suited for computations such as real-time processing and iterative computations on the same data.
  • Caching is one of the main features of Spark. Spark stores intermediate results in memory across its distributed workers, whereas MapReduce stores intermediate results on disk. This in-memory caching makes Spark faster. Spark Streaming also provides real-time processing of data on the fly, which is missing in MapReduce.
  • With Spark, it is possible to do batch processing, stream processing, graph processing and machine learning in the same cluster. This provides better resource utilization and easier resource management.
  • Spark has an excellent feature of spilling data partitions to disk if a node does not have sufficient RAM for storing them.
  • All these features have made Spark a very powerful member of the big data technology stack, and it is likely to be one of the hottest technologies capturing the market.

Rhipe on AWS (YARN and MRv1)

Rhipe is an R library that runs on top of Hadoop. Rhipe uses the Hadoop streaming concept to run R programs in Hadoop. To know more about Rhipe, please check my older post. My previous post on Rhipe gave the basic explanation and the installation steps for running Rhipe on CDH4 (MRv1). Now YARN has become popular and almost everyone is using it, so a lot of people have asked me for assistance with installing Rhipe on YARN. Rhipe works very well on YARN. Here I am just giving a pointer on how to install Rhipe on AWS (Amazon Web Services). I checked this script and it is working fine. It contains the bootstrap script and installables that install Rhipe automatically on AWS.

For those who are new to AWS, I will explain the basics of AWS EMR and the bootstrap script. Amazon Web Services provides a lot of cloud services. Among them, Elastic MapReduce (EMR) is a service that provides a Hadoop cluster. It is one of the best solutions for users who don't want to maintain a data center and don't want the headaches of Hadoop administration. AWS provides a list of components that can be installed in the Hadoop cluster; we can choose these services while launching the cluster through the web console. Examples of such components are Hive, Pig, Impala, Hue, HBase, Hunk etc. But in most cases, a user may require some extra software as well, and this extra requirement depends on the user. If the user tries to install the extra software manually across the cluster, it will take a lot of time. The automated cluster launch takes less than 10 minutes (I tried with around 100 nodes), but installing the software on all of these nodes manually would take several hours. For this problem, Amazon provides a solution: the user can supply custom shell scripts, and these scripts will be executed on all the nodes while Hadoop is being installed. Such a script is called a bootstrap script. Here we are installing Rhipe using a bootstrap script.

For users who want to install Rhipe on AWS Hadoop MRv1, you can follow this url. Please ensure that you are using the correct AMI (Amazon Machine Image); this is just the version of the image that Amazon provides. For users who want to install Rhipe on AWS Hadoop MRv2 (YARN), you can follow this url. This will work perfectly on AWS AMI 3.2.1. You can download the GitHub repo locally and put it in your S3 bucket, then launch the cluster by specifying the details mentioned in the installation doc.

For non-AWS users

For those who want to install Rhipe on YARN (any Hadoop cluster), you can either build Rhipe for your version of Hadoop and put that jar inside the Rhipe directory, or you can directly try the ready-made Rhipe build for YARN. All the Rhipe versions are available in a common repository. You can download the installable from this location. You have to follow the steps mentioned in all the shell scripts present in the given repository. This is a manual activity and you have to do it on all the nodes of your Hadoop cluster.

Sample program with Hadoop Counters and Distributed Cache

Counters are a very useful feature in Hadoop. They help us track global events in a job, i.e. across the map and reduce phases.
When we execute a MapReduce job, we can see a lot of counters listed in the logs. Other than the default built-in counters, we can create our own custom counters, which will be listed along with the built-in ones.
This helps us in several ways. Here I am explaining a scenario where I use a custom counter to count the number of good words and stop words in the given text files. The stop words in this program are provided at run time using the distributed cache.
This is a mapper-only job; calling job.setNumReduceTasks(0) makes it a mapper-only job.

Here I am introducing another feature in Hadoop called the distributed cache.
The distributed cache distributes application-specific, read-only files efficiently to every node running the application.
My requirement is to filter the stop words from the input text files. The stop word list may vary, so if I hard-code the list in my program, I would have to update the code every time the list changes. This is not a good practice. Instead I use the distributed cache, and the file containing the stop words is loaded into it. This makes the file available to the mapper as well as the reducer; in this program, we don't require any reducer.

You can get the full code from the GitHub repository.
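Since the full listing lives in the GitHub repository, here is a minimal, self-contained sketch of the same idea. It is not the exact code from the repo: the package and class names are invented, but it shows a custom counter enum, loading the stop word file from the distributed cache in setup(), and a mapper-only job driven by the same -skip style arguments shown in the command further below.

package com.example.skipwords;   // hypothetical package, not from the original repo

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SkipWordDriver {

	// Custom counters: these appear in the job logs along with the built-in counters.
	public enum WordCounter { GOOD_WORDS, STOP_WORDS }

	public static class SkipWordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		private final Set<String> stopWords = new HashSet<String>();
		private final LongWritable one = new LongWritable(1);
		private final Text word = new Text();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// Files added with job.addCacheFile() are localized on every node and
			// symlinked into the task's working directory under their base name.
			URI[] cacheFiles = context.getCacheFiles();
			if (cacheFiles != null && cacheFiles.length > 0) {
				String fileName = new Path(cacheFiles[0].getPath()).getName();
				BufferedReader reader = new BufferedReader(new FileReader(fileName));
				String line;
				while ((line = reader.readLine()) != null) {
					stopWords.add(line.trim());
				}
				reader.close();
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer tokenizer = new StringTokenizer(value.toString());
			while (tokenizer.hasMoreTokens()) {
				String token = tokenizer.nextToken();
				if (stopWords.contains(token)) {
					context.getCounter(WordCounter.STOP_WORDS).increment(1);
				} else {
					context.getCounter(WordCounter.GOOD_WORDS).increment(1);
					word.set(token);
					context.write(word, one);
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

		Job job = Job.getInstance(conf, "SkipWordCounter");
		job.setJarByClass(SkipWordDriver.class);
		job.setMapperClass(SkipWordMapper.class);
		job.setNumReduceTasks(0);              // mapper-only job
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// Expected arguments: -skip <stop-word-file-in-hdfs> <input> <output>
		List<String> paths = new ArrayList<String>();
		for (int i = 0; i < otherArgs.length; i++) {
			if ("-skip".equals(otherArgs[i])) {
				job.addCacheFile(new Path(otherArgs[++i]).toUri());
			} else {
				paths.add(otherArgs[i]);
			}
		}
		FileInputFormat.addInputPath(job, new Path(paths.get(0)));
		FileOutputFormat.setOutputPath(job, new Path(paths.get(1)));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}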

Create a Java project with the above Java classes and add the dependent Java libraries (the libraries will be present in your Hadoop installation). Export the project as a runnable jar and execute it. The file containing the stop words should be present in HDFS, with the stop words listed line by line. A sample format is given below.

is
the
am
are
with
was
were

Sample command to execute the program is given below.

hadoop jar <jar-name> -skip <stop-word-file-in-hdfs> <input-data-location> <output-location>

Eg: hadoop jar Skipper.jar -skip /user/hadoop/skip/skip.txt /user/hadoop/input /user/hadoop/output

In the job logs, you can see the custom counters also. I am attaching a sample log below.

(Screenshot: the Counters section of a sample job log, showing the custom counters along with the built-in ones.)

Hadoop Cluster Migrator

The Hadoop Cluster Migrator tool provides a unified interface for copying data from one cluster to another. Traditionally, the DistCp tool provided by Hadoop is used to migrate and copy data from one Hadoop cluster to another. However, DistCp works only when connectivity between the source and target clusters is well established, without any firewall rules blocking it. In production scenarios, edge nodes isolate the clusters from each other, and DistCp can't be used to transfer data or back up the cluster. This is where Hadoop Cluster Migrator from Knowledge Lens can be very handy.

Hadoop Cluster Migrator is a cluster-agnostic tool which supports migration across different distributions and different versions of Hadoop. Currently it supports the MapR, Cloudera, Hortonworks, EMC Pivotal and Apache distributions of Hadoop, both in Kerberos-enabled and disabled mode.

This completely Java-based tool provides large-scale data transfer between clusters, with transfer rates in the range of 10 GB/s depending on the available bandwidth. The tool is completely restartable and resumes from the point where the last transfer stopped.

For more details, refer to: Hadoop Cluster Migrator


Launching an EMR cluster using Python program

Amazon EMR is a very easy way to launch a Hadoop cluster. Amazon provides a console as well as an API interface for launching clusters. Boto is a Python library for dealing with Amazon Web Services; it is not limited to EMR, it covers most of the AWS services. Here I am sharing a small program for launching an EMR cluster using Python boto. This program helps us in situations where automation is required.
In this program, the Hadoop cluster will be launched with services such as Pig, Hive, Impala and Ganglia, and with some user-defined installations.
Bootstrapping is a process by which we can add our own custom installations while launching the cluster.
If we want to add our own custom installations to all the nodes of the EMR cluster, doing the same process manually will be difficult. The bootstrap option helps us solve this problem in a very simple way: we just write a shell script containing the custom steps, put it in an S3 bucket, and specify that location while launching the cluster. A minimal example of such a bootstrap script is shown below.
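As a rough illustration (this script is not from the original post), a bootstrap script is just an ordinary shell script; the package installed here is only a placeholder for whatever custom installation you need.

#!/bin/bash
# Hypothetical bootstrap script: EMR runs it on every node while the cluster is provisioned.
# Upload it to an S3 bucket and pass its s3:// path as a bootstrap action when launching.
set -e
sudo yum install -y git   # placeholder for your own custom installation steps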

For writing Amazon-related programs, you can refer to the Python boto API. I wrote this code by referring to the boto documentation, and the methods and coding conventions used here are taken from boto itself. Boto is an open source Python library, and following the conventions used in its source code is a good way to learn Python coding conventions.

In this program, if you don’t have a bootstrap step, you can keep it as None.

You can get the code from the GitHub repository.

HDFS Block Size

In Hadoop, data is stored as blocks. In every storage system, blocks are the basic units. The Hadoop block size is larger than the disk-level and OS-level block sizes, because Hadoop deals with large data; a small block size would result in more seek time and more metadata, which ultimately leads to poor performance. By default, the block size in Hadoop is 64 MB (in some distributions it is 128 MB, e.g. Amazon EMR Hadoop). We can change the block size based on our requirements: for a single file, for a set of files, or for the entire HDFS.

The property to set the block size is present in hdfs-site.xml. The property name is dfs.blocksize (dfs.block.size was the old property name and is now deprecated).
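A minimal hdfs-site.xml entry for this looks like the snippet below (the value is in bytes; 134217728 bytes = 128 MB).

<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
</property>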

For checking the default block size, we can use a simple command that prints the default block size of the HDFS client.

> hdfs getconf -confKey dfs.blocksize

This may not be the block size of the files already stored in HDFS. If we specify a block size for a file while storing it, it will be stored with that block size; otherwise it will be stored with the default block size.
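For example, to upload a single file with a 128 MB block size irrespective of the default, the block size can be passed as a generic option (the file and directory names below are placeholders).

> hadoop fs -D dfs.blocksize=134217728 -put largefile.csv /user/hadoop/data/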

An Introduction to Distributed Systems

Pdf Input Format implementation for Hadoop Mapreduce

In my opinion, Hadoop is not a fully cooked tool or framework with ready-made features, but an efficient framework that allows a lot of customization based on our use cases. It is our choice to modify it. By modification, I do not mean modifying the architecture or the way it works, but extending its functionality and features.

By default, Hadoop accepts text files. But in practical scenarios, our input files may not be text files; they can be PDF, PPT, PST, images or anything else. So we need to make Hadoop compatible with these various types of input formats.

Here I am explaining the creation of a custom input format for Hadoop, with the code for implementing PDF reader logic inside Hadoop. Similarly, you can create any custom reader of your choice; the fundamentals are the same.

A simple PDF to text conversion program using Java is explained in my previous post, PDF to Text. It is a simple PDF parser which extracts only the text content of the PDF. If you want more features, you can modify it accordingly. My intention here is to explain the creation of a custom input format reader for Hadoop.

For doing this, we need to write or modify two classes.

The first is a class similar to the default TextInputFormat class, but for PDF. We can call it PdfInputFormat.

The second is a class similar to the default LineRecordReader, but for handling PDF. We can call it PdfRecordReader.

If you examine the source code of Hadoop, you can see that the default TextInputFormat class extends a parent class called FileInputFormat.

So in our case, we can also create a PdfInputFormat class extending the FileInputFormat class.

This class overrides a method called createRecordReader, which it inherits from the parent class.

We are calling our custom PdfRecordReader class from this createRecordReader method.

The code for PdfInputFormat is given below.

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {

		return new PdfRecordReader();
	}

}

The PdfRecordReader is a custom class created by us, extending RecordReader.

It mainly contains six methods, which are inherited from the parent RecordReader class:

initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), close().

The logic I am using is explained below.

We apply our PDF parsing logic in the initialize() method. This method gets the input split, and we parse it using our PDF parser logic. The output of the PDF parser is text, which is stored in a variable. Then we split the text into lines using '\n' as the delimiter and store these lines in an array.

We need to send the content as key-value pairs, so I am sending the line number as the key and each line as the value. The logic for fetching lines from the array, setting them as key and value, and checking for the completion condition is written in nextKeyValue().

The PdfRecordReader code is written below.

package com.amal.pdf;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

	private String[] lines = null;
	private LongWritable key = null;
	private Text value = null;

	@Override
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {

		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		final Path file = split.getPath();

		/*
		 * The below code contains the logic for opening the file and seek to
		 * the start of the split. Here we are applying the Pdf Parsing logic
		 */

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		PDDocument pdf = null;
		String parsedText = null;
		PDFTextStripper stripper;
		pdf = PDDocument.load(fileIn);
		stripper = new PDFTextStripper();
		parsedText = stripper.getText(pdf);
		this.lines = parsedText.split("\n");

	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {

		if (key == null) {
			key = new LongWritable();
			key.set(1);
			value = new Text();
			value.set(lines[0]);
		} else {
			int count = (int) key.get();
			// Use lines.length here (not lines.length - 1) so that the last
			// line of the parsed PDF is also emitted as a record.
			if (count < lines.length) {
				value = new Text();
				value.set(lines[count]);
				key = new LongWritable(count + 1);
			} else {
				return false;
			}

		}
		if (key == null || value == null) {
			return false;
		} else {
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {

		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {

		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {

		return 0;
	}

	@Override
	public void close() throws IOException {

	}

}

For using this in a program, you need to specify it in the driver class. We set our custom input format class using the job's setInputFormatClass() method in our MapReduce program.
For getting a complete picture, I am associating this with the basic word count MapReduce program.

Driver Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);
		args = parser.getRemainingArgs();
		Job job = new Job(conf, "Pdfwordcount");
		job.setJarByClass(PdfInputDriver.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		job.setInputFormatClass(PdfInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		System.out.println(job.waitForCompletion(true));
	}
}

Mapper Class

package com.amal.pdf;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class WordCountMapper extends
		Mapper<LongWritable, Text, Text, LongWritable> {
	private Text word = new Text();
	private final static LongWritable one = new LongWritable(1);

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.progress();
			context.write(word, one);
		}
	}
}

Reducer Class

package com.amal.pdf;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer<Text, LongWritable, Text, LongWritable> {
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (LongWritable value : values) {
			sum += value.get();

		}
		context.write(key, new LongWritable(sum));
	}
}

In a similar way, you can write any custom input format as you wish. 🙂 🙂

A Basic Hadoop Mapreduce Program

The majority of the introductory MapReduce programs I have seen on various websites and in books are word count. Here I am explaining the working of a basic MapReduce program without any processing logic.

By default, MapReduce accepts the text input format, and the Java class responsible for this reading is TextInputFormat. It internally uses a class called LineRecordReader.

The LineRecordReader reads the input split and returns it record by record; by default one record is a single line, because the text is split into records using the delimiter '\n'.

The LineRecordReader returns key-value pairs which ultimately reach the mapper class. By default, the key is the byte offset of the line and the value is the line itself.

Here I am writing a program which is nothing but the basic frame of a MapReduce program.
When you supply an input text to this program, you will get the output as a set of lines and offsets, which is nothing but the input that reaches the mapper.

Here the output will be (line, offset) because I am swapping the key and value in the mapper class.
The program is written below.

Mapper Class

Here the mapper class is written without adding any processing logic. It gets the input as a key-value pair, where the key is the offset and the value is the line.

After that, it sends the output as another key-value pair; here I am just swapping the key and the value. If you don't want that swap, you can pass the pair through unchanged (remember to make the corresponding changes in the mapper and reducer classes by swapping LongWritable and Text).


package org.amal.hadoop;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable, Text, Text, LongWritable> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

context.write(value, key);
}
}

Reducer Class

Here also I am not adding any processing logic, just reading the key-value pairs and writing them to the output.
The reducer gets its input as a key and a list of values; that is the reason for using an Iterable and a for loop.

package org.amal.hadoop;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

public void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
for(LongWritable val: values)
{
context.write(key,val);
}
}
}

Driver Class or Runner Class


package org.amal.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

Job job = new Job(conf, "SampleMapreduce");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setJarByClass(Driver.class);
job.setMapperClass(MapperClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}

}

Sample Input

Music is an art form whose medium is sound and silence.
Its common elements are pitch , rhythm , dynamics, and the sonic qualities of timbre and texture.
The word derives from Greek.

Output

Its common elements are pitch , rhythm , dynamics, and the sonic qualities of timbre and texture.    56
Music is an art form whose medium is sound and silence.    0
The word derives from Greek.    154

So the output we got is the line and offset (just the swapped input). 56, 0 and 154 are the offsets.
Any idea why the lines got rearranged in the output?
If you examine the input and output, you can see a change in order. This is because the keys are sorted before they reach the reducer (and, when there are multiple reducers, they are also partitioned based on the key's hash value). That is why the order changed. 🙂