In [None]:
%session_id_prefix delta-dataframe-
%glue_version 3.0
%idle_timeout 60
%connections 
%%configure
{
 "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
 "--extra-py-files": "/tmp/etl-delta-core_2.12-1.0.0.jar" # for custom connector
 # "--extra-py-files": "/tmp/delta-core_2.12-1.0.0.jar" # for marketplace connector
}

In [None]:
# Configuration: set bucket_name (and optionally bucket_prefix) before running.
bucket_name = ""    # TODO: S3 bucket that holds the Delta table data
bucket_prefix = ""  # optional key prefix inside the bucket, e.g. "path/to/data"; may be empty
database_name = "delta_dataframe"
# Join only the non-empty parts. With a plain f"{bucket_prefix}/{database_name}", an empty
# bucket_prefix yields "/delta_dataframe": the S3 locations then contain "//", and the boto3
# cleanup filter Prefix="/delta_dataframe/..." does not match keys stored without the
# leading "/". strip("/") also tolerates a prefix given as "/data/" or "data/".
database_prefix = "/".join(part for part in (bucket_prefix.strip("/"), database_name) if part)
database_location = f"s3://{bucket_name}/{database_prefix}/"
table_name = "products"
table_prefix = f"{database_prefix}/{table_name}"
table_location = f"s3://{bucket_name}/{table_prefix}/"

## Clean up existing resources

In [None]:
import boto3

## Delete files in S3
# Remove every object under the table's key prefix. Because table_location ends with "/",
# the testTable location built later as table_location + "_testTable" also falls under it.
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.objects.filter(Prefix=f"{table_prefix}/").delete()

## Drop tables in Glue Data Catalog
# Drop both catalog tables this notebook creates; a table that is already gone is
# reported and skipped rather than treated as an error.
glue = boto3.client('glue')
for stale_table in (table_name, 'testTable'):
    try:
        glue.delete_table(DatabaseName=database_name, Name=stale_table)
    except glue.exceptions.EntityNotFoundException:
        print(f"Table {database_name}.{stale_table} does not exist")


## Create Delta table with sample data

In [None]:
import boto3

# Ensure the Glue database exists; create it at database_location when it does not.
# The client is created before the try so the except clause can always resolve
# glue.exceptions, and `glue` stays bound to the client (the create_database
# response is not assigned back to it).
glue = boto3.client('glue')
try:
    res = glue.get_database(Name=database_name)
    print(f"Database {database_name} exists.")
    # An existing database without a LocationUri is only warned about, not modified.
    if 'LocationUri' not in res['Database']:
        print(f"Warning: Database {database_name} does not have Location. You need to configure location in the database.")
except glue.exceptions.EntityNotFoundException:
    print(f"Database {database_name} does not exist.")
    glue.create_database(
        DatabaseInput={
            'Name': database_name,
            'LocationUri': database_location,
        }
    )
    print(f"Created a new database {database_name}.")

In [None]:
from pyspark.sql import Row
import time

# One timestamp shared by every seed row.
ut = time.time()

# Seed catalog built from (product_id, product_name, price) triples; every row is
# in the Electronics category and carries the same updated_at.
product = [
    {'product_id': product_id, 'product_name': product_name, 'price': price,
     'category': 'Electronics', 'updated_at': ut}
    for product_id, product_name, price in (
        ('00001', 'Heater', 250),
        ('00002', 'Thermostat', 400),
        ('00003', 'Television', 600),
        ('00004', 'Blender', 100),
        ('00005', 'USB charger', 50),
    )
]

# Spark infers the DataFrame schema from the Row objects.
df_products = spark.createDataFrame([Row(**record) for record in product])

In [None]:
# Register the table in the metastore, backed by data at table_location, using
# df_products' schema, and write (overwrite) the rows into it.
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .option("path", table_location)
    .saveAsTable(f"{database_name}.{table_name}")
)

In [None]:
# Write df_products directly to table_location by path (no metastore involved).
# This is the same location the metastore table uses, so its data is overwritten.
df_products.write.save(table_location, format="delta", mode="overwrite")

In [None]:
# Create an empty Delta table with the DeltaTableBuilder API, reusing df_products' schema.
# NOTE(review): table_location ends with "/", so this location is a "_testTable"
# sub-directory inside the products table's folder rather than a sibling folder - confirm intended.
from delta.tables import DeltaTable

deltaTable = (
    DeltaTable.create(spark)
    .tableName(f"{database_name}.testTable")
    .addColumns(df_products.schema)
    .location(table_location + "_testTable")
    .execute()
)

## Read from the Delta table via the DataFrame API

In [None]:
# Read the table through the metastore using its "<database>.<table>" name.
df_products_read = spark.table(".".join((database_name, table_name)))
df_products_read.show()



In [None]:
# Read the same Delta table directly from its S3 path, bypassing the metastore.
df_products_read = spark.read.load(table_location, format="delta")
df_products_read.show()

## Read from the Delta table via the Delta Lake Python library (`DeltaTable`)

In [None]:
# Load the Delta table by its storage path and display its current contents.
# DeltaTable is imported explicitly rather than via `import *`, so the notebook
# namespace only gains the name that is actually used.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, table_location)
deltaTable.toDF().show()

In [None]:
# Load the same table through the metastore by its qualified name and display it.
deltaTable = DeltaTable.forName(spark, ".".join((database_name, table_name)))
deltaTable.toDF().show()

