Delta Science – The art of designing the new generation Data Lake

When we hear about Delta Lake, the first question that comes to mind is:

“What is Delta Lake and how does it work?”

“Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads”

But how is it possible to maintain transactions in the Big Data world? The answer is simple: the Delta format.

Delta Lake stores data in the Delta format. The Delta format is a versioned Parquet format with scalable metadata. It stores the data internally as Parquet files and tracks every change to the data in a transaction log, so the metadata grows along with the data.
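For intuition, the rough sketch below (the path /tmp/delta-demo is a placeholder, and it assumes a Spark session with the Delta package added, as described in the setup sections later in this post) writes a tiny Delta table and then reads its transaction log, which lives in the _delta_log directory next to the Parquet data files:

# Write a tiny Delta table to a placeholder path.
spark.range(0, 5).write.format("delta").mode("overwrite").save("/tmp/delta-demo")

# Each commit is an ordered JSON file listing the Parquet files added or removed.
log = spark.read.json("/tmp/delta-demo/_delta_log/*.json")
log.select("commitInfo.operation", "add.path", "remove.path").show(truncate=False)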

The Delta format solves several major challenges in the Big Data Lake world. Some of them are listed below:

  1. Transaction management
  2. Versioning
  3. Incremental Load
  4. Indexing
  5. UPSERT and DELETE operations
  6. Schema Enforcement and Schema Evolution

In this post, I will elaborate on each of the above features and explain more about the internals of Delta Lake.

How to configure Delta Lake on EMR?

EMR releases 5.24.x and higher ship with Apache Spark 2.4.2 or higher, so Delta Lake can be enabled on EMR 5.24.x and above. Delta Lake is not enabled by default on EMR, but enabling it is easy.

We just need to add the Delta jar to the Spark jars directory. We can either do this manually or, more easily, with a custom bootstrap script. A sample script is given below: upload the delta-core jar to an S3 bucket and copy it into the Spark jars folder using the shell script shown. The delta-core jar can be downloaded from the Maven repository, or you can build it yourself from the source code available on GitHub.

Adding this as a bootstrap action performs the copy automatically while the cluster is being provisioned. Keep the script below in an S3 location and pass it as a bootstrap action.

copydeltajar.sh

#!/bin/bash

aws s3 cp s3://mybucket/delta/delta-core_2.11-0.4.0.jar /usr/lib/spark/jars/

You can launch the cluster either from the AWS web console or with the AWS CLI. A sample CLI command is shown below.

aws emr create-cluster --name "Test cluster" --release-label emr-5.25.0 \
--use-default-roles --ec2-attributes KeyName=myDeltaKey \
--applications Name=Hive Name=Spark \
--instance-count 3 --instance-type m5.xlarge \
--bootstrap-actions Path="s3://mybucket/bootstrap/copydeltajar.sh"


How to set up Delta Lake in Apache Spark?

Delta Lake is supported in Apache Spark 2.4.2 and above. It is open source under the Apache 2.0 license, so it is free to use. It is very easy to set up and does not require any admin skills to configure. Delta Lake is available by default in Databricks, so no installation or configuration is needed there.

To try out a basic example, launch pyspark or spark-shell with the Delta package added; no additional installation is needed. Just use one of the following commands.

For pyspark:

pyspark --packages io.delta:delta-core_2.11:0.4.0

For spark-shell:

bin/spark-shell --packages io.delta:delta-core_2.11:0.4.0

The above commands add the Delta package to the session and enable Delta Lake. You can try out the following basic example in the pyspark shell.
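A minimal sketch of such an example is given below (the path /tmp/delta-table is a placeholder):

# Create a Delta table at a placeholder path, then read it back.
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

df = spark.read.format("delta").load("/tmp/delta-table")
df.show()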

Delta Lake – The New Generation Data Lake

Delta Lake is the need of the present era. For the past several years, we have been hearing about Data Lakes, and I myself have worked on several Data Lake implementations. But the previous generation of Data Lakes was not a complete solution. One of the main drawbacks of the old-generation Data Lake is the difficulty in handling ACID transactions.

Delta Lake brings ACID transactions to the storage layer and thus makes the system more robust and efficient. One of my recent projects was to build a Data Lake for one of India’s largest ride-sharing companies. Their requirements included handling CDC (Change Data Capture) in the lake. Their customers take several rides per day, so there are lots of transactions and changes happening across the various entities associated with the platform, such as debiting money from a wallet, crediting money to a wallet, creating a ride, deleting a ride, updating a ride, updating a user profile and so on.

The initial version of the Lake that I designed recorded only the latest value of each of these entities. But that was not a proper solution, as it did not deliver full analytics capability. So I came up with a design using Delta that can handle the CDC. This way we can track every change happening to the data and, instead of overwriting records, keep the historic data in the system as well.

Delta Lake architecture (image credits: Delta Lake)
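A rough sketch of how such CDC records can be applied as an upsert with the Delta Lake Python API (available from Delta Lake 0.4.0) is shown below; the table path, the staging path and the column names (ride_id, op) are hypothetical.

from delta.tables import DeltaTable

# Apply a batch of change records to the rides table (paths and columns are placeholders).
rides = DeltaTable.forPath(spark, "/delta/rides")
changes = spark.read.json("/staging/ride_changes")

(rides.alias("t")
 .merge(changes.alias("c"), "t.ride_id = c.ride_id")
 .whenMatchedDelete(condition="c.op = 'DELETE'")
 .whenMatchedUpdateAll(condition="c.op = 'UPDATE'")
 .whenNotMatchedInsertAll()
 .execute())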

The Delta format is the main magic behind Delta Lake. It was open sourced by Databricks and works with Apache Spark.

Some of the key features of Delta Lake are listed below.

  1. Support for ACID transactions. It is very tedious to ensure data integrity in a conventional Data Lake because transaction handling was missing in the old-generation Data Lakes. With transaction support, Delta Lake becomes more reliable and reduces the workload of Data Engineers.
  2. Data versioning: Delta Lake supports time travel, which helps with rollback, audit control, version control and so on. Old records are not deleted; they are kept as versions (a short time-travel sketch follows the snippet below).
  3. Support for Merge, Update and Delete operations.
  4. No major change is required in the existing system to adopt Delta Lake. Delta Lake is 100% open source and fully compatible with the Spark APIs. Delta Lake uses the Apache Parquet format to store the data. The following snippet shows how to save data in Delta format. It is very simple, just use “delta” instead of “parquet”:
# Writing a DataFrame as plain Parquet
(dataframe
   .write
   .format("parquet")
   .save("/dataset"))

# Writing the same DataFrame as a Delta table: only the format string changes
(dataframe
   .write
   .format("delta")
   .save("/dataset"))
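And a quick sketch of time travel (assuming a Delta table already exists at the placeholder path /dataset):

# Read the table as it was at an earlier version; the timestampAsOf option works similarly.
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/dataset")
old_df.show()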

To try out Delta in more detail, you can use the Community Edition of Databricks.

A sample snippet for trying out these operations is given below. It is a minimal sketch: the table path /tmp/events is a placeholder, and it assumes the Delta package is available (it is by default in Databricks).
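from delta.tables import DeltaTable

# Create a small Delta table at a placeholder path.
spark.range(0, 10).write.format("delta").mode("overwrite").save("/tmp/events")
events = DeltaTable.forPath(spark, "/tmp/events")

# Delete and update rows in place.
events.delete("id > 7")
events.update(condition="id = 0", set={"id": "100"})

# Inspect the commit history produced by the operations above.
events.history().show(truncate=False)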

Programmatic way to identify the status of the NameNode in an HA-enabled Hadoop cluster

In a NameNode HA-enabled Hadoop cluster, one of the NameNodes will be active and the other will be standby. If you want to perform operations on HDFS programmatically, some libraries or packages need the details of the active NameNode (several Python packages, for example, require the active NameNode and do not support the nameservice). In this case, the easiest way to get the status is to issue a GET request like the one below against each NameNode; the response tells you the state of that NameNode.

GET REQUEST

curl 'http://namenode.1.host:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'

SAMPLE OUTPUT

{
  "beans" : [ {
    "name" : "Hadoop:service=NameNode,name=NameNodeStatus",
    "modelerType" : "org.apache.hadoop.hdfs.server.namenode.NameNode",
    "State" : "active",
    "SecurityEnabled" : false,
    "NNRole" : "NameNode",
    "HostAndPort" : "namenode.1.host:8020",
    "LastHATransitionTime" : 0
  } ]
}
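A small Python sketch that uses this endpoint to find the active NameNode (the host names are placeholders; it needs the requests package installed):

import requests

# Placeholder NameNode hosts; 50070 is the default NameNode HTTP port.
NAMENODES = ["namenode.1.host", "namenode.2.host"]
JMX_QUERY = "/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"

def active_namenode(hosts, port=50070):
    for host in hosts:
        try:
            beans = requests.get(
                "http://{0}:{1}{2}".format(host, port, JMX_QUERY), timeout=5
            ).json()["beans"]
        except requests.RequestException:
            continue  # this NameNode is unreachable; try the next one
        if beans and beans[0].get("State") == "active":
            return host
    return None

print(active_namenode(NAMENODES))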

Python code to list all the running EC2 instances across all regions in an AWS account

This code snippet helps you get the list of all running EC2 instances across all regions in an AWS account. I have used the Python boto3 package to develop the code. The code dynamically picks up all the AWS EC2 regions, so it will keep working without modification even if a new region is added to AWS.

Note: only the basic API calls needed to list the instance details are shown in this program; proper coding conventions are not followed. 🙂
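A minimal sketch of the approach (it assumes AWS credentials are already configured for boto3, for example via environment variables or ~/.aws/credentials):

import boto3

# Discover all EC2 regions dynamically so new regions are picked up automatically.
ec2_client = boto3.client("ec2", region_name="us-east-1")
regions = [r["RegionName"] for r in ec2_client.describe_regions()["Regions"]]

# List running instances in every region.
for region in regions:
    ec2 = boto3.resource("ec2", region_name=region)
    running = ec2.instances.filter(
        Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
    )
    for instance in running:
        print(region, instance.id, instance.instance_type, instance.public_ip_address)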

Add partitions to a Hive table with S3 as the location

Recently I tried to add a partition to a Hive table stored on S3. The command I tried is given below.

ALTER table mytable ADD PARTITION (testdate='2015-03-05') location 's3a://XXXACCESS-KEYXXXXX:XXXSECRET-KEYXXX@bucket-name/DATA/mytable/testdate=2015-03-05';

I got the following exceptions:

Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.SentryFilterDDLTask. MetaException(message:Got exception: org.apache.hadoop.fs.FileAlreadyExistsException Can't make directory for path 's3a://XXXACCESS-KEYXXXXX:XXXSECRET-KEYXXX@bucket-name/DATA/mytable' since it is a file.) (state=08S01,code=1)
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:s3a://XXXACCESS-KEYXXXXX:XXXSECRET-KEYXXX@bucket-name/DATA/mytable/testdate=2015-03-05 is not a directory or unable to create one)

Solution:

Use s3n instead of s3a and it will work, so the S3 URL should be:

s3n://XXXACCESS-KEYXXXXX:XXXSECRET-KEYXXX@bucket-name/DATA/mytable/testdate=2015-03-05


Heterogeneous storage in HDFS

From Hadoop 2.3.0 onwards, HDFS supports heterogeneous storage. What is heterogeneous storage, and what are the advantages of using it?

Hadoop started as a scalable batch system for storing and processing huge volumes of data, but it has since become the platform for the enterprise Data Lake. In large enterprises, various types of data need to be stored and processed for advanced analytics. Some of this data is needed frequently, some infrequently, and some very rarely. Storing all of it on the same platform or hardware is expensive. For example, consider a cluster on AWS built from EC2 nodes, which use EBS and ephemeral storage; the cost varies with the type of storage. S3 is cheaper than EBS but slower to access, and Glacier is cheaper still but data retrieval takes even longer. Similarly, if we want to keep HDFS data on different storage types depending on priority and requirement, we can use this feature of Hadoop (a small example of applying a storage policy follows the list below). It was not available in earlier versions of Hadoop and was introduced in version 2.3.0. A DataNode can now be defined as a collection of storages. The storage policies available in Hadoop are Hot, Warm, Cold, All_SSD, One_SSD and Lazy_Persist:

  • Hot – for both storage and compute. The data that is popular and still being used for processing will stay in this policy. When a block is hot, all replicas are stored in DISK.
  • Cold – only for storage with limited compute. The data that is no longer being used, or data that needs to be archived is moved from hot storage to cold storage. When a block is cold, all replicas are stored in ARCHIVE.
  • Warm – partially hot and partially cold. When a block is warm, some of its replicas are stored in DISK and the remaining replicas are stored in ARCHIVE.
  • All_SSD – for storing all replicas in SSD.
  • One_SSD – for storing one of the replicas in SSD. The remaining replicas are stored in DISK.
  • Lazy_Persist – for writing blocks with single replica in memory. The replica is first written in RAM_DISK and then it is lazily persisted in DISK.
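As a rough sketch of how a policy is applied in practice, the snippet below simply wraps the hdfs CLI from Python; the directory /data/archive is a placeholder, and it assumes a Hadoop release with storage policies enabled and the hdfs command on the PATH.

import subprocess

HDFS_PATH = "/data/archive"  # placeholder directory

def run(cmd):
    print("+", " ".join(cmd))
    subprocess.run(cmd, check=True)

# Attach the COLD policy so that new blocks under the path go to ARCHIVE storage.
run(["hdfs", "storagepolicies", "-setStoragePolicy", "-path", HDFS_PATH, "-policy", "COLD"])

# Confirm the policy now attached to the path.
run(["hdfs", "storagepolicies", "-getStoragePolicy", "-path", HDFS_PATH])

# Move existing blocks to the storage types required by the policy.
run(["hdfs", "mover", "-p", HDFS_PATH])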

ORA-01045: user name lacks CREATE SESSION privilege; logon denied

After creating a user in an Oracle database, I tried to log in using SQL Developer and got the error “ORA-01045: user name lacks CREATE SESSION privilege; logon denied”.

The reason for this error was insufficient privileges.

I solved this issue by granting the following privilege.

grant create session to "<user-name>";


Unauthorized connection for super-user: hue from IP “x.x.x.x”

If you are getting the following error in Hue,

Unauthorized connection for superuser: hue from IP “x.x.x.x”

add the following properties to the core-site.xml of your Hadoop cluster and restart the cluster:

<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>

You may face a similar error with Oozie as well. In that case, add a similar configuration for the oozie user in core-site.xml:

<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>