Notification on completion of Mapreduce jobs

Heavy MapReduce jobs may run for several hours. When there are several of them, checking their status manually is a boring task, and I don't like it. Managing Java programs with an external script is not a clean approach either; I consider that kind of design one of the worst.

My requirement was to get a notification on completion of MapReduce jobs. These are critical jobs, and I don't want to keep checking their status while waiting for them to complete.

Hadoop provides a useful configuration option that solves my problem, and it takes only a few lines of code. Add these three lines to the Driver class:

conf.set("job.end.notification.url", "http://myserverhost:8888/notification/jobId=$jobId?status=$jobStatus");
conf.setInt("job.end.retry.attempts", 3);
conf.setInt("job.end.retry.interval", 1000);

With these properties set, Hadoop sends an HTTP request when the job completes. The request goes to the URL given in job.end.notification.url, with the variables $jobId and $jobStatus replaced by the actual values. The other two properties control how many times the request is retried and the interval between retries. (Newer Hadoop releases name these properties mapreduce.job.end-notification.url, mapreduce.job.end-notification.retry.attempts and mapreduce.job.end-notification.retry.interval, and treat the names used here as deprecated.)

We need a small piece of code for a webservice that accepts this HTTP request and sends an email. I wrote the webservice and the email utility in Python because of its simplicity. When a request arrives, the webservice parses the arguments and calls the email sending module.

This is a very simple example. Instead of email, we could send other kinds of notification, such as an SMS or a phone call, or trigger some other application. The property job.end.notification.url is very helpful for tracking asynchronous MapReduce jobs. It is a clean approach because no separate script tracks the status of the job: the job itself reports its status, and the Python program only collects that status and sends notifications.

The Python code for the webservice and email notification is given below.
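The original listing is not reproduced on this page. As a stand-in, here is a minimal sketch of such a service using only the Python standard library. It is not the author's code: the function names, the SMTP host and the email addresses are all assumptions, and it assumes the notification arrives as a GET request with the URL shape configured in the Driver class.

```python
from email.message import EmailMessage
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlsplit
import smtplib

# Hypothetical settings: replace with values that exist in your environment.
SMTP_HOST = "localhost"
SENDER = "hadoop-notify@example.com"
RECIPIENT = "me@example.com"


def parse_notification(request_path):
    """Extract (job_id, status) from a path such as
    /notification/jobId=job_201305231154_0001?status=SUCCEEDED"""
    parts = urlsplit(request_path)
    # The last path segment carries the job id as "jobId=<id>".
    last_segment = parts.path.rstrip("/").rsplit("/", 1)[-1]
    key, _, job_id = last_segment.partition("=")
    if key != "jobId" or not job_id:
        raise ValueError("no jobId in path: " + request_path)
    # The status travels in the query string; fall back to UNKNOWN.
    status = parse_qs(parts.query).get("status", ["UNKNOWN"])[0]
    return job_id, status


def build_message(job_id, status):
    """Build the notification email for one finished job."""
    msg = EmailMessage()
    msg["Subject"] = "MapReduce job %s finished: %s" % (job_id, status)
    msg["From"] = SENDER
    msg["To"] = RECIPIENT
    msg.set_content("Job %s ended with status %s." % (job_id, status))
    return msg


def send_email(msg):
    """Hand the message to the (assumed) local SMTP server."""
    with smtplib.SMTP(SMTP_HOST) as smtp:
        smtp.send_message(msg)


class NotificationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            job_id, status = parse_notification(self.path)
        except ValueError:
            self.send_error(400, "missing jobId")
            return
        send_email(build_message(job_id, status))
        self.send_response(200)
        self.end_headers()


def serve(port=8888):
    """Listen on the port used in job.end.notification.url."""
    HTTPServer(("", port), NotificationHandler).serve_forever()
```

Calling serve() on the host named in the notification URL starts the listener. Swapping send_email for an SMS or any other trigger only requires changing that one function.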

Sample program with Hadoop Counters and Distributed Cache

Counters are a very useful feature in Hadoop. They help us track global events in our job, i.e. across the map and reduce phases.
When we execute a MapReduce job, we can see a lot of counters listed in the logs. Besides the default built-in counters, we can create our own custom counters, which are listed along with the built-in ones.
This helps in several ways. Here I explain a scenario where I use custom counters to count the number of good words and stop words in the given text files. The stop words are provided at run time using the distributed cache.
This is a mapper-only job. Calling job.setNumReduceTasks(0) makes it a mapper-only job.

Here I am introducing another Hadoop feature called Distributed Cache.
The distributed cache distributes application-specific read-only files efficiently throughout the application.
My requirement is to filter the stop words from input text files. The stop word list may vary, so if I hard-code the list in my program, I have to update the code every time the list changes. This is not good practice. Instead, I load the file containing the stop words into the distributed cache, which makes the file available to mappers as well as reducers. In this program, we don't require any reducer.

The code is attached below. You can also get the code from GitHub.

Create a Java project with the above Java classes and add the dependent Java libraries (they are present in your Hadoop installation). Export the project as a runnable jar and execute it. The file containing the stop words should be present in HDFS, with one stop word per line. A sample is given below.

is

the

am

are

with

was

were
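To make the counting logic concrete, here is a small plain-Python simulation of what the mapper does with a stop word file in this format. It is only a sketch and not the Hadoop job itself: the counter names GOOD_WORDS and STOP_WORDS and the sample sentences are made up for illustration, and a Counter object stands in for Hadoop's job counters.

```python
from collections import Counter


def load_stop_words(lines):
    """One stop word per line; blank lines are ignored."""
    return {line.strip().lower() for line in lines if line.strip()}


def count_words(text_lines, stop_words):
    """Mimic the mapper: drop stop words and tally both kinds of word."""
    counters = Counter()
    kept = []
    for line in text_lines:
        for word in line.split():
            if word.lower() in stop_words:
                counters["STOP_WORDS"] += 1
            else:
                counters["GOOD_WORDS"] += 1
                kept.append(word)
    return kept, counters


# Stop word file contents, including the blank lines between entries.
stop_words = load_stop_words(["is", "", "the", "", "am", "", "are", "", "with", "", "was", "", "were"])
kept, counters = count_words(["Hadoop is fun", "the jobs were slow"], stop_words)
print(kept)            # words that survive the filter
print(dict(counters))  # the two custom counters
```

In the real job the stop word set would be loaded once per mapper from the distributed cache, and the two tallies would be reported as custom counters next to the built-in ones.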

Sample command to execute the program is given below.

hadoop jar <jar-name> -skip <stop-word-file-in-hdfs> <input-data-location> <output-location>

Eg: hadoop jar Skipper.jar -skip /user/hadoop/skip/skip.txt /user/hadoop/input /user/hadoop/output

You can also see the custom counters in the job logs. I am attaching a sample log below.

Counters

HDFS Block Size

In Hadoop, data is stored as blocks. In every storage system, blocks are the basic unit. The Hadoop block size is much larger than the disk-level and OS-level block sizes because Hadoop deals with large data: a small block size would result in more seek time and more metadata, which ultimately results in poor performance. By default the block size in Hadoop is 64 MB (in some distributions it is 128 MB, e.g. Amazon EMR Hadoop). We can change the block size based on our requirement, for a single file, for a set of files or for the entire HDFS.
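The effect of the block size on the amount of metadata is easy to see with a little arithmetic. The sketch below is only an illustration with made-up file sizes and hypothetical helper names; it computes how many blocks a file is split into and how large its last block is, since the last block holds only the data that remains.

```python
MB = 1024 * 1024


def block_count(file_size, block_size):
    """Number of blocks needed for a file (integer ceiling division)."""
    return (file_size + block_size - 1) // block_size


def last_block_size(file_size, block_size):
    """Size of the final block, which may be smaller than block_size."""
    remainder = file_size % block_size
    return remainder if remainder else min(file_size, block_size)


for size_mb in (200, 1024):
    for block_mb in (64, 128):
        blocks = block_count(size_mb * MB, block_mb * MB)
        tail = last_block_size(size_mb * MB, block_mb * MB) // MB
        print("%d MB file, %d MB blocks: %d blocks, last block %d MB"
              % (size_mb, block_mb, blocks, tail))
```

Doubling the block size roughly halves the number of blocks that have to be tracked for the same file.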

The property that sets the block size goes in hdfs-site.xml. The property name is dfs.blocksize (the old name, dfs.block.size, is deprecated).

To check the default block size, we can use a simple command. It prints the default block size configured for the HDFS client.

> hdfs getconf -confKey dfs.blocksize

This may not be the block size of the files already stored in HDFS. If we specify a block size while storing a file, the file is stored with that block size; otherwise it is stored with the default block size.

An Introduction to Distributed Systems

Hadoop Mapreduce- A Good Presentation…

This is a video I found on YouTube that explains Hadoop MapReduce very clearly using the wordcount example.

Deployment and Management of Hadoop Clusters

Hadoop FS Shell Commands

FS Shell

FS shell means FileSystem shell. The file system may be HDFS or the local file system (the Linux file system).

For HDFS the scheme is hdfs.

Eg: hdfs://namenodehost:<port>/user/test

For the local filesystem the scheme is file.

Eg: file:///testfile

If no scheme is specified, the default scheme specified in the configuration is used. By default it is hdfs.
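The scheme rule can be pictured with a few lines of Python. This is only a sketch of the idea, not how the FS shell is implemented: the function name qualify is hypothetical, the default file system value is an assumption, and only absolute paths are handled.

```python
from urllib.parse import urlsplit


def qualify(path, default_fs="hdfs://namenodehost"):
    """Return the path unchanged if it already names a scheme,
    otherwise prefix it with the configured default file system."""
    if urlsplit(path).scheme:
        return path
    return default_fs.rstrip("/") + "/" + path.lstrip("/")


print(qualify("/user/test"))        # no scheme: the default is applied
print(qualify("file:///testfile"))  # explicit scheme: left as it is
```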

Most of the commands in FS shell behave like the corresponding Unix commands. Differences are described with each of the commands.

cat

Usage: hadoop fs -cat URI [URI …]

Copies source paths to stdout.

Example:

  • hadoop fs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
  • hadoop fs -cat file:///file3 /user/hadoop/file4

Exit Code:
Returns 0 on success and -1 on error.

chgrp

Usage: hadoop fs -chgrp [-R] GROUP URI [URI …]

Change group association of files. With -R, make the change recursively through the directory structure. The user must be the owner of files, or else a super-user. Additional information is in the HDFS Admin Guide: Permissions.

chmod

Usage: hadoop fs -chmod [-R] <MODE[,MODE]… | OCTALMODE> URI [URI …]

Change the permissions of files. With -R, make the change recursively through the directory structure. The user must be the owner of the file, or else a super-user. Additional information is in the HDFS Admin Guide: Permissions.

chown

Usage: hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI …]

Change the owner of files. With -R, make the change recursively through the directory structure. The user must be a super-user. Additional information is in the HDFS Admin Guide: Permissions.

copyFromLocal

Usage: hadoop fs -copyFromLocal <localsrc> URI

Similar to put command, except that the source is restricted to a local file reference.

copyToLocal

Usage: hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>

Similar to get command, except that the destination is restricted to a local file reference.

count

Usage: hadoop fs -count [-q] <paths>

Count the number of directories, files and bytes under the paths that match the specified file pattern. The output columns are:
DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME.

The output columns with -q are:
QUOTA, REMAINING_QUOTA, SPACE_QUOTA, REMAINING_SPACE_QUOTA, DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME.

Example:

  • hadoop fs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
  • hadoop fs -count -q hdfs://nn1.example.com/file1

Exit Code:

Returns 0 on success and -1 on error.
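When the count output is consumed by a script, the column names listed above can be attached to each line. The following Python sketch assumes whitespace-separated columns in the documented order; the function name and the sample lines are made up for illustration. Quota columns are kept as text when they are not plain numbers.

```python
COLUMNS = ["DIR_COUNT", "FILE_COUNT", "CONTENT_SIZE", "FILE_NAME"]
QUOTA_COLUMNS = ["QUOTA", "REMAINING_QUOTA", "SPACE_QUOTA",
                 "REMAINING_SPACE_QUOTA"] + COLUMNS


def parse_count_line(line, with_quota=False):
    """Turn one line of 'hadoop fs -count' output into a dict."""
    names = QUOTA_COLUMNS if with_quota else COLUMNS
    # Split on whitespace, but keep the file name (last column) whole.
    fields = line.split(None, len(names) - 1)
    if len(fields) != len(names):
        raise ValueError("unexpected number of columns: " + line)
    row = dict(zip(names, fields))
    for name in names[:-1]:
        if row[name].isdigit():
            row[name] = int(row[name])
    return row


# Made-up sample lines, not real cluster output.
print(parse_count_line("   2    5    1048576 /user/hadoop/dir"))
print(parse_count_line("none inf none inf 2 5 1048576 /user/hadoop/dir",
                       with_quota=True))
```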

cp

Usage: hadoop fs -cp URI [URI …] <dest>

Copy files from source to destination. This command allows multiple sources as well in which case the destination must be a directory.
Example:

  • hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
  • hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

Exit Code:

Returns 0 on success and -1 on error.

du

Usage: hadoop fs -du URI [URI …]

Displays the aggregate length of files contained in the directory, or the length of the file if the path is just a file.
Example:
hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://nn.example.com/user/hadoop/dir1
Exit Code:
Returns 0 on success and -1 on error.

dus

Usage: hadoop fs -dus <args>

Displays a summary of file lengths.

expunge

Usage: hadoop fs -expunge

Empty the Trash. Refer to HDFS Architecture for more information on Trash feature.

get

Usage: hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.

Example:

  • hadoop fs -get /user/hadoop/file localfile
  • hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile

Exit Code:

Returns 0 on success and -1 on error.

getmerge

Usage: hadoop fs -getmerge <src> <localdst> [addnl]

Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Optionally addnl can be set to enable adding a newline character at the end of each file.

ls

Usage: hadoop fs -ls <args>

For a file returns stat on the file with the following format:
filename <number of replicas> filesize modification_date modification_time permissions userid groupid
For a directory it returns list of its direct children as in unix. A directory is listed as:
dirname <dir> modification_date modification_time permissions userid groupid
Example:
hadoop fs -ls /user/hadoop/file1 /user/hadoop/file2 hdfs://nn.example.com/user/hadoop/dir1 /nonexistentfile
Exit Code:
Returns 0 on success and -1 on error.

lsr

Usage: hadoop fs -lsr <args>
Recursive version of ls. Similar to Unix ls -R.

mkdir

Usage: hadoop fs -mkdir <paths>

Takes path URIs as arguments and creates directories. The behavior is much like Unix mkdir -p, creating parent directories along the path.

Example:

  • hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
  • hadoop fs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

Exit Code:

Returns 0 on success and -1 on error.

moveFromLocal

Usage: hadoop fs -moveFromLocal <localsrc> <dst>

Similar to put command, except that the source localsrc is deleted after it’s copied.

moveToLocal

Usage: hadoop fs -moveToLocal [-crc] <src> <dst>

Displays a “Not implemented yet” message.

mv

Usage: hadoop fs -mv URI [URI …] <dest>

Moves files from source to destination. This command allows multiple sources as well in which case the destination needs to be a directory. Moving files across filesystems is not permitted.
Example:

  • hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
  • hadoop fs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

Exit Code:

Returns 0 on success and -1 on error.

put

Usage: hadoop fs -put <localsrc> … <dst>

Copy single src, or multiple srcs from local file system to the destination filesystem. Also reads input from stdin and writes to destination filesystem.

  • hadoop fs -put localfile /user/hadoop/hadoopfile
  • hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
  • hadoop fs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
  • hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile
    Reads the input from stdin.

Exit Code:

Returns 0 on success and -1 on error.

rm

Usage: hadoop fs -rm URI [URI …]

Delete files specified as args. Only deletes files and empty directories. Refer to rmr for recursive deletes.
Example:

  • hadoop fs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

Exit Code:

Returns 0 on success and -1 on error.

rmr

Usage: hadoop fs -rmr URI [URI …]

Recursive version of delete.
Example:

  • hadoop fs -rmr /user/hadoop/dir
  • hadoop fs -rmr hdfs://nn.example.com/user/hadoop/dir

Exit Code:

Returns 0 on success and -1 on error.

setrep

Usage: hadoop fs -setrep [-w] [-R] <rep> <path>

Changes the replication factor of a file. The -R option recursively changes the replication factor of files within a directory. The -w option, used in the example below, waits for the replication to complete.

Example:

  • hadoop fs -setrep -w 3 -R /user/hadoop/dir1

Exit Code:

Returns 0 on success and -1 on error.

stat

Usage: hadoop fs -stat URI [URI …]

Returns the stat information on the path.

Example:

  • hadoop fs -stat path

Exit Code:
Returns 0 on success and -1 on error.

tail

Usage: hadoop fs -tail [-f] URI

Displays last kilobyte of the file to stdout. -f option can be used as in Unix.

Example:

  • hadoop fs -tail pathname

Exit Code:
Returns 0 on success and -1 on error.

test

Usage: hadoop fs -test -[ezd] URI

Options:
-e check to see if the file exists. Return 0 if true.
-z check to see if the file is zero length. Return 0 if true.
-d check to see if the path is a directory. Return 0 if true.

Example:

  • hadoop fs -test -e filename
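Because the result of test is reported only through the exit code, it is convenient to wrap it when calling it from a program. The Python sketch below assumes the hadoop command is on the PATH; the function name is hypothetical, and the run parameter exists only so that the command runner can be replaced, for example in a test.

```python
import subprocess


def hdfs_path_exists(path, run=subprocess.run):
    """Return True if 'hadoop fs -test -e <path>' exits with code 0."""
    result = run(["hadoop", "fs", "-test", "-e", path])
    return result.returncode == 0
```

A script can then write a check such as `if hdfs_path_exists("/user/hadoop/input"):` before submitting a job that depends on that path.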

text

Usage: hadoop fs -text <src>

Takes a source file and outputs the file in text format. The allowed formats are zip and TextRecordInputStream.

touchz

Usage: hadoop fs -touchz URI [URI …]

Create a file of zero length.

Example:

  • hadoop fs -touchz pathname

Exit Code:
Returns 0 on success and -1 on error.

Hadoop commands in hive command line interface

We can execute Hadoop commands in the Hive CLI. It is very simple.
Just put an exclamation mark (!) before your Hadoop command and a semicolon (;) after it.

Example:

hive> !hadoop fs -ls / ;

drwxr-xr-x   - hdfs supergroup          0 2013-03-20 12:44 /app
drwxrwxrwx   - hdfs supergroup          0 2013-05-23 11:54 /tmp
drwxr-xr-x   - hdfs supergroup          0 2013-05-08 18:47 /user

Very simple.. 🙂

Making hive usable to multiple users in a hadoop cluster.

By default, Hive operations are limited to the superuser. If you are using CDH, the superuser is hdfs.
The reason is the permission on the Hive warehouse directory.
By default, read/write permission on this directory is given only to the superuser.
So if we want to use Hive as multiple users, we change the permission of this directory accordingly.
To make Hive usable by all users, run the following commands.

hadoop fs -chmod -R 777 /user/hive/warehouse

hadoop fs -chmod -R 777 /tmp

If you organize the users into specific groups, you can instead give read/write permission to the group only, i.e. 775.
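If the octal modes are unfamiliar, the short Python sketch below shows what they mean by printing each mode in the same form that a directory listing uses. The helper name is hypothetical and the script does not touch HDFS; it only decodes the numbers.

```python
import stat


def describe_dir_mode(octal_mode):
    """Render a directory's permission bits the way a listing shows them."""
    return stat.filemode(stat.S_IFDIR | octal_mode)


for mode in (0o777, 0o775, 0o755):
    print(oct(mode), describe_dir_mode(mode))
```

With 775 the owner and the group can write to the directory while all other users can only read and list it.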

Changing the Hive Warehouse Directory

By default the Hive warehouse directory is located at the HDFS location /user/hive/warehouse.

Everyone using Hive should have appropriate read/write permissions on the warehouse directory, wherever it is located.

If you want to change this location, add the following property to hive-site.xml.

<property>
   <name>hive.metastore.warehouse.dir</name>
   <value>/user/hivestore/warehouse</value>
   <description>location of the warehouse directory</description>
 </property>