


## Optimizing Data Lake Storage with Apache Iceberg table format

Amazon S3 uses object tagging to categorize storage where each tag is a key-value pair.
From Apache Iceberg perspective, it supports custom S3 Object tags that can be added
to S3 objects while writing and deleting into the table. Iceberg Users can also configure
tag-based object lifecycle policy at bucket level to transition objects to different S3 tiers.
With the s3.delete.tags config property in Iceberg, objects are tagged with the configured
key-value pairs before deletion. When the catalog property s3.delete-enabled is set
to false, the objects are not hard-deleted from S3. This is expected to be used in
combination with S3 delete tagging, so objects are tagged and removed using S3 lifecycle
policy. This property is set to true by default.
The example notebook in this blog shows example implementation of S3 Object tagging
and Lifecycle rules for Apache Iceberg Tables to optimize the storage cost.


## Prerequisites

In this example we will use Iceberg’s S3 Tags feature with the write tag as write-tag-name=created and delete tag as delete-tag-name=deleted. This example is demonstrated on an EMR emr-6.10.0 cluster with installed applications Hadoop 3.3.3, JupyterEnterpriseGateway 2.6.0, and Spark 3.3.1. The examples are executed on a Jupyter Notebook environment attached to the EMR cluster. To know more about how to create an EMR Cluster with Iceberg and how to use EMR Studio, please refer to the following documents: \
i. [Create an Iceberg EMR Cluster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-spark-cluster.html) \
ii.[EMR Studio Guide](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio.html)





## Configuring Iceberg on Spark session

Configure your Spark session using the %%configure magic command. We will be using Hive Catalog for Iceberg Tables. 
Before you run the following step, create a S3 bucket in your AWS account with following naming convemtion /iceberg/

Update the your-iceberg-storage-blog in below configuration with the bucket which you created to test this example

In [None]:
%%configure -f
{
"conf":{
 "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
 "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
 "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.hive.HiveCatalog",
 "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
 "spark.sql.catalog.dev.warehouse":"s3:///iceberg/",
 "spark.sql.catalog.dev.s3.write.tags.write-tag-name":"created",
 "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name":"deleted",
 "spark.sql.catalog.dev.s3.delete-enabled":"false"
 }
}

Create an Iceberg Table to be loaded with Amazon Reviews

In [None]:
spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")

spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
 marketplace string,
 customer_id string,
 review_id string,
 product_id string,
 product_parent string,
 product_title string,
 star_rating int,
 helpful_votes int,
 total_votes int,
 vine string,
 verified_purchase string,
 review_headline string,
 review_body string,
 review_date date,
 year int)
USING iceberg 
location 's3:///iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

In [None]:
spark.sql(""" select * from dev.db.amazon_reviews_iceberg""").show()

In [None]:
spark.sql(""" select * from dev.db.amazon_reviews_iceberg.snapshots""").show()


### Inserts

We will be using Amazon Product Reviews Dataset dataset for our testing. While inserting the data, we will partition the data by review_date as per the table definition.

In [None]:
df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

**Run below cell to write data into the Iceberg table, We are writing just one partition for sake of simplicity**

In [None]:
df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()

**Verify data is loaded into iceberg table successfully.**

In [None]:
spark.sql("""select * from dev.db.amazon_reviews_iceberg limit 1""").show()

**Verify the new snapshot created for this table after the data insert.**

In [None]:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show(truncate=False)

**Verify the S3 objects related to this table is having the specified tags.** You can do the same from AWS Console or going to the AWSCLI 

**Insert a single record into the Iceberg table.**

In [None]:
spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

**Check a new snapshot created after the insert. You will now see two snapshots**

In [None]:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

**Now check the S3 Tag populations as below for the new data file created:**

Go to AWS CLI or Console to check the tags populated for the new writes. Let's check the tag corresponding to the object created by single row insert. You can check the S3 folde s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ and point to the partition review_date_year=2023/. Then check the parquet file under this folder to check the tags. From CLI you can run the following command to see the same. \
xxxxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db /amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-xxxxxxxxxx-00001.parquet \
{\
 "TagSet": [\
 {\
 "Key": "write-tag-name",\
 "Value": "created"\
 }\
 ]\
}

**Now Delete sample data and expire snapshots using Iceberg’s S3 Tags feature.** The objects in S3 will have the tag delete-tag-name=deleted associated when the relevant snapshot is expired (https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots).

In [None]:
spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

**Check the snapshots and you will find a new snapshot with operation value as delete**

In [None]:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

**Expire Snapshots and keep only last two snapshots**

In [None]:
spark.sql ("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

In [None]:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

**View existing metadata files from the metadata log entries metatable after expiration of snapshots. Do note that the snapshots which have expired will show the latest snapshot id as null.**

In [None]:
spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

**Create lifecycle configuration for the bucket to transition objects having delete-tag-name=deleted S3 tag to Glacier Instant Retrieval class.** Do note that Amazon S3 runs lifecycle rules once every day at midnight Universal Coordinated Time (UTC) and new lifecycle rules can take up to 48 hours to complete the first run. Amazon S3 Glacier is well suited to archive data that needs immediate access (with milliseconds retrieval). With S3 Glacier Instant Retrieval, customers can save up to 68% on storage costs compared to using the S3 Standard-Infrequent Access (S3 Standard-IA) storage class, when the data is accessed once per quarter.


## Cleanups

After you complete the test, please follow this cleanup step to avoid any recurring costs. \
1. Delete the S3 Buckets that you created for this test 
2. Terminate the EMR Cluster 
3. Stop and Delete the EMR Notebook instance
