# Build a high-performance, transactional data lake using Delta Lake on Amazon EMR

## Topics covered in this example

1) [Configuring Delta Lake](#configure_deltalake) <br>
2) [Creating a Delta Lake Table](#create_table) <br>
3) [DML Statements](#dml) <br>
&emsp;&emsp;&emsp;&emsp;a) [Updates](#updates) <br>
&emsp;&emsp;&emsp;&emsp;b) [Deletes](#deletes) <br>
&emsp;&emsp;&emsp;&emsp;c) [Upserts](#upserts) <br>
4) [Time Travel](#time_travel) <br>
5) [Optimization with File Management](#Optimization_with_File_Management) <br>
6) [Z-ordering](#z_ordering) <br>

## Introduction

Delta Lake is an open source project that enables building a modern data architecture on top of data lakes. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing on top of existing data lakes such as Amazon S3 and HDFS. Specifically, Delta Lake offers:
* ACID transactions on Spark: Serializable isolation levels ensure that readers never see inconsistent data.
* Scalable metadata handling: Uses Spark's distributed processing power to handle the metadata of petabyte-scale tables with billions of files with ease.
* Streaming and batch unification: A table in Delta Lake is a batch table as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all work out of the box.
* Schema enforcement: Automatically handles schema variations to prevent insertion of bad records during ingestion.
* Time travel: Data versioning enables rollbacks, full historical audit trails, and reproducible machine learning experiments.
* Upserts and deletes: Supports merge, update and delete operations to enable complex use cases like change-data-capture, slowly-changing-dimension (SCD) operations, streaming upserts, and so on.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE:</b> To run this notebook successfully as is, make sure the following prerequisites are completed.</div>

* This notebook was tested on an EMR 6.7 cluster with Delta Lake 2.0.0.
* [Download](https://github.com/delta-io/delta/releases/) the Delta Lake JAR file (we used delta-core_2.12:2.0.0) and store it in an S3 bucket (e.g. s3://your bucket/jars/) in your AWS account. Create the script below and store it in an S3 bucket (e.g. s3://your bucket/bootstrap/deltajarinstall.sh) so that it can be used as a bootstrap action, as shown in the following example.

       
           #!/bin/bash
           sudo curl -O --output-dir /usr/lib/spark/jars/  https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.0.0/delta-core_2.12-2.0.0.jar 
           sudo curl -O --output-dir /usr/lib/spark/jars/  https://repo1.maven.org/maven2/io/delta/delta-storage/2.0.0/delta-storage-2.0.0.jar
           sudo python3 -m pip install delta-spark==2.0.0


* Here is a sample CLI command to create an EMR cluster. Replace `<your subnet>` with one of the subnets in which your EMR Studio is running, and replace `<your bucket>` with your S3 bucket:

            aws emr create-cluster \
            --name "emr-delta-lake-blog" \
            --release-label emr-6.7.0 \
            --applications Name=Hadoop Name=Hive Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
            --instance-type m5.xlarge \
            --instance-count 3 \
            --ec2-attributes SubnetId='<your subnet>' \
            --use-default-roles \
            --bootstrap-actions Path="s3://<your bucket>/bootstrap/deltajarinstall.sh"
              

* This notebook uses the `PySpark` kernel. 
***

<a id="configure_deltalake"></a>
## Configuring Delta Lake on the Spark session

Configure your Spark session using the %%configure magic command.

In [None]:
%%configure -f
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
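
Outside a notebook (for example in a standalone PySpark script), the same two settings can be applied when the session is built. The following is a minimal sketch under that assumption; `DELTA_SPARK_CONF`, `configure_cell_body`, and `build_delta_session` are names made up for this illustration, and the Delta JAR files still need to be available on the cluster as described in the prerequisites.

```python
import json

# The two settings from the %%configure cell above.
DELTA_SPARK_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def configure_cell_body(conf):
    """Render the JSON body that follows `%%configure -f` in a notebook cell."""
    return json.dumps({"conf": conf}, indent=2)


def build_delta_session(app_name="delta-lake-example", conf=DELTA_SPARK_CONF):
    """Create a SparkSession with the Delta Lake settings applied.

    pyspark is imported lazily so the helpers above can be used without Spark installed.
    """
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

In the notebook itself, the `%%configure` cell remains the simpler option because the Livy session is created for you.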

In [None]:
# Import libraries
from delta.tables import *
from pyspark.sql.functions import *

In [None]:
deltaPath = "s3://<your bucket>/delta-amazon-reviews-pds/"

<a id="create_table"></a>
## Create Delta Lake Table

**We will be using the <a href="https://us-east-1.console.aws.amazon.com/s3/home?region=us-east-1&bucket=amazon-reviews-pds" target="_blank">Amazon Product Reviews Dataset</a>. Spend some time familiarizing yourself with this dataset.**

For the sake of simplicity, we load just one partition.

In [None]:
df_parquet = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Gift_Card/*.parquet")

In [None]:
# Check the schema
df_parquet.printSchema()

Convert the Parquet data and write it to S3 in Delta Lake table format, partitioned by `year`.

In [None]:
df_parquet.write.mode("overwrite").format("delta").partitionBy("year").save(deltaPath)

In [None]:
df_delta = spark.read.format("delta").load(deltaPath)
df_delta.show()

In [None]:
%%sql
SELECT * FROM  delta.`s3://<your bucket>/delta-amazon-reviews-pds/` LIMIT 10

<a id="dml"></a>
## DML Operations
Delta Lake supports DML statements to add or modify data in your data lake: inserts to add new data, updates to modify specific columns in specific rows of existing data, deletes (for example, for GDPR and CCPA compliance), and upserts for incoming data that may contain a mix of inserts and updates. Let us look at each of them now.

<a id="updates"></a>
### Updates
Change the `marketplace` value from `US` to `USA`. A Delta Lake table supports updates through a simple `UPDATE` statement with a `SET` clause.

In [None]:
deltaTable = DeltaTable.forPath(spark, deltaPath)

In [None]:
# Update column 
deltaTable.update("marketplace = 'US'",{ "marketplace":"'USA'"})
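
Note that both arguments in the call above are SQL expression strings, which is why the new value is written as `"'USA'"` with inner single quotes. The sketch below makes that quoting explicit. `sql_string_literal` and `rename_marketplace` are hypothetical helpers for this illustration, and the helper deliberately rejects values with quotes or backslashes instead of trying to escape them.

```python
def sql_string_literal(value):
    """Wrap a plain Python string in single quotes so it reads as a SQL string literal.

    Values containing single quotes or backslashes are rejected rather than
    escaped, to keep this sketch simple and conservative.
    """
    if "'" in value or "\\" in value:
        raise ValueError(f"unsupported character in literal: {value!r}")
    return f"'{value}'"


def rename_marketplace(delta_table, old, new):
    """Rewrite rows where marketplace equals `old` so that it equals `new`.

    `delta_table` is expected to be a delta.tables.DeltaTable, such as the
    `deltaTable` object created above.
    """
    delta_table.update(
        condition=f"marketplace = {sql_string_literal(old)}",
        set={"marketplace": sql_string_literal(new)},
    )
```

With these helpers, `rename_marketplace(deltaTable, "US", "USA")` issues the same update as the cell above.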

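You can also use the sparkmagic `%%sql` magic. The following statement reverts the change made above by setting `marketplace` back to `US`: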

In [None]:
%%sql
UPDATE delta.`s3://<your bucket>/delta-amazon-reviews-pds/`
SET marketplace = 'US' WHERE marketplace = 'USA'

In [None]:
deltaTable.toDF().show()

<a id="deletes"></a>
### Deletes
GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Delta Lake tables are designed to handle such deletions easily. The following example deletes all reviews that are not verified purchases.

In [None]:
deltaTable.delete("verified_purchase = 'N'")
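
For a GDPR or CCPA request you would more typically delete the rows of specific customers. The following is a minimal sketch of that idea, assuming `customer_id` is stored as a string of digits (as in the sample records used later in this notebook). `customer_delete_predicate` and `delete_customers` are hypothetical helper names.

```python
def customer_delete_predicate(customer_ids):
    """Build a SQL predicate that matches the given customer IDs.

    IDs are validated to be ASCII digits only, so no quoting or escaping
    problems can arise when they are embedded in the predicate.
    """
    if not customer_ids:
        raise ValueError("at least one customer ID is required")
    for cid in customer_ids:
        if not (cid.isascii() and cid.isdigit()):
            raise ValueError(f"unexpected customer ID: {cid!r}")
    quoted = ", ".join(f"'{cid}'" for cid in customer_ids)
    return f"customer_id IN ({quoted})"


def delete_customers(delta_table, customer_ids):
    """Delete all rows that belong to the given customers from a DeltaTable."""
    delta_table.delete(customer_delete_predicate(customer_ids))
```

Keep in mind that earlier versions of the table remain readable through time travel after a delete like this.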

In [None]:
deltaTable.toDF().show()

<a id="time_travel"></a>
## Time Travel
Let us look at the table history, and then query the table as of an earlier snapshot (version 0, the initial write).

In [None]:
deltaTable.history(100).select("version", "timestamp", "operation", "operationParameters").show(truncate=False)

In [None]:
df_time_travel = spark.read.format("delta").option("versionAsOf", 0).load(deltaPath)
df_time_travel.show()
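
Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. The sketch below wraps both in one function. `time_travel_options` and `read_as_of` are hypothetical helper names, and any timestamp you pass has to fall within the history shown by `deltaTable.history()`.

```python
def time_travel_options(version=None, timestamp=None):
    """Return the Delta reader options for exactly one of version or timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": timestamp}


def read_as_of(spark, path, version=None, timestamp=None):
    """Load a Delta table snapshot by version number or by timestamp string."""
    reader = spark.read.format("delta")
    for key, value in time_travel_options(version, timestamp).items():
        reader = reader.option(key, value)
    return reader.load(path)
```

For example, `read_as_of(spark, deltaPath, version=0)` returns the same snapshot as the cell above.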

<a id="upserts"></a>
### Upserts

In [None]:
data_upsert = [ {"marketplace":'US',"customer_id":'38602100', "review_id":'R315TR7JY5XODE',"product_id":'B00CHSWG6O',"product_parent":'336289302',"product_title" :'Amazon eGift Card', "star_rating":'5', "helpful_votes":'2',"total_votes":'0',"vine":'N',"verified_purchase":'Y',"review_headline":'GREAT',"review_body":'GOOD PRODUCT',"review_date":'2014-04-11',"year":'2014'},
    {"marketplace":'US',"customer_id":'38602103', "review_id":'R315TR7JY5XOA1',"product_id":"B007V6EVY2","product_parent":'910961751',"product_title" :'Amazon eGift Card', "star_rating":'5', "helpful_votes":'2',"total_votes":'0',"vine":'N',"verified_purchase":'Y',"review_headline":'AWESOME',"review_body":'GREAT PRODUCT',"review_date":'2014-04-11',"year":'2014'}
]

In [None]:
df_data_upsert = spark.createDataFrame(data_upsert)

In [None]:
df_data_upsert.show()

In [None]:
(deltaTable
.alias('t')
.merge(df_data_upsert.alias('u'), 't.review_id = u.review_id')
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
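
To make the semantics of this merge concrete, here is a plain-Python sketch of the same logic on lists of dictionaries: a source row whose `review_id` matches a target row replaces it (like `whenMatchedUpdateAll`), and a source row without a match is appended (like `whenNotMatchedInsertAll`). This is an illustration only, not how Delta Lake executes a merge, and `merge_upsert` is a made-up name.

```python
def merge_upsert(target_rows, source_rows, key="review_id"):
    """Upsert source rows into target rows, matching on `key`.

    Simplification: if several source rows share a key, the last one wins
    here, whereas a Delta Lake merge reports an error when multiple source
    rows match the same target row.
    """
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        # Matched key: all columns are replaced. New key: the row is inserted.
        merged[row[key]] = dict(row)
    return list(merged.values())


target = [
    {"review_id": "R315TR7JY5XODE", "review_headline": "old headline"},
    {"review_id": "R000000000000", "review_headline": "untouched"},
]
source = [
    {"review_id": "R315TR7JY5XODE", "review_headline": "GREAT"},
    {"review_id": "R315TR7JY5XOA1", "review_headline": "AWESOME"},
]
for row in merge_upsert(target, source):
    print(row["review_id"], row["review_headline"])
```

The `old headline` and `untouched` values and the `R000000000000` ID are invented sample data for this sketch.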

In [None]:
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("temp_product_reviews")
spark.sql("SELECT * FROM temp_product_reviews where review_id in ('R315TR7JY5XODE','R315TR7JY5XOA1')").show()

Compare this with the original record using time travel:

In [None]:
df_time_travel.filter("review_id ='R315TR7JY5XODE'").show()

<a id="Optimization_with_File_Management"></a>
## Optimization with File Management
To improve query speed, Delta Lake can optimize the layout of data in storage. There are various ways to optimize the layout. You can use the following command to optimize the whole table:

In [None]:
deltaTable.optimize().executeCompaction()

If you have a large amount of data, you can reduce the scope of the optimization with a `where` condition, as shown below:

In [None]:
deltaTable.optimize().where("year='2015'").executeCompaction()
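
The same compaction can also be written in SQL. The sketch below only assembles the statement text. It assumes that your Delta Lake version supports the `OPTIMIZE` SQL command, and `optimize_statement` is a hypothetical helper. The `WHERE` predicate should reference partition columns, which is the case for `year` in this table.

```python
def optimize_statement(path, where=None, zorder_by=()):
    """Build an OPTIMIZE statement for a path-based Delta table.

    `where` is an optional predicate on partition columns, and `zorder_by`
    is an optional sequence of column names for a ZORDER BY clause.
    """
    if "`" in path:
        raise ValueError("path must not contain backticks")
    sql = f"OPTIMIZE delta.`{path}`"
    if where:
        sql += f" WHERE {where}"
    if zorder_by:
        sql += f" ZORDER BY ({', '.join(zorder_by)})"
    return sql
```

You could then run it with `spark.sql(optimize_statement(deltaPath, where="year = '2015'"))`.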

<a id="z_ordering"></a>
## Z-ordering
Delta Lake uses Z-ordering to colocate related data in the same set of files, and its data-skipping algorithms take advantage of this layout. This reduces the amount of data that Delta Lake needs to read. To Z-order data, specify the columns to order on (the `ZORDER BY` clause in SQL, or `executeZOrderBy` in Python). In the following example, we Z-order the table on the low-cardinality column `verified_purchase`.

In [None]:
deltaTable.optimize().executeZOrderBy("verified_purchase")
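
To build intuition for how a Z-order keeps rows close together when they are close in several columns at once, here is a small conceptual sketch of a Z-order (Morton) value computed by interleaving bits. It is an illustration of the general technique, not Delta Lake's implementation, and `interleave_bits` and its `bits` parameter are names made up for this example.

```python
def interleave_bits(x, y, bits=8):
    """Interleave the low `bits` bits of x and y into one Z-order (Morton) value.

    Bit i of x lands at position 2*i and bit i of y at position 2*i + 1.
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Sort a small 4 x 4 grid of (x, y) points along the Z-order curve.
points = [(x, y) for x in range(4) for y in range(4)]
points.sort(key=lambda p: interleave_bits(p[0], p[1]))
print(points[:4])  # the first four points form the 2 x 2 block nearest the origin
```

Sorting by this single value groups neighbouring points into the same stretch of the ordering, which is the property that lets files written in that order cover narrow ranges of each column.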

<a id="clean_up"></a>
## Clean up
To avoid ongoing charges, delete the Amazon S3 buckets and the Amazon EMR Studio, and terminate the EMR cluster used for this example.