# Iceberg Example Notebook



## Topics covered in this example

1) [Configuring Iceberg](#configure_iceberg) <br>
2) [Iceberg Catalogs and Namespaces](#catalogs) <br>
3) [Creating an Iceberg Table](#create_table) <br>
4) [DML Statements](#dml) <br>
&emsp;&emsp;&emsp;&emsp;a) [Inserts](#inserts) <br>
&emsp;&emsp;&emsp;&emsp;b) [Deletes](#deletes) <br>
&emsp;&emsp;&emsp;&emsp;c) [Updates](#updates) <br>
&emsp;&emsp;&emsp;&emsp;d) [Upserts](#upserts) <br>
5) [Schema Evolution](#schema_evolution) <br>
&emsp;&emsp;&emsp;&emsp;a) [Renaming Columns](#renaming_columns) <br>
&emsp;&emsp;&emsp;&emsp;b) [Adding Columns](#adding_columns) <br>
&emsp;&emsp;&emsp;&emsp;c) [Dropping Columns](#dropping_columns) <br>
6) [Time Travel](#time_travel) <br>
&emsp;&emsp;&emsp;&emsp;a) [Rollback](#rollback) <br>
&emsp;&emsp;&emsp;&emsp;b) [Roll Forward](#roll_forward) <br>
7) [Partition Evolution](#partition_evolution) <br>


***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE :</b> In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.</div>

* This notebook was tested using a single-node r5.4xlarge EMR 6.5 cluster with Iceberg 0.12.0 and Spark 3.1.2.
* To run this notebook:
    - Launch an EMR 6.5+ cluster in one of the subnets on which this EMR Studio is running.
    - Launch the cluster with the following configuration classifications:

        
            [
              {
                "Classification": "iceberg-defaults",
                "Properties": {
                  "iceberg.enabled":"true"
                }
              },
              {
                "Classification": "spark-hive-site",
                "Properties": {
                  "hive.metastore.client.factory.class":        
                     "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                }
              }
            ]

The first classification enables Iceberg. The second one configures Glue Catalog as the Metastore for Spark applications in this cluster.

* Here is a sample CLI command to create an EMR cluster. Do remember to replace <span style="color:red">EMR-STUDIO-SUBNET</span> with one of the subnets in which your EMR Studio is running:

            aws emr create-cluster \
                --name iceberg-emr-cluster \
                --use-default-roles \
                --release-label emr-6.5.0 \
                --instance-count 1 \
                --instance-type r5.4xlarge \
                --applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
                --ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
                --configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'
                

* This notebook uses the `PySpark` kernel. However, most of the commands are Spark SQL commands, so we use the `%%sql` magic command at the beginning of those cells.
***

## Introduction
Apache Iceberg (https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, and Hive using a high-performance table format that works just like a SQL table. Iceberg tracks individual data files in a table instead of directories. This allows writers to create data files in place and add them to the table only in an explicit commit. Every time a new file is added to any partition of the table, a new point-in-time snapshot of all the files gets created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already records that information at write time. Readers work from a consistent snapshot, which is commonly known as snapshot isolation (https://en.wikipedia.org/wiki/Snapshot_isolation) in databases.

Iceberg supports write, delete, update, and time travel operations with complete support for ACID transactions (https://en.wikipedia.org/wiki/ACID). Table changes are atomic, and readers never see partial or uncommitted changes (serializable isolation (https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable)).

The Iceberg table format is an open specification at multiple levels. At the catalog level, you can plug in multiple types of catalogs such as Hive, Hadoop, and AWS Glue Data Catalog. All of these can coexist, and you can join tables across different types of catalogs. In this example, we work with the Glue Data Catalog.

The post: <a href="https://quip-amazon.com/xVv5A0WWPaxq/Iceberg-on-EMR-Blog-Post-v2" target="_blank">Build fast, ACID compliant, evolving big data processing using Apache Iceberg on Amazon EMR</a> provides detailed information.

***

## Setup
Create an S3 bucket location to save sample dataset. In this example we use the path format: s3://<span style="color:red">YOUR-BUCKET-NAME</span>/iceberg/<span style="color:red">YOUR-CATALOG-NAME</span>/tables/ 
    
    For example: s3://EXAMPLE-BUCKET/iceberg/glue_catalog1/tables/

***


<a id="configure_iceberg"></a>
## Configuring Iceberg on Spark session

Let us create a Glue catalog. In this example notebook, we use the Glue catalog name: <span style="color:red">glue_catalog1</span>. Let us assume that the name of your catalog is <span style="color:red">YOUR-CATALOG-NAME</span>. 

* Set <span style="color:red">YOUR-CATALOG-NAME</span>.warehouse to the S3 path where you want to store your data and metadata.
* To make the catalog a Glue catalog, set <span style="color:red">YOUR-CATALOG-NAME</span>.catalog-impl to `org.apache.iceberg.aws.glue.GlueCatalog`. This key is required to point to an implementation class for any custom catalog implementation. 
* Use `org.apache.iceberg.aws.s3.S3FileIO` as the <span style="color:red">YOUR-CATALOG-NAME</span>.io-impl in order to take advantage of S3 multipart upload for high parallelism. 
* We use a DynamoDB table for lock implementation. This is optional, and is recommended for high concurrency workloads. To do that we set `lock-impl` for our glue catalog to `org.apache.iceberg.aws.glue.DynamoLockManager` and we set `lock.table` to `myGlueLockTable` as the table name so that for every commit, Glue Catalog first obtains a lock using this table and then tries to safely modify the Glue table. If you choose this option the table gets created in your own account. Note that you need to have the necessary access permissions to create and use a DynamoDB table. Further, additional DynamoDB charges apply.

In [None]:
%%configure -f
{
    "conf":  {       
             "spark.sql.catalog.glue_catalog1":"org.apache.iceberg.spark.SparkCatalog",
             "spark.sql.catalog.glue_catalog1.warehouse":"s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
             "spark.sql.catalog.glue_catalog1.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
             "spark.sql.catalog.glue_catalog1.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
             "spark.sql.catalog.glue_catalog1.lock-impl":"org.apache.iceberg.aws.glue.DynamoLockManager",
             "spark.sql.catalog.glue_catalog1.lock.table":"myGlueLockTable",
             "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
           } 
}
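
The same settings can be generated for any catalog name. The sketch below builds the `conf` dictionary shown above in plain Python; the helper name `iceberg_glue_catalog_conf` and its parameters are hypothetical and only illustrate how the keys are derived from the catalog name. The DynamoDB lock settings are added only when a lock table name is passed, since they are optional.

```python
import json

def iceberg_glue_catalog_conf(catalog_name, warehouse_path, lock_table=None):
    """Build the Spark conf entries for an Iceberg catalog backed by AWS Glue.

    Hypothetical helper: it only assembles the keys used in this notebook.
    """
    prefix = "spark.sql.catalog." + catalog_name
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        prefix + ".warehouse": warehouse_path,
        prefix + ".catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        prefix + ".io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }
    if lock_table is not None:
        # Optional DynamoDB-based lock, as described in the bullets above
        conf[prefix + ".lock-impl"] = "org.apache.iceberg.aws.glue.DynamoLockManager"
        conf[prefix + ".lock.table"] = lock_table
    return {"conf": conf}

example_conf = iceberg_glue_catalog_conf(
    "glue_catalog1",
    "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
    lock_table="myGlueLockTable",
)
print(json.dumps(example_conf, indent=4))
```

The printed JSON can be pasted after `%%configure -f` in a cell of its own.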




Check the version of Spark.

In [None]:
spark.version

<a id="catalogs"></a>
## Iceberg Catalogs and Namespaces
The default catalog is the `AwsDataCatalog`. Let us switch to our Glue catalog `glue_catalog1`, which has support for Iceberg tables. Note that there are no namespaces yet. A namespace in Iceberg is the same thing as a database in Glue.

In [None]:
%%sql
use glue_catalog1

In [None]:
%%sql
show current namespace

Let us create a database and switch to it.

In [None]:
%%sql
CREATE SCHEMA IF NOT EXISTS salesdb;

In [None]:
%%sql
use salesdb

In [None]:
%%sql
show current namespace

<a id="create_table"></a>
## Creating an Iceberg Table

We will use Spark SQL for most of our Iceberg operations, although you could achieve all of these with the equivalent PySpark, Scala, or Java APIs as well.
Let us start by creating a table. The DDL syntax looks the same as creating, say, a Hive table, except that we include `USING iceberg`.

In [None]:
%%sql
DROP TABLE IF EXISTS glue_catalog1.salesdb.orders

In [None]:
%%sql
CREATE TABLE glue_catalog1.salesdb.orders
    (
      order_id              int,
      product_name          string,
      product_category      string,
      qty                   int,
      unit_price            decimal(7,2),
      order_datetime        timestamp
    )
USING iceberg
PARTITIONED BY (days(order_datetime))

In [None]:
%%sql
show tables

Since there is no data yet, we don't expect to see any snapshots created yet.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

<a id="dml"></a>
## DML Operations
Iceberg supports all DML statements to add or modify data in your data lake: Inserts to add new data, Updates to modify specific columns in specific rows of your existing data, Deletes for GDPR and CCPA compliance, and Upserts when your incoming data may contain a mix of inserts and updates. Let us look at each of them now.

<a id="inserts"></a>
### Inserts
Let us insert our first record. 

In [None]:
%%sql
INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        1, 
        'Harry Potter and the Prisoner of Azkaban',
        'Books',
        2,
        7.99,
        current_timestamp()
    )

Notice that DML statements do result in snapshots getting created. Note the `snapshot_id` and the timestamp column called `committed_at`

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

Let us insert four more records. 

In [None]:
%%sql
INSERT INTO glue_catalog1.salesdb.orders VALUES
    (
        2, 
        'Harry Potter and the Half-Blood Prince',
        'Books',
        1,
        9.99,
        date_sub(current_timestamp(), 3)
    ),
    (
        3, 
        "New Balance Mens 623 V3 Casual Comfort Cross Trainer",
        'Shoes',
        1,
        55.97,
        date_sub(current_timestamp(), 4)
    ),
    (
        4, 
        "Skechers Womens Go Walk Joy Walking Shoe",
        'Shoes',
        1,
        45.00,
        date_sub(current_timestamp(), 9)
    ),
    (
        5, 
        "Nintendo Switch with Neon Blue and Neon Red Joy‑Con - HAC-001(-01)",
        'Games',
        1,
        299.99,
        date_sub(current_timestamp(), 4)
    )

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders

Iceberg treats this as a single commit and adds just one additional snapshot, an append operation, as expected.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

In [None]:
%%sql
SELECT count(*) FROM glue_catalog1.salesdb.orders;

<a id="deletes"></a>
### Deletes
GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Iceberg is designed to be able to handle these trivially.
Now let us delete a record from our Iceberg table.

In [None]:
%%sql
DELETE FROM glue_catalog1.salesdb.orders
WHERE order_datetime < date_sub(current_timestamp(), 1)
AND order_datetime > date_sub(current_timestamp(), 4)

The book with order_id 2 happens to be within this date range and has been deleted.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

A delete marker shows up as an overwrite operation.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

You can query the number of files deleted from your manifests too.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.manifests;

In [None]:
%%sql
SELECT count(*) FROM glue_catalog1.salesdb.orders;

<a id="updates"></a>
### Updates
What if we want to go back and update an existing record? Let's change the `qty` for our `order_id` 5 from 1 to 10 Nintendo Switches. Iceberg allows updates using a simple `UPDATE` statement with a `SET` clause.

In [None]:
%%sql
UPDATE glue_catalog1.salesdb.orders
SET qty = 10
WHERE order_id = 5

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

As you can see below, Iceberg has added another snapshot with `overwrite` operation for updating the `qty` of Nintendo Switches.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

<a id="upserts"></a>
### Upserts
What if we get some incoming data and we don't know whether those keys already exist in our dataset? This is a common scenario when applying Change Data Capture (CDC) data to your data lake, for example. Iceberg makes it easy to both insert new data and update existing data in your data lake with a single `MERGE INTO` statement.

Before we look into the `MERGE INTO` statement, we first need some source data that has some new records to insert as well as some updates to existing records. We store this data in a table called `glue_catalog1.salesdb.orders_update`. First we create this table.

In [None]:
%%sql
DROP TABLE IF EXISTS glue_catalog1.salesdb.orders_update;

In [None]:
%%sql
create table glue_catalog1.salesdb.orders_update as select * from glue_catalog1.salesdb.orders limit 0;

Let us assume that our CDC data comprises one new purchase of 10 books and an update of a previous order for shoes. We add the CDC records to this table, one with a new `order_id` (99) and one with an existing `order_id` (3).

In [None]:
%%sql
INSERT INTO glue_catalog1.salesdb.orders_update VALUES 
    (
        3, 
        "New Balance Mens 623 V3 Casual Comfort Cross Trainer",
        'Shoes',
        2,
        40.00,
        current_timestamp()
    ),
    (
        99, 
        'Harry Potter and the Sorcerers Stone',
        'Books',
        10,
        9.99,
        current_timestamp()
    )

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders_update;

Now that we have our source data ready, we can use the `MERGE INTO` statement to upsert data into our `orders` table.

In [None]:
%%sql
MERGE INTO glue_catalog1.salesdb.orders target 
USING glue_catalog1.salesdb.orders_update source          
ON target.order_id = source.order_id              
WHEN MATCHED THEN 
    UPDATE SET
        order_id = source.order_id,
        product_name = source.product_name,
        product_category = source.product_category,
        qty = source.qty,
        unit_price = source.unit_price,
        order_datetime = source.order_datetime
WHEN NOT MATCHED THEN
    INSERT *
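
To make the matched and not-matched branches concrete, here is a minimal plain-Python sketch of what this particular `MERGE INTO` does to the rows. It is only an illustration of the semantics, not how Iceberg executes the statement; the function name `merge_upsert` and the abbreviated sample rows are assumptions made for this example.

```python
def merge_upsert(target_rows, source_rows, key="order_id"):
    """Emulate the MERGE above: a matched key takes every column from the
    source row; an unmatched key is inserted as a new row."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        # WHEN MATCHED sets all columns from source, WHEN NOT MATCHED inserts;
        # in both cases the source row ends up stored under its key.
        merged[row[key]] = dict(row)
    return sorted(merged.values(), key=lambda r: r[key])

orders = [
    {"order_id": 1, "product_category": "Books", "qty": 2, "unit_price": 7.99},
    {"order_id": 3, "product_category": "Shoes", "qty": 1, "unit_price": 55.97},
    {"order_id": 4, "product_category": "Shoes", "qty": 1, "unit_price": 45.00},
    {"order_id": 5, "product_category": "Games", "qty": 10, "unit_price": 299.99},
]
orders_update = [
    {"order_id": 3, "product_category": "Shoes", "qty": 2, "unit_price": 40.00},
    {"order_id": 99, "product_category": "Books", "qty": 10, "unit_price": 9.99},
]

result = merge_upsert(orders, orders_update)
for row in result:
    print(row)
```

Order 3 is updated in place and order 99 is appended, while the other orders are left untouched.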

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders;

Iceberg also lets us query metadata such as the actual `files` that are created, including `file_format`, `partition`, and many more statistics, as shown below. These can be handy when troubleshooting data quality and performance issues.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.files;

<a id="schema_evolution"></a>
## Schema Evolution
Borrowing from the way columns work in databases, Iceberg tracks columns by using unique IDs and not by the column name. As long as the ID is the same, all the data still remains. You can safely add, drop, rename, update, or even reorder columns. You don’t have to rewrite the data for this. Schema evolution gets first class citizen treatment in Iceberg. Your ingest and read queries now have the freedom to be evolved without having to hide the schema inside JSON blobs.
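
The effect of tracking columns by ID can be sketched with a toy model in plain Python. This is a deliberately simplified illustration, not Iceberg's actual file layout: the field IDs (1, 4, 7), the `read_rows` helper, and the sample values are all made up for this example.

```python
# Hypothetical, simplified model: data files store values keyed by field ID,
# and the table schema maps each field ID to its current column name.
def read_rows(schema, file_rows):
    """Project stored rows through the current schema; missing IDs read as None."""
    return [
        {name: row.get(field_id) for field_id, name in schema.items()}
        for row in file_rows
    ]

schema = {1: "order_id", 4: "qty"}   # field ID -> column name
file_rows = [{1: 5, 4: 10}]          # written once, never rewritten below

schema[4] = "quantity"               # rename: only the schema changes
after_rename = read_rows(schema, file_rows)

schema[7] = "discount"               # add: a new ID that old files do not contain
after_add = read_rows(schema, file_rows)

del schema[7]                        # drop: the ID simply leaves the schema
after_drop = read_rows(schema, file_rows)

print(after_rename)
print(after_add)
print(after_drop)
```

In all three steps `file_rows` stays untouched, which is why these operations need no data rewrite.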

<a id="renaming_columns"></a>
### Renaming Columns
In Iceberg, since columns are not tracked by name, but using unique IDs instead, renaming a column is a simple metadata change. There is no data movement. Data lakes are increasingly looking like databases!

In [None]:
%%sql
ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity

In [None]:
%%sql
desc table glue_catalog1.salesdb.orders

No new snapshots are created for a DDL operation like a column rename. Snapshots are created only when there is a change in the data.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

Let us check what is in our table.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

<a id="adding_columns"></a>
### Adding Columns
Now we are going to add another column called `discount`. Iceberg also allows documenting the purpose for each column as `comment`, which helps a lot in a collaborative environment and quick lookup of data from business users.

In [None]:
%%sql
ALTER TABLE glue_catalog1.salesdb.orders
ADD COLUMNS (
    discount decimal(7,2) comment 'discount applied to this order'
  )

However, as you can see below, the new column is not displayed yet when querying the table. Columns that do not have any data in your query results do not show up in the output.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

However, we can describe our table to see that a new column `discount` did get added.

In [None]:
%%sql
desc glue_catalog1.salesdb.orders

Let us now insert another record with a value added for the `discount` column. Our customer here has earned the right to get a discount as they are purchasing the entire set of the Harry Potter series!

In [None]:
%%sql
INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        6, 
        'Harry Potter Paperback Box Set',
        'Books',
        1,
        39.99,
        current_timestamp(),
        0.1
    )

Now you can see that the `discount` column shows up, when querying.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

<a id="dropping_columns"></a>
### Dropping Columns
Now suppose there is a change in business requirements: we are no longer interested in the `discount` column and need to remove it from our table. Iceberg allows us to do that easily.

In [None]:
%%sql
ALTER TABLE glue_catalog1.salesdb.orders
DROP COLUMN discount

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

Dropping a column is purely a metadata operation in Iceberg. No new snapshots are created. Let us take a look at our snapshots before getting into Time Travel.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.snapshots;

<a id="time_travel"></a>
## Time Travel
Let us query our table as of the previous snapshot. SparkSQL does not provide a syntax for time travel yet. So we use PySpark for this. 

First let us capture all the snapshot timestamps in an array so that we can use the elements in the array to travel back and forth in time. Here we query the `committed_at` column from the Iceberg `snapshots` metadata table and store its values in the `snapshotTimes` array.

When we query the table as of an earlier snapshot, records added by later commits, such as the shoes and the Nintendo Switch, do not show up in the point-in-time result. They still exist in the current table, though.

In [None]:
snapshotTimes = spark.sql("select committed_at as commitTime from  glue_catalog1.salesdb.orders.snapshots order by commitTime").collect()
print("snapshotTimes: ")
for elem in snapshotTimes: print(elem)

Iceberg provides a Spark read option `as-of-timestamp` that takes the time we want to travel to, expressed in milliseconds since the epoch. To compute this value, we write a simple Python function `time_millis` as shown below:

In [None]:
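import datetime
# Naive UTC epoch; assumes the collected timestamps are naive datetimes in UTC.
epoch = datetime.datetime.utcfromtimestamp(0)
def time_millis(timestamp):
    # Milliseconds between the epoch and the given timestamp
    return int((timestamp - epoch).total_seconds() * 1000.0)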
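
The `time_millis` helper goes through a floating-point multiplication, which may lose a millisecond to rounding for some timestamps. The following is a sketch of an integer-only variant; the name `time_millis_exact` is hypothetical, and like the original it assumes the timestamp is a naive `datetime` expressed in UTC.

```python
import datetime

# Naive epoch, matching the naive datetimes assumed for committed_at values
EPOCH = datetime.datetime(1970, 1, 1)

def time_millis_exact(timestamp):
    """Milliseconds since the Unix epoch, computed with integer arithmetic only."""
    delta = timestamp - EPOCH
    whole_seconds = delta.days * 86400 + delta.seconds
    # Truncate microseconds to whole milliseconds
    return whole_seconds * 1000 + delta.microseconds // 1000

sample = datetime.datetime(2022, 2, 9, 3, 0, 0, 123000)
print(time_millis_exact(sample))
```

It can be used anywhere `time_millis` is used in the cells that follow.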

In our example, we have 6 snapshots indexed from [0] through [5]. For example, to get to the state of the table after the second snapshot, we use the timestamp `snapshotTimes[1][0]`.

In [None]:
print("State of the table as of time: " + str(snapshotTimes[1][0]))
snapshotTimeMillis = time_millis(snapshotTimes[1][0])
spark.read.option("as-of-timestamp", snapshotTimeMillis).format("iceberg").load("glue_catalog1.salesdb.orders").show(5,False)
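
Conceptually, an as-of-timestamp read resolves to the most recent snapshot committed at or before the requested time. The sketch below shows that lookup in plain Python under this simplifying assumption; it ignores rollbacks, and the function name `snapshot_as_of` and the sample commit times and IDs are invented for illustration.

```python
import bisect

def snapshot_as_of(snapshots, as_of_millis):
    """Return the ID of the latest snapshot committed at or before as_of_millis.

    snapshots: list of (committed_at_millis, snapshot_id), sorted by commit time.
    """
    commit_times = [committed_at for committed_at, _ in snapshots]
    index = bisect.bisect_right(commit_times, as_of_millis) - 1
    if index < 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return snapshots[index][1]

# Made-up commit times (in milliseconds) and snapshot IDs
snapshots = [(1000, "first-insert"), (2000, "second-insert"), (3000, "delete")]
print(snapshot_as_of(snapshots, 2500))
```

A time between two commits resolves to the earlier of the two, which is why passing a snapshot's own commit time returns the table as of that snapshot.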

You can time travel to any given snapshot and see the state of the table as of different timestamps: snapshotTimes[3][0] or snapshotTimes[4][0]. You could also directly use the snapshot_id value from the snapshots table as shown below. Here we query the state of the table after the very first insert by choosing the snapshot value of `snapshotIDs[0][0]`:

In [None]:
snapshotIDs = spark.sql("select snapshot_id as snapshot from glue_catalog1.salesdb.orders.snapshots order by committed_at").collect()
print("snapshots: ")
for elem in snapshotIDs: print(elem)

snapshotID = snapshotIDs[0][0]
spark.read.option("snapshot-id", snapshotID).format("iceberg").load("glue_catalog1.salesdb.orders").show(5,False)

Iceberg does give us a way to look at the history of changes to our table using the `history` metadata table.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.history;

<a id="rollback"></a>
### Rollback
To undo the recent changes, we can execute Iceberg stored procedures using the `CALL` statement. The `rollback_to_snapshot` stored procedure rolls the table back to any historical commit. We could also use `rollback_to_timestamp`.

In [None]:
snapshotID = snapshotIDs[0][0]
query = "CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', {})".format(snapshotID)
spark.sql(query)
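
For the `rollback_to_timestamp` alternative, the statement takes a timestamp literal instead of a snapshot ID. The sketch below only builds the SQL string; the helper name `rollback_to_timestamp_sql` and the sample date are assumptions made for this example, and the statement is not executed here.

```python
import datetime

def rollback_to_timestamp_sql(catalog, table, timestamp):
    """Build a CALL statement that rolls the table back to the snapshot
    that was current at the given timestamp."""
    # Format with millisecond precision, e.g. 2022-02-09 03:00:00.000
    literal = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return "CALL {}.system.rollback_to_timestamp('{}', TIMESTAMP '{}')".format(
        catalog, table, literal
    )

query = rollback_to_timestamp_sql(
    "glue_catalog1", "salesdb.orders", datetime.datetime(2022, 2, 9, 3, 0, 0)
)
print(query)
# In the notebook, run it with: spark.sql(query)
```

Rolling back by timestamp is convenient when you know when a bad write happened but not which snapshot ID it produced.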

Our table is now back to a single record after this rollback.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

Querying the `history` metadata table gives us the big picture. The `is_current_ancestor` column indicates which snapshots are ancestors of the current table state, and therefore how many commits we have traveled back and which snapshot our metastore is currently pointing to. In this case, the first snapshot shows up again at the end because it is now the current snapshot. This information can be tremendously helpful when managing rollbacks and roll forwards.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.history;
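
One way to read `is_current_ancestor` is as the result of walking parent pointers back from the current snapshot. The following plain-Python sketch illustrates that idea; the snapshot IDs, the `parents` mapping, and the helper `current_ancestors` are made up for this example and are not read from the table.

```python
def current_ancestors(parents, current_id):
    """Collect the current snapshot and all of its ancestors by following
    parent pointers until a snapshot with no parent is reached."""
    ancestors = set()
    snapshot_id = current_id
    while snapshot_id is not None:
        ancestors.add(snapshot_id)
        snapshot_id = parents.get(snapshot_id)
    return ancestors

# Hypothetical linear history: each snapshot points at the one before it
parents = {100: None, 101: 100, 102: 101, 103: 102}

# After rolling back to the first snapshot, only it is a current ancestor
after_rollback = {s: s in current_ancestors(parents, 100) for s in parents}
# After pointing the table at the latest snapshot, all of them are again
after_roll_forward = {s: s in current_ancestors(parents, 103) for s in parents}

print(after_rollback)
print(after_roll_forward)
```

Snapshots that are skipped by a rollback stay in the table metadata; they are just no longer on the path from the current snapshot.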

<a id="roll_forward"></a>
### Roll Forward

Before we roll our table forward to a more recent state, let us query the most recent snapshot to make sure this is where we want to be. We switch to PySpark to show this.

In [None]:
snapshotIDs = spark.sql("select snapshot_id as snapshot from glue_catalog1.salesdb.orders.snapshots order by committed_at").collect()
print("snapshots: ")
for elem in snapshotIDs: print(elem)

snapshotID = snapshotIDs[5][0]
spark.read.option("snapshot-id", snapshotID).format("iceberg").load("glue_catalog1.salesdb.orders").show(10,False)

Now we can `CALL` Iceberg's `set_current_snapshot` stored procedure to move our metastore pointer to any existing snapshot we are interested in.

In [None]:
snapshotID = snapshotIDs[5][0]
query = "CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', {})".format(snapshotID)
spark.sql(query)
spark.sql("select * from glue_catalog1.salesdb.orders").show()

You can now query the table to see that the table represents the state as of the point in time you chose above by selecting the snapshotID of interest.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

We can see that `is_current_ancestor` now shows `True` for all snapshots, as we have not skipped any commits.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders.history;

<a id="partition_evolution"></a>
## Partition Evolution
Let us look at the partitions we have in our table by querying the `partitions` metadata table. Iceberg keeps track of how many records (`record_count` column) and how many files (`file_count` column) are present in each partition. This is a very handy tool that could be used for performance and data quality related troubleshooting and diagnostics.

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders.partitions

Let us list our S3 bucket location to see the partitions. Remember to replace <span style="color:red">YOUR-BUCKET-NAME</span> with your bucket name and if you use different prefixes, update the path as applicable. Notice that there is one partition for each day because the table is `PARTITIONED BY` the partition transform `days(order_datetime)`.

In [None]:
%%sh
aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/

Let us assume that, a year down the line, we realize we need to add hourly partitions. Iceberg allows us to add partitions without having to perform any data movement or any additional changes to the underlying data. `ADD PARTITION FIELD` is a simple metadata operation.

In [None]:
%%sql
ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)

We can continue to use the old partition on the old data. There is no change to the underlying partition structure on existing data as shown below (Again remember to replace <span style="color:red">YOUR-BUCKET-NAME</span> with your bucket name and if you use different prefixes, update the path as applicable):

In [None]:
%%sh
aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/

However, when we start inserting new data, the newer files follow the new partition structure as per our new partition spec.

In [None]:
%%sql
INSERT INTO glue_catalog1.salesdb.orders VALUES 
    (
        7, 
        'Harry Potter and the Chamber of Secrets - Hardcover',
        'Books',
        3,
        18.99,
        current_timestamp()
    )

Before running the following cell, replace <span style="color:red">YOUR-BUCKET-NAME</span> with your bucket name and if you use different prefixes, update the path as applicable. 

Note the date partition that you inserted the record into. You will need this in the next step.

In [None]:
%%sh
aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/

Iceberg adds the new hourly partition under the day partition into which we inserted our new record. Confirm that this is the case by listing the contents of the parent partition in your S3 bucket, which is a date in YYYY-MM-DD format (e.g. s3://<span style="color:red">YOUR-BUCKET-NAME</span>/iceberg/glue_catalog1/tables/salesdb.db/orders/data/order_datetime_day=<span style="color:red">2022-01-12</span>/). You made a note of this earlier. In the cell below, replace <span style="color:red">YOUR-BUCKET-NAME</span> with your bucket name and the date in `order_datetime_day=` with the date you noted; if you use different prefixes, update the path as applicable.

Note the hour appended at the end of the `order_datetime_hour` value. You will use this in the next step.

In [None]:
%%sh
aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/order_datetime_day=2022-02-09/
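
The directory names seen in these listings can be reasoned about with a small sketch. It assumes the layout described in this section, namely `order_datetime_day=YYYY-MM-DD` for the `days` transform and an `order_datetime_hour` value ending in the hour for the `hours` transform; the exact hour format used here (`YYYY-MM-DD-HH`) and the helper `partition_path` are assumptions for illustration, so compare the output with your own listing.

```python
import datetime

def partition_path(timestamp, spec, column="order_datetime"):
    """Build the relative partition directory for a timestamp under a
    partition spec given as a list of transform names."""
    parts = []
    for transform in spec:
        if transform == "days":
            parts.append(column + "_day=" + timestamp.strftime("%Y-%m-%d"))
        elif transform == "hours":
            # Assumed format: the date followed by the two-digit hour
            parts.append(column + "_hour=" + timestamp.strftime("%Y-%m-%d-%H"))
        else:
            raise ValueError("unsupported transform: " + transform)
    return "/".join(parts) + "/"

order_time = datetime.datetime(2022, 2, 9, 3, 15)
old_spec_path = partition_path(order_time, ["days"])
new_spec_path = partition_path(order_time, ["days", "hours"])
print(old_spec_path)
print(new_spec_path)
```

Files written before the spec change keep the day-only path, while files written afterwards are nested one level deeper under the hour.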

In [None]:
%%sql
select * from glue_catalog1.salesdb.orders

Let us query our table using the new hourly partition. In the cells below, replace <span style="color:red">3</span> with the hour value noted above. For example, hour(order_datetime)=<span style="color:red">21</span>

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=3

In [None]:
spark.sql("SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=3").show()

We can continue to query our old data by day using the `day()` function. There is only the original `order_datetime` column in the table. We don't have to store additional columns to accommodate multiple partitioning schemes. Everything is in the metadata, giving us immense flexibility and making our data lake forward-looking!

In the cell below, replace <span style="color:red">1</span> with a <span style="color:red">day</span> value within the range of the timestamps inserted in your `order_datetime` column.

In [None]:
%%sql
SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=1