In [None]:
%session_id_prefix native-delta-sql-s3path
%glue_version 3.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
  "--datalake-formats": "delta"
}

In [None]:
# --- User-supplied S3 location (edit these two placeholders) ---
bucket_name = "<Your S3 bucket name>"
bucket_prefix = "<Your S3 bucket prefix>"

# --- Fixed names used throughout the notebook ---
database_name = "delta_sql_s3path"
table_name = "products"

# Key prefixes inside the bucket (no scheme, no trailing slash).
database_prefix = "/".join([bucket_prefix, database_name])
table_prefix = "/".join([database_prefix, table_name])

# Fully qualified s3:// URIs, each ending with a trailing slash.
database_location = "s3://{}/{}/".format(bucket_name, database_prefix)
table_location = "s3://{}/{}/".format(bucket_name, table_prefix)

## Clean up existing resources

In [None]:
import boto3

# Remove every S3 object stored under the table prefix so the notebook
# starts from an empty table location.
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
table_objects = bucket.objects.filter(Prefix=f"{table_prefix}/")
table_objects.delete()

## Create Delta table with sample data

In [None]:
import time

from pyspark.sql import Row

# One shared epoch timestamp (float seconds) for all seed rows.
ut = time.time()

# Seed catalogue: (product_id, product_name, price). Every seed row is in
# the 'Electronics' category.
_seed_rows = [
    ('00001', 'Heater', 250),
    ('00002', 'Thermostat', 400),
    ('00003', 'Television', 600),
    ('00004', 'Blender', 100),
    ('00005', 'USB charger', 50),
]

product = [
    {
        'product_id': pid,
        'product_name': name,
        'price': price,
        'category': 'Electronics',
        'updated_at': ut,
    }
    for pid, name, price in _seed_rows
]

# Turn each dict into a Row and build the DataFrame from them.
df_products = spark.createDataFrame([Row(**item) for item in product])

In [None]:
# Write the sample rows to table_location in Delta format, replacing
# whatever is already stored there.
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .save(table_location)
)

## Read from Delta Lake table

In [None]:
# Read the table back by addressing it directly through its S3 path.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Insert records

In [None]:
# Append two 'Stationery' products, both stamped with the current time.
ut = time.time()
query = (
    f"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}` "
    f"VALUES('00006', 'Pen', 30,'Stationery',{ut}), "
    f"('00007', 'Book', 500,'Stationery',{ut})"
)
spark.sql(query)

In [None]:
# Show the table contents after the INSERT.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Update records

In [None]:
# Set the price of product 00007 to 300 and refresh its updated_at value.
ut = time.time()
query = (
    f"UPDATE  delta.`s3://{bucket_name}/{table_prefix}` "
    f"SET price=300, updated_at={ut} "
    "WHERE product_id == '00007'"
)
spark.sql(query)

In [None]:
# Show the table contents after the UPDATE.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Upsert records

In [None]:
# Stage the upsert input: one row for an existing product_id and one row
# for a product_id that has not been written to the table yet.
ut = time.time()
product_updates = [
    dict(product_id='00001', product_name='Heater', price=400, category='Electronics', updated_at=ut),  # Update
    dict(product_id='00008', product_name='Chair', price=50, category='Furniture', updated_at=ut),  # Insert
]

df_product_updates = spark.createDataFrame([Row(**item) for item in product_updates])

# Register the staged rows as a temp view so they can be referenced from SQL.
df_product_updates.createOrReplaceTempView("tmp_products_updates")

In [None]:
# Upsert the rows staged in the tmp_products_updates temp view into the
# Delta table, matching on product_id:
#   - matched rows have product_name, price, category and updated_at
#     overwritten with the staged values;
#   - unmatched rows are inserted as new records.
query = f"""MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` AS old 
USING tmp_products_updates AS new 
ON old.product_id=new.product_id 
WHEN MATCHED THEN 
UPDATE SET 
    old.product_name=new.product_name,
    old.price=new.price,
    old.category=new.category,
    old.updated_at=new.updated_at
WHEN NOT MATCHED 
THEN INSERT (product_id, product_name, price,category,updated_at) 
VALUES ( 
    new.product_id, 
    new.product_name, 
    new.price, 
    new.category, 
    new.updated_at 
)"""
spark.sql(query)

