# Iceberg Example Using Hive Catalog Notebook

## Topics covered in this example

1) [Configuring Iceberg](#configure_iceberg) <br>
2) [Creating an Iceberg Table](#create_table) <br>
3) [DML Statements](#dml) <br>
&emsp;&emsp;&emsp;&emsp;a) [Inserts](#inserts) <br>
&emsp;&emsp;&emsp;&emsp;b) [Deletes](#deletes) <br>
&emsp;&emsp;&emsp;&emsp;c) [Updates](#updates) <br>
4) [Schema Evolution](#schema_evolution) <br>
&emsp;&emsp;&emsp;&emsp;a) [Adding Columns](#adding_columns) <br>
&emsp;&emsp;&emsp;&emsp;b) [Dropping Columns](#dropping_columns) <br>
5) [Time Travel](#time_travel) <br>
&emsp;&emsp;&emsp;&emsp;a) [Rollback](#rollback) <br>

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE :</b> In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.</div>

* This notebook was tested on a single-node r5.4xlarge EMR 6.5 cluster with Iceberg 0.12.0 and Spark 3.1.2.
* To run this notebook:
    - Launch an EMR 6.5+ cluster in one of the subnets on which this EMR Studio is running.
    - Launch the cluster with the following configuration classifications:

        
            [
              {
                "Classification": "iceberg-defaults",
                "Properties": {
                  "iceberg.enabled":"true"
                }
              }
            ]

The classification enables Iceberg.
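
One way to avoid typos such as stray whitespace in the classification name is to generate the JSON rather than type it by hand. The following is a minimal sketch in Python; the function name `iceberg_configurations` is made up for this illustration and is not part of any AWS tooling.

```python
import json


def iceberg_configurations():
    """Return the EMR configuration list that enables Iceberg."""
    return [
        {
            "Classification": "iceberg-defaults",
            "Properties": {"iceberg.enabled": "true"},
        }
    ]


# Compact JSON, suitable for pasting into the --configurations CLI argument.
configurations_json = json.dumps(iceberg_configurations(), separators=(",", ":"))
print(configurations_json)
```

The printed string can be wrapped in single quotes and passed to `aws emr create-cluster --configurations`.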

* Here is a sample CLI command to create an EMR cluster. Remember to replace <span style="color:red">EMR-STUDIO-SUBNET</span> with one of the subnets in which your EMR Studio is running:

            aws emr create-cluster \
                --name iceberg-emr-cluster \
                --use-default-roles \
                --release-label emr-6.5.0 \
                --instance-count 1 \
                --instance-type r5.4xlarge \
                --applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
                --ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
                --configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}}]'
                

* This notebook uses the `PySpark` kernel. 
***

## Introduction
Apache Iceberg (https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table. Iceberg tracks individual data files in a table instead of directories. This allows writers to create data files in place and add them to the table only in an explicit commit. Every time a new file is added to any partition of the table, a new point-in-time snapshot of all the files gets created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already holds that information, recorded at write time (commonly known as snapshot isolation (https://en.wikipedia.org/wiki/Snapshot_isolation) in databases).

Iceberg supports write, delete, update, and time travel operations with complete support for ACID transactions (https://en.wikipedia.org/wiki/ACID). Table changes are atomic, and readers never see partial or uncommitted changes (serializable isolation (https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable)).

The Iceberg table format is an open specification at multiple levels. At the catalog level, you can plug in multiple types of catalogs, such as Hive, Hadoop, and AWS Glue Data Catalog. These can all coexist, and you can join tables across different types of catalogs. In this example, we are going to work with Hive Data Catalog.

***

## Setup
Create an S3 bucket location to store the sample dataset. In this example we use the path format: s3://<span style="color:red">my_s3_bucket</span>/iceberg/<span style="color:red">YOUR-CATALOG-NAME</span>/tables/ 
    
    For example: s3://my_s3_bucket/iceberg/db/amazon_reviews_iceberg

Replace my_s3_bucket with the name of the bucket you created.
***



<a id="configure_iceberg"></a>
## Configuring Iceberg on Spark session

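Configure your Spark session using the `%%configure` magic command. The configuration below sets the built-in session catalog (`spark_catalog`) to Iceberg's `SparkSessionCatalog` backed by Hive, and registers an additional Iceberg catalog named `dev` whose warehouse is in S3. Note that `dev` is declared with type `hadoop`, and the tables in this notebook are created under `dev`.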

Replace <span style="color:red">my_s3_bucket</span> in the configuration below with the bucket you created in the Prerequisite - Create S3 Bucket section of the workshop.

In [None]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type":"hive",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.type":"hadoop",
    "spark.sql.catalog.dev.warehouse":"s3://my_s3_bucket/iceberg/"
    }
}
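
After the session starts, it can be useful to confirm that the catalog settings were actually picked up. The sketch below reads them back through `spark.conf.get`; the helper name `catalog_settings` and the `"<not set>"` placeholder are assumptions made for this illustration.

```python
def catalog_settings(spark, catalog="dev"):
    """Return the session settings that define one Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    keys = [prefix, f"{prefix}.type", f"{prefix}.warehouse"]
    # The second argument is the default returned when a key is unset.
    return {key: spark.conf.get(key, "<not set>") for key in keys}


# In the notebook, where `spark` is provided by the PySpark kernel:
# for key, value in catalog_settings(spark).items():
#     print(key, "=", value)
```

If the warehouse still shows the `my_s3_bucket` placeholder, the configuration cell was run before the bucket name was replaced.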

<a id="create_table"></a>
## Creating an Iceberg Table

Replace **my_s3_bucket** in the DDL below with the name of your bucket.

We will use PySpark for most of our Iceberg operations, although you could achieve all of these with Spark SQL, Scala, or Java as well.

In [None]:
spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")

spark.sql(""" CREATE TABLE  dev.db.amazon_reviews_iceberg (
    marketplace string,
    customer_id string,
    review_id string,
    product_id string,
    product_parent string,
    product_title string,
    star_rating int,
    helpful_votes int,
    total_votes int,
    vine string,
    verified_purchase string,
    review_headline string,
    review_body string,
    review_date date,
    year int)
USING iceberg 
location 's3://my_s3_bucket/iceberg/db/amazon_reviews_iceberg'""")

<a id="dml"></a>
## DML Operations
Iceberg supports all DML statements to add or modify data in your data lake: inserts to add new data, updates to modify specific columns in specific rows of your existing data, deletes for GDPR and CCPA compliance, and upserts when incoming data may contain a mix of inserts and updates. Let us look at inserts, deletes, and updates in turn.

<a id="inserts"></a>
### Inserts

**We will be using the <a href="https://us-east-1.console.aws.amazon.com/s3/home?region=us-east-1&bucket=amazon-reviews-pds" target="_blank">Amazon Product Reviews Dataset</a>. Spend some time getting familiar with this dataset.**

We are loading just one partition for the sake of simplicity.

In [None]:
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")
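
Before appending, it can help to check that the DataFrame's column names line up with the table's. The following is a small sketch of such a check; the helper name `column_mismatch` is made up for this illustration, and it compares names only, not types.

```python
def column_mismatch(table_columns, frame_columns):
    """Compare two lists of column names and report the differences."""
    table_set, frame_set = set(table_columns), set(frame_columns)
    return {
        "missing_in_dataframe": sorted(table_set - frame_set),
        "unexpected_in_dataframe": sorted(frame_set - table_set),
    }


# In the notebook, once `df` has been read:
# report = column_mismatch(
#     spark.table("dev.db.amazon_reviews_iceberg").columns, df.columns)
# print(report)
```

Two empty lists in the report mean the names match.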

**Run the cell below to write the data into the Iceberg table.**

In [None]:
df.writeTo("dev.db.amazon_reviews_iceberg").append()

**Verify that the data was loaded into the Iceberg table successfully.**

In [None]:
spark.sql("""select * from dev.db.amazon_reviews_iceberg""").show()

<a id="deletes"></a>
### Deletes
GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Iceberg handles such removals with a plain SQL `DELETE` statement.
Now let us delete records from our Iceberg table.

Delete all records from the table where `verified_purchase = 'N'`:

In [None]:
spark.sql("""delete from dev.db.amazon_reviews_iceberg
where verified_purchase = 'N'""")

Check that the data was deleted. The query below should return zero records.

In [None]:
spark.sql("""select * from dev.db.amazon_reviews_iceberg where verified_purchase = 'N'""").show()

<a id="updates"></a>
### Updates
What if we want to go back and update an existing record? Let's change the `marketplace` from US to USA. Iceberg allows updates using a simple `UPDATE` statement with a `SET` clause.

In [None]:
spark.sql("""UPDATE dev.db.amazon_reviews_iceberg
SET marketplace = 'USA'
WHERE marketplace = 'US'""")

Verify that the `marketplace` column is updated.

In [None]:
spark.sql("""select * from dev.db.amazon_reviews_iceberg""").show()
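
The DML overview mentions upserts, which are not run in this notebook. In Spark SQL an upsert is usually written as `MERGE INTO`, and the sketch below only builds the statement text. The source view name `updates` and the join key `review_id` are assumptions made for illustration, and whether `MERGE INTO` is available depends on the Iceberg SQL extensions being enabled for your Spark and Iceberg versions.

```python
def build_merge_sql(target, source, key):
    """Build a MERGE INTO statement that updates matches and inserts the rest."""
    return (
        f"MERGE INTO {target} t "
        f"USING {source} s "
        f"ON t.{key} = s.{key} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


# `updates` is a hypothetical temp view holding the incoming rows.
merge_sql = build_merge_sql("dev.db.amazon_reviews_iceberg", "updates", "review_id")
print(merge_sql)

# In the notebook (not executed here):
# incoming_df.createOrReplaceTempView("updates")
# spark.sql(merge_sql)
```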

<a id="schema_evolution"></a>
## Schema Evolution
Borrowing from the way columns work in databases, Iceberg tracks columns by unique IDs and not by column name. As long as the ID is the same, all the data still remains. You can safely add, drop, rename, update, or even reorder columns, and you don’t have to rewrite the data to do so. Schema evolution is a first-class feature in Iceberg. Your ingest and read queries are free to evolve without having to hide the schema inside JSON blobs.

In this example we will add a column to the Iceberg table we just created and document it with a column comment.

<a id="adding_columns"></a>
### Adding Columns
Now we are going to add another column called `high_rated_product`. Iceberg also allows documenting the purpose of each column with a `comment`, which helps a lot in a collaborative environment and lets business users look up data quickly.

In [None]:
spark.sql("""ALTER TABLE dev.db.amazon_reviews_iceberg ADD COLUMNS (high_rated_product string comment 'Highly rated comment')""")

We will set the **High rated** flag in the `high_rated_product` column where `star_rating` is greater than or equal to 4.

In [None]:
spark.sql("""UPDATE dev.db.amazon_reviews_iceberg SET high_rated_product = 'High rated' WHERE star_rating >= 4""")

Verify that the column was added successfully by querying the table.

In [None]:
spark.sql("""Select customer_id,review_id,product_id, product_title, star_rating, high_rated_product from dev.db.amazon_reviews_iceberg""").show()

<a id="dropping_columns"></a>
### Dropping Columns
Now suppose the business requirements change: we are no longer interested in the `high_rated_product` column and need to remove it from our table. Iceberg allows us to do that easily.

In [None]:
spark.sql("""ALTER TABLE dev.db.amazon_reviews_iceberg DROP COLUMN high_rated_product""")

In [None]:
spark.sql("""select * from dev.db.amazon_reviews_iceberg""").show()
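
The Schema Evolution introduction also mentions renaming and reordering columns, which this notebook does not run. The sketch below only assembles the corresponding `ALTER TABLE` statements. The new column name `headline` and the chosen column positions are hypothetical, and the statements should be checked against the Iceberg DDL documentation for your version before running them.

```python
TABLE = "dev.db.amazon_reviews_iceberg"


def rename_and_reorder_statements(table):
    """Return example DDL for renaming one column and moving another."""
    return [
        # Rename a column; existing data files are not rewritten.
        f"ALTER TABLE {table} RENAME COLUMN review_headline TO headline",
        # Move a column so that it follows another column.
        f"ALTER TABLE {table} ALTER COLUMN year AFTER marketplace",
    ]


statements = rename_and_reorder_statements(TABLE)
for statement in statements:
    print(statement)

# In the notebook (not executed here):
# for statement in statements:
#     spark.sql(statement)
```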

<a id="time_travel"></a>
## Time Travel
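Each commit to the table is recorded as a snapshot, which is what lets us look at the table as of a previous snapshot. Let us start with the metadata that records those snapshots.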

**To view the table's history, snapshots, manifests, and metadata**

In [None]:
#Table History
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.history""").show()

In [None]:
#Table Snapshot
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

In [None]:
#You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:
spark.sql("""select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['spark.app.id']
from dev.db.amazon_reviews_iceberg.history h
join dev.db.amazon_reviews_iceberg.snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at""").show()

In [None]:
#To show a table’s data files and each file’s metadata, run:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.files""").show()

In [None]:
#To show a table’s manifest files and each manifest’s metadata, run:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.manifests""").show()
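
With a `snapshot_id` taken from the history or snapshots output, the table can be read as of that snapshot. The following is a minimal sketch that assumes the `snapshot-id` read option of the Iceberg Spark data source and that the `iceberg` format accepts a catalog-qualified table name in `load`. The helper name `read_snapshot` is made up for this illustration.

```python
def read_snapshot(spark, table, snapshot_id):
    """Return a DataFrame of `table` as of the given Iceberg snapshot."""
    return (
        spark.read
        .option("snapshot-id", snapshot_id)  # pin the read to one snapshot
        .format("iceberg")
        .load(table)
    )


# In the notebook, with a snapshot_id copied from the history output:
# old_df = read_snapshot(spark, "dev.db.amazon_reviews_iceberg", <snapshot_id>)
# old_df.show()
```

Reading this way does not change the table; it only selects which snapshot the query sees. The Iceberg documentation also describes an `as-of-timestamp` option that takes a time in milliseconds.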

<a id="rollback"></a>
### Rollback

To undo the recent changes, we can execute Iceberg stored procedures using the `CALL` statement. The `rollback_to_snapshot` stored procedure rolls the table back to any historical commit. We could also use `rollback_to_timestamp`.

To recover the table to its original state, replace xxxxxxxxxxxxx with a snapshot id from the table history. Use the `snapshot_id` whose `parent_id` is null (the first record).

In [None]:
spark.sql("CALL dev.system.rollback_to_snapshot('db.amazon_reviews_iceberg', xxxxxxxxxxxxx)")

Our table is now back to its original state.

In [None]:
spark.sql("select * from dev.db.amazon_reviews_iceberg").show()
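
Instead of copying the snapshot id by hand, the rollback statement can be assembled from the rows of the history table. This is a sketch under the assumption that each row is available as a dict with `snapshot_id` and `parent_id` keys (for example via `Row.asDict()`); the helper names and the sample ids are made up for this illustration.

```python
def first_snapshot_id(history_rows):
    """Return the snapshot_id of the history entry that has no parent."""
    roots = {row["snapshot_id"] for row in history_rows if row["parent_id"] is None}
    if len(roots) != 1:
        raise ValueError(f"expected exactly one root snapshot, found {len(roots)}")
    return roots.pop()


def rollback_statement(catalog, table, snapshot_id):
    """Build the CALL statement for the rollback_to_snapshot procedure."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {int(snapshot_id)})"


# Made-up history rows standing in for the real query result.
sample_history = [
    {"snapshot_id": 111, "parent_id": None},
    {"snapshot_id": 222, "parent_id": 111},
    {"snapshot_id": 333, "parent_id": 222},
]
print(rollback_statement("dev", "db.amazon_reviews_iceberg", first_snapshot_id(sample_history)))

# In the notebook (not executed here):
# rows = [r.asDict() for r in
#         spark.sql("SELECT * FROM dev.db.amazon_reviews_iceberg.history").collect()]
# spark.sql(rollback_statement("dev", "db.amazon_reviews_iceberg", first_snapshot_id(rows)))
```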