In [None]:
# Load the builder-created testTable by name and display it (no rows have been written to it).
# Keep the DeltaTable handle in deltaTabletest; the original chained .show(), which
# returns None, so the variable never held the table.
deltaTabletest = DeltaTable.forName(spark, f"{database_name}.testTable")
deltaTabletest.toDF().show()

## Modify the schema of the Delta Lake table (add a `Currency` column)

In [None]:
from pyspark.sql.functions import lit

# Add a constant Currency column and overwrite the table with the widened rows.
# overwriteSchema lets this overwrite replace the table schema with the new one.
(
    df_products
    .withColumn("Currency", lit("USD"))
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("path", table_location)  # external table: data stays at table_location
    .saveAsTable(f"{database_name}.{table_name}")
)

## Insert records

In [None]:
# Append two new Stationery products, stamped with a fresh updated_at.
ut = time.time()
product_insert = [
    {'product_id': '00006', 'product_name': 'Book', 'price': 500, 'category': 'Stationery', 'Currency': 'INR', 'updated_at': ut},  # Insert
    {'product_id': '00007', 'product_name': 'Pen', 'price': 50, 'category': 'Stationery', 'Currency': 'USD', 'updated_at': ut},  # Insert
]
df_product_insert = spark.createDataFrame([Row(**record) for record in product_insert])

# NOTE(review): these rows put Currency before updated_at, unlike the table; the append
# via saveAsTable is expected to match columns by name - confirm.
(
    df_product_insert.write
    .format("delta")
    .mode("append")
    .saveAsTable(f"{database_name}.{table_name}")
)

## Update records

In [None]:
# Update product 00006's Currency to USD.
# Only DeltaTable is imported explicitly. A wildcard import of pyspark.sql.functions
# (unused in this cell) would shadow Python builtins such as sum/max/min/round/abs
# for every later cell.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, f"{database_name}.{table_name}")

# The predicate and the new value are SQL expression strings; 'USD' is quoted as a SQL literal.
deltaTable.update(
    condition="product_id = '00006'",
    set={"Currency": "'USD'"},
)


## Delete records

In [None]:
# Delete product 00007 from the table using a SQL predicate string.
deltaTable = DeltaTable.forName(spark, f"{database_name}.{table_name}")
deltaTable.delete(condition="product_id = '00007'")

## Upsert records into Delta table

In [None]:
import time

from delta.tables import DeltaTable
from pyspark.sql import Row

# Upsert: rows whose product_id already exists are updated, the rest are inserted.
ut = time.time()

product_updates = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'Currency': 'INR', 'updated_at': ut},  # Update
    {'product_id': '00007', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'Currency': 'INR', 'updated_at': ut},  # Insert
]
df_product_updates = spark.createDataFrame([Row(**x) for x in product_updates])

# Resolve the target table here so this cell does not rely on `deltaTable`
# left over from an earlier cell.
deltaTable = DeltaTable.forName(spark, f"{database_name}.{table_name}")

# Matched rows get every non-key column from the source; inserts additionally copy the key.
upsert_columns = ["product_name", "price", "category", "updated_at", "Currency"]
update_assignments = {column: f"updates.{column}" for column in upsert_columns}
insert_values = {"product_id": "updates.product_id", **update_assignments}

(
    deltaTable.alias("products")
    .merge(df_product_updates.alias("updates"), "products.product_id = updates.product_id")
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)
deltaTable.toDF().show()

## View History

In [None]:
# Display the commit history of the path-based table (one row per table version).
# DeltaTable is imported explicitly instead of via `import *`.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, table_location)

fullHistoryDF = deltaTable.history()
fullHistoryDF.show()

## Query with time travel

In [None]:
from delta.tables import DeltaTable

# Time-travel reads of the path-based table: once by commit timestamp, once by version.
# The timestamp comes from the table's own history instead of being hard-coded.
# A fixed wall-clock value predates every commit of a fresh run, and Delta rejects
# timestamps earlier than the table's first commit.
# On a fresh Run All, version 3 should be the "Insert records" append
# (0: saveAsTable, 1: path overwrite, 2: schema overwrite) - adjust if cells change.
time_travel_version = 3

history_row = (
    DeltaTable.forPath(spark, table_location)
    .history()
    .filter(f"version = {time_travel_version}")
    # Cast in Spark so the string is rendered in the session time zone,
    # the same zone in which timestampAsOf strings are interpreted.
    .selectExpr("CAST(timestamp AS STRING)")
    .first()
)
if history_row is None:
    raise ValueError(f"Version {time_travel_version} not found in the history of {table_location}")
commit_timestamp = history_row[0]  # timestamp in YYYY-MM-DD HH:MM:SS[.fff]

# Both reads target the same commit, so df1 and df2 should show identical rows.
df1 = spark.read.format("delta").option("timestampAsOf", commit_timestamp).load(table_location)
df2 = spark.read.format("delta").option("versionAsOf", time_travel_version).load(table_location)
df1.show()
df2.show()


## Roll back

In [None]:
# Roll back to version 2 by reading that snapshot and overwriting the table with it.
# The overwrite is itself a new commit; earlier versions stay available for time travel.
spark.read.load(table_location, format="delta", versionAsOf=2) \
    .write.save(table_location, format="delta", mode="overwrite")


## Stop Session

In [None]:
%stop_session