In [None]:
# Show the table contents after the MERGE.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Alter DeltaLake table

In [None]:
# Add a CURRENCY string column, positioned right after PRICE.
query = "ALTER TABLE delta.`s3://{}/{}` ADD COLUMNS (CURRENCY STRING AFTER PRICE)".format(
    bucket_name, table_prefix
)
spark.sql(query)

In [None]:
# Show the table contents after the new column was added.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

In [None]:
# Fill the CURRENCY column with "INR" on every row (no WHERE clause).
query = (
    f"UPDATE  delta.`s3://{bucket_name}/{table_prefix}` "
    'SET CURRENCY ="INR" '
)
spark.sql(query)

In [None]:
# Show the table contents after CURRENCY was populated.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Delete records

In [None]:
# Delete every row whose product_name is "Pen".
query = (
    f"DELETE FROM delta.`s3://{bucket_name}/{table_prefix}` "
    'WHERE product_name == "Pen" '
)
spark.sql(query)

In [None]:
# Show the table contents after the DELETE.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## View History

In [None]:
# List the table's commit history.
query = "DESCRIBE HISTORY delta.`s3://{}/{}` ".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Query with time travel

In [None]:
# NOTE(review): nothing in this cell references a name from `delta`; the
# wildcard import is kept as-is but looks removable — confirm.
from delta import *

# Time travel using a version number: the "@v<N>" suffix on the table path
# selects that version of the table.
snapshot_version = 5
query = f"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v{snapshot_version}`"
spark.sql(query).show(truncate=False)

You need a timestamp value in yyyyMMddHHmmssSSS format; replace the following timestamp with your own value.

In [None]:
# Time travel by timestamp: the value after "@" in the path is a timestamp
# in yyyyMMddHHmmssSSS format. The value below is a placeholder and must be
# replaced with one that is valid for your table.
snapshot_timestamp = "20220414145923000"
query = f"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@{snapshot_timestamp}`"
spark.sql(query).show(truncate=False)

Note: Spark SQL does not support the `TIMESTAMP AS OF` / `VERSION AS OF` keywords as of now. See https://github.com/delta-io/delta/issues/128 and https://issues.apache.org/jira/browse/SPARK-34978

## Roll Back

In [None]:
## Fix the accidental delete of product_name "Pen":
## copy the row back from version 5 of the table, i.e. the snapshot queried
## in the time-travel section, taken before the DELETE ran.
## product_id holds string values, so the predicate uses a quoted string
## literal, like every other product_id filter in this notebook. An unquoted
## 00006 is the integer 6 and only matches through an implicit cast.
query = f"""INSERT INTO delta.`s3://{bucket_name}/{table_prefix}`
  SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`
  WHERE product_id = '00006'"""

spark.sql(query)

In [None]:
# Show the table contents after the deleted row was copied back.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

In [None]:
## Fix accidental updates to product_id 00005.
## Step 1: make the "accidental" change by setting the price of product_id
## 00005 to 100 and refreshing its updated_at value.
ut = time.time()
query = (
    f"UPDATE delta.`s3://{bucket_name}/{table_prefix}` "
    f"SET price=100,updated_at={ut} "
    "WHERE product_id == '00005'"
)

spark.sql(query)

In [None]:
# Show the table contents after the update to product_id 00005.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

In [None]:
# List the commit history again; the rollback that follows refers to a
# table version by number.
query = "DESCRIBE HISTORY delta.`s3://{}/{}` ".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

In [None]:
# Roll back the update just made: merge version 7 of the table into the
# current table and overwrite every row that matches on product_id with
# its version-7 values.
query = (
    f"MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` dest\n"
    f"USING delta.`s3://{bucket_name}/{table_prefix}@v7` src\n"
    "ON src.product_id = dest.product_id\n"
    "WHEN MATCHED THEN UPDATE SET * "
)
spark.sql(query)

In [None]:
# Show the table contents after the rollback MERGE.
query = "SELECT * FROM delta.`s3://{}/{}`".format(bucket_name, table_prefix)
spark.sql(query).show(truncate=False)

## Stop Session

In [None]:
%stop_session