{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Iceberg Example Notebook\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Topics covered in this example\n", "\n", "1) [Configuring Iceberg](#configure_iceberg)
\n", "2) [Iceberg Catalogs and Namespaces](#catalogs)
\n", "3) [Creating an Iceberg Table](#create_table)
\n", "4) [DML Statements](#dml)
\n", "    a) [Inserts](#inserts)
\n", "    b) [Deletes](#deletes)
\n", "    c) [Upserts](#upserts)
\n", "    d) [Updates](#updates)
\n", "5) [Schema Evolution](#schema_evolution)
\n", "    a) [Renaming Columns](#renaming_columns)
\n", "    b) [Adding Columns](#adding_columns)
\n", "    c) [Dropping Columns](#dropping_columns)
\n", "6) [Time Travel](#time_travel)
\n", "    a) [Rollback](#rollback)
\n", "    b) [Roll Forward](#roll_forward)
\n", "7) [Partition Evolution](#partition_evolution)
\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* This notebook was tested using a single node r5.4xlarge EMR 6.5 cluster. Iceberg 0.12.0, Spark 3.1.2\n", "* To run this notebook\n", " - Launch an EMR 6.5+ cluster in one of the subnets on which this EMR Studio is running.\n", " - Launch the cluster with the following configuration classifications:\n", "\n", " \n", " [\n", " {\n", " \"Classification\": \"iceberg-defaults\",\n", " \"Properties\": {\n", " \"iceberg.enabled\":\"true\"\n", " }\n", " },\n", " {\n", " \"Classification\": \"spark-hive-site\",\n", " \"Properties\": {\n", " \"hive.metastore.client.factory.class\": \n", " \"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory\"\n", " }\n", " }\n", " ]\n", "\n", "The first classification enables Iceberg. The second one configures Glue Catalog as the Metastore for Spark applications in this cluster.\n", "\n", "* Here is a sample CLI command to create an EMR cluster. Do remember to replace EMR-STUDIO-SUBNET with one of the subnets in which your EMR Studio is running:\n", "\n", " aws emr create-cluster \\\n", " --name iceberg-emr-cluster\\\n", " --use-default-roles \\\n", " --release-label emr-6.5.0 \\\n", " --instance-count 1 \\\n", " --instance-type r5.4xlarge \\\n", " --applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \\\n", " --ec2-attributes SubnetId=\\\n", " --configurations '[{\"Classification\":\"iceberg-defaults\",\"Properties\":{\"iceberg.enabled\":\"true\"}},{\"Classification\":\"spark-hive-site\",\"Properties\":{\"hive.metastore.client.factory.class\":\"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory\"}}]'\n", " \n", "\n", "* This notebook uses the `PySpark` kernel. However, most of the commands are Spark SQL commands. 
So we use the magic command %%sql at the beginning of those cells.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "Apache Iceberg (https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table. Iceberg tracks individual data files in a table instead of directories. This allows writers to create data files in place and only add files to the table in an explicit commit. Every time a new file is inserted into any partition of this table, a new point-in-time snapshot of all the files gets created. At query time, there is no need to list a directory to find the files we need to work with, as the snapshot already has that information pre-populated at write time (commonly known as snapshot isolation (https://en.wikipedia.org/wiki/Snapshot_isolation) in databases).\n", "\n", "Iceberg supports write, delete, update, and time travel operations with complete support for ACID transactions (https://en.wikipedia.org/wiki/ACID). Table changes are atomic and readers never see partial or uncommitted changes (serializable isolation (https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable)).\n", "\n", "The Iceberg table format is an open specification at multiple levels. At the catalog level, you can plug in multiple types of catalogs, such as Hive, Hadoop, and the AWS Glue Data Catalog. All of these can co-exist, and you can join tables across different types of catalogs. In this example, we are going to work with the Glue Data Catalog.\n", "\n", "The post Build fast, ACID compliant, evolving big data processing using Apache Iceberg on Amazon EMR provides detailed information.\n", "\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "Create an S3 bucket location to save the sample dataset. 
In this example, we use the path format: s3://YOUR-BUCKET-NAME/iceberg/YOUR-CATALOG-NAME/tables/ \n", " \n", " For example: s3://EXAMPLE-BUCKET/iceberg/glue_catalog1/tables/\n", "\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "## Configuring Iceberg on Spark session\n", "\n", "Let us create a Glue catalog. In this example notebook, we use the Glue catalog name glue_catalog1. Let us assume that the name of your catalog is YOUR-CATALOG-NAME. \n", "\n", "* Set YOUR-CATALOG-NAME.warehouse to the S3 path where you want to store your data and metadata.\n", "* To make the catalog a Glue catalog, set YOUR-CATALOG-NAME.catalog-impl to `org.apache.iceberg.aws.glue.GlueCatalog`. This key is required to point to an implementation class for any custom catalog implementation. \n", "* Use `org.apache.iceberg.aws.s3.S3FileIO` as the YOUR-CATALOG-NAME.io-impl in order to take advantage of S3 multipart upload for high parallelism. \n", "* We use a DynamoDB table for the lock implementation. This is optional, and is recommended for high-concurrency workloads. To do that, we set `lock-impl` for our Glue catalog to `org.apache.iceberg.aws.glue.DynamoLockManager` and set `lock.table` to `myGlueLockTable` as the table name, so that for every commit, the Glue Catalog first obtains a lock using this table and then tries to safely modify the Glue table. If you choose this option, the table gets created in your own account. Note that you need the necessary access permissions to create and use a DynamoDB table. Further, additional DynamoDB charges apply."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\": { \n", " \"spark.sql.catalog.glue_catalog1\":\"org.apache.iceberg.spark.SparkCatalog\",\n", " \"spark.sql.catalog.glue_catalog1.warehouse\":\"s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/\",\n", " \"spark.sql.catalog.glue_catalog1.catalog-impl\":\"org.apache.iceberg.aws.glue.GlueCatalog\",\n", " \"spark.sql.catalog.glue_catalog1.io-impl\":\"org.apache.iceberg.aws.s3.S3FileIO\",\n", " \"spark.sql.catalog.glue_catalog1.lock-impl\":\"org.apache.iceberg.aws.glue.DynamoLockManager\",\n", " \"spark.sql.catalog.glue_catalog1.lock.table\":\"myGlueLockTable\",\n", " \"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"\n", " } \n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "\n", "Checking the version of spark" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "spark.version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Iceberg Catalogs and Namespaces\n", "The default catalog is the `AwsDataCatalog`. Let us switch to our Glue catalog `glue_catalog1` that has support for Iceberg tables. Note that there are no namespaces. A namespace in iceberg is the same thing as a database in Glue." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "use glue_catalog1" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "show current namespace" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us create a database and switch to it" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "CREATE SCHEMA IF NOT EXISTS salesdb;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "use salesdb" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "show current namespace" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Creating an Iceberg Table\n", "\n", "We will use Spark SQL for most of our Iceberg operations, although you could use equivalent PySpark, Scala, or Java languages to achieve all of these as well.\n", "Let us start by creating a table. 
The DDL syntax looks the same as creating, say, a Hive table, except that we include `USING iceberg`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "drop table if exists glue_catalog1.salesdb.orders" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "CREATE TABLE glue_catalog1.salesdb.orders\n", " (\n", " order_id int,\n", " product_name string,\n", " product_category string,\n", " qty int,\n", " unit_price decimal(7,2),\n", " order_datetime timestamp\n", " )\n", "USING iceberg\n", "PARTITIONED BY (days(order_datetime))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "show tables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since there is no data yet, we don't expect to see any snapshots." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## DML Operations\n", "Iceberg supports all DML statements to add or modify data in your data lake: Inserts to add new data, Updates to modify specific columns in specific rows in your existing data, Deletes for GDPR and CCPA compliance, and Upserts when you have incoming data that may have a mix of inserts and updates. Let us look at each of them now." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Inserts\n", "Let us insert our first record. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "INSERT INTO glue_catalog1.salesdb.orders VALUES \n", " (\n", " 1, \n", " 'Harry Potter and the Prisoner of Azkaban',\n", " 'Books',\n", " 2,\n", " 7.99,\n", " current_timestamp()\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that DML statements do result in snapshots getting created. Note the `snapshot_id` and the timestamp column called `committed_at`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us insert four more records. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "INSERT INTO glue_catalog1.salesdb.orders VALUES\n", " (\n", " 2, \n", " 'Harry Potter and the Half-Blood Prince',\n", " 'Books',\n", " 1,\n", " 9.99,\n", " date_sub(current_timestamp(), 3)\n", " ),\n", " (\n", " 3, \n", " \"New Balance Mens 623 V3 Casual Comfort Cross Trainer\",\n", " 'Shoes',\n", " 1,\n", " 55.97,\n", " date_sub(current_timestamp(), 4)\n", " ),\n", " (\n", " 4, \n", " \"Skechers Womens Go Walk Joy Walking Shoe\",\n", " 'Shoes',\n", " 1,\n", " 45.00,\n", " date_sub(current_timestamp(), 9)\n", " ),\n", " (\n", " 5, \n", " \"Nintendo Switch with Neon Blue and Neon Red Joy‑Con - HAC-001(-01)\",\n", " 'Games',\n", " 1,\n", " 299.99,\n", " date_sub(current_timestamp(), 4)\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iceberg treats it as single commit, and adds just one additional snapshot, an append operation as expected." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT count(*) FROM glue_catalog1.salesdb.orders;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Deletes\n", "GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Iceberg is designed to be able to handle these trivially.\n", "Now let us delete a record from our Iceberg table." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "DELETE FROM glue_catalog1.salesdb.orders\n", "WHERE order_datetime < date_sub(current_timestamp(), 1)\n", "AND order_datetime > date_sub(current_timestamp(), 4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The book with order_id 2 happens to be within this date range and has been deleted." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A delete marker shows up as an overwrite operation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can query the number of files deleted from your manifests too." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.manifests;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT count(*) FROM glue_catalog1.salesdb.orders;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Updates\n", "What if we want to go back and update an existing record? Let's change the `qty` for our `order_id` 5 from 1 to 10 Nintendo Switches. Iceberg allows updates using a simple `UPDATE` and`SET` clause added to your query" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "UPDATE glue_catalog1.salesdb.orders\n", "SET qty = 10\n", "WHERE order_id = 5" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see below, Iceberg has added another snapshot with `overwrite` operation for updating the `qty` of Nintendo Switches." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Upserts\n", "How about, if we get some incoming data and we don't know if those keys exist or not in our dataset? This is a common scenario when applying Change Data Capture(CDC) data on your data lake, for example. Iceberg makes it easy to merge both inserting new data and updating to existing data into your data lake with a single `MERGE INTO` statement.\n", "\n", "Before we look into the `MERGE INTO` statement, we first need some source data that has some new records to insert as well as some updates to existing records. 
We store this data in a table called `glue_catalog1.salesdb.orders_update`. First we create this table." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "drop table glue_catalog1.salesdb.orders_update;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "create table glue_catalog1.salesdb.orders_update as select * from glue_catalog1.salesdb.orders limit 0;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us assume that our CDC data comprises one new purchase of 10 books and an update of a previous order for shows. We add the CDC records to this table, one with a new new `order_id` (99) and one with existing `order_id` (3)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "INSERT INTO glue_catalog1.salesdb.orders_update VALUES \n", " (\n", " 3, \n", " \"New Balance Mens 623 V3 Casual Comfort Cross Trainer\",\n", " 'Shoes',\n", " 2,\n", " 40.00,\n", " current_timestamp()\n", " ),\n", " (\n", " 99, \n", " 'Harry Potter and the Sorcerers Stone',\n", " 'Books',\n", " 10,\n", " 9.99,\n", " current_timestamp()\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders_update;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have our source data ready, we can now use the `MERGE INTO` statement to upsert data to our `orders` table." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "MERGE INTO glue_catalog1.salesdb.orders target \n", "USING glue_catalog1.salesdb.orders_update source \n", "ON target.order_id = source.order_id \n", "WHEN MATCHED THEN \n", " UPDATE SET\n", " order_id = source.order_id,\n", " product_name = source.product_name,\n", " product_category = source.product_category,\n", " qty = source.qty,\n", " unit_price = source.unit_price,\n", " order_datetime = source.order_datetime\n", "WHEN NOT MATCHED THEN\n", " INSERT *" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iceberg also lets us query the metadata such as the actual `files` that are created including `file_format`, `partition` and lot more statistics as shown below. These can be handy when troubleshooting data quality and performance issues." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.files;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Schema Evolution\n", "Borrowing from the way columns work in databases, Iceberg tracks columns by using unique IDs and not by the column name. As long as the ID is the same, all the data still remains. You can safely add, drop, rename, update, or even reorder columns. You don’t have to rewrite the data for this. Schema evolution gets first class citizen treatment in Iceberg. Your ingest and read queries now have the freedom to be evolved without having to hide the schema inside JSON blobs." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Renaming Columns\n", "In Iceberg, since columns are not tracked by name, but using unique IDs instead, renaming a column is a simple metadata change. There is no data movement. Data lakes are increasingly looking like databases!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE glue_catalog1.salesdb.orders RENAME COLUMN qty TO quantity" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "desc table glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "No new snapshots created for a DDL operation like a column rename. Snapshots are created only when there is a change in the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us check what is in our table." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Adding Columns\n", "Now we are going to add another column called `discount`. Iceberg also allows documenting the purpose for each column as `comment`, which helps a lot in a collaborative environment and quick lookup of data from business users." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE glue_catalog1.salesdb.orders\n", "ADD COLUMNS (\n", " discount decimal(7,2) comment 'discount applied to this order'\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, as you can see below, when querying the table, the new column does not get displayed yet. In Iceberg tables the columns that do not have any data in your query results, do not show up in the output." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, we can describe our table to see that a new column `discount` did get added." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "desc glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us now insert another record with a value added for the `discount` column. Our customer here has earned the right to get a discount as they are purchasing the entire set of the Harry Potter series!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "INSERT INTO glue_catalog1.salesdb.orders VALUES \n", " (\n", " 6, \n", " 'Harry Potter Paperback Box Set',\n", " 'Books',\n", " 1,\n", " 39.99,\n", " current_timestamp(),\n", " 0.1\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now you can see that the `discount` column shows up, when querying." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Dropping Columns\n", "Now, there is a change in business requirements, we are not interested in the `discount` column anymore and need to remove that column from our table. Iceberg allows us to do that easily." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE glue_catalog1.salesdb.orders\n", "DROP COLUMN discount" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dropping a column is purely a metadata operation in Iceberg. No new snapshots are created. Let us take a look at our snapshots before getting into Time Travel." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.snapshots;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Time Travel\n", "Let us query our table as of the previous snapshot. SparkSQL does not provide a syntax for time travel yet. So we use PySpark for this. \n", "\n", "First let us capture all the snapshot timestamps in an array so that we can use the elements in the array to travel back and forth in time. Here we query the `commited_at` column from the Iceberg table and store its values in the `snapshotTimes` array.\n", "\n", "As you can see, the shoes and the Nintendo switch were added as part of a recent commit don't show up in our point-in-time historical query. They still exist in the table though." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "snapshotTimes = spark.sql(\"select committed_at as commitTime from glue_catalog1.salesdb.orders.snapshots order by commitTime\").collect()\n", "print(\"snapshotTimes: \")\n", "for elem in snapshotTimes: print(elem)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iceberg provides a spark read option `as-of-timestamp` that takes the timestamp in milliseconds since epoch as a value for the time that we want to travel to. To get this, we write a simple python function `time_millis` as shown below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import datetime\n", "epoch = datetime.datetime.utcfromtimestamp(0)\n", "def time_millis(timestamp):\n", " return int((timestamp - epoch).total_seconds() * 1000.0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In our example, we have 5 snapshots indexed from [0] through [4]. For example, to get to the state of the table after the second snapshot, we use the timestamp `snapshotTimes[1][0]`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "print(\"State of the table as of time: \" + str(snapshotTimes[1][0]))\n", "snapshotTimeMillis = time_millis(snapshotTimes[1][0])\n", "spark.read.option(\"as-of-timestamp\", snapshotTimeMillis).format(\"iceberg\").load(\"glue_catalog1.salesdb.orders\").show(5,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can time travel to any given snapshot and see the state of the table as of different timestamps: snapshotTimes[3][0] or snapshotTimes[4][0]. You could also directly use the snapshot_id value from the snapshots table as shown below. 
Here we query the state of the table after the very first insert by choosing the snapshot value `snapshotIDs[0][0]`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "snapshotIDs = spark.sql(\"select snapshot_id as snapshot from glue_catalog1.salesdb.orders.snapshots\").collect()\n", "print(\"snapshots: \")\n", "for elem in snapshotIDs: print(elem)\n", "\n", "snapshotID = snapshotIDs[0][0]\n", "spark.read.option(\"snapshot-id\", snapshotID).format(\"iceberg\").load(\"glue_catalog1.salesdb.orders\").show(5,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iceberg also gives us a way to look at the history of changes to our table using the `history` metadata table." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.history;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Rollback\n", "To undo recent changes, we can execute Iceberg stored procedures with the `CALL` statement: the `rollback_to_snapshot` stored procedure rolls the state of the table back to any historical commit. We could also use `rollback_to_timestamp`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "snapshotID = snapshotIDs[0][0]\n", "query = \"CALL glue_catalog1.system.rollback_to_snapshot('salesdb.orders', {})\".format(snapshotID)\n", "spark.sql(query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our table is now back to a single record after this rollback." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Querying the `history` metadata table gives us the big picture. 
The column `is_current_ancestor` indicates whether each snapshot is an ancestor of the snapshot our metastore is currently pointing to, so we can see how far back we have traveled. In this case the first snapshot shows up again at the end, as it is now the current snapshot. This information can be tremendously helpful when managing rollbacks and roll forwards." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.history;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Roll Forward\n", "\n", "Before we roll forward our table to a more recent state, let us query the most recent snapshot to make sure this is where we want to be. We switch to PySpark to show this." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "snapshotIDs = spark.sql(\"select snapshot_id as snapshot from glue_catalog1.salesdb.orders.snapshots\").collect()\n", "print(\"snapshots: \")\n", "for elem in snapshotIDs: print(elem)\n", "\n", "snapshotID = snapshotIDs[5][0]\n", "spark.read.option(\"snapshot-id\", snapshotID).format(\"iceberg\").load(\"glue_catalog1.salesdb.orders\").show(10,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can `CALL` Iceberg's `set_current_snapshot` stored procedure to move our metastore pointer to any existing snapshot we are interested in." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "snapshotID = snapshotIDs[5][0]\n", "query = \"CALL glue_catalog1.system.set_current_snapshot('salesdb.orders', {})\".format(snapshotID)\n", "spark.sql(query)\n", "spark.sql(\"select * from glue_catalog1.salesdb.orders\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can now query the table to see that it represents the state as of the point in time you chose above by selecting the snapshot ID of interest."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now see that `is_current_ancestor` now shows `True` for all snapshots as we have not skipped any commits." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders.history;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Partition Evolution\n", "Let us look at the partitions we have in our table by querying the `partitions` metadata table. Iceberg keeps track of how many records (`record_count` column) and how many files (`file_count` column) are present in each partition. This is a very handy tool that could be used for performance and data quality related troubleshooting and diagnostics." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders.partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us list our s3 bucket location to see the partitions. Remember to replace YOUR-BUCKET-NAME with your bucket name and if you use different prefixes, update the path as applicable. Notice that there is one partition for each day because we had `PARTITIONED BY` the partition transform `days(order_datetime)` " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sh\n", "aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us assume one year down the time line, we realize we need to add hourly partitions. Iceberg allows us to add partitions without having to perform any data movement or any additional changes to the underlying data. 
`ADD PARTITION FIELD` is a simple metadata operation." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "ALTER TABLE glue_catalog1.salesdb.orders ADD PARTITION FIELD hours(order_datetime)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can continue to use the old partition scheme for the old data. There is no change to the underlying partition structure of existing data, as shown below (again, remember to replace YOUR-BUCKET-NAME with your bucket name, and if you use different prefixes, update the path as applicable):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sh\n", "aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, when we start inserting new data, the newer files will follow the new partition structure as per our new partition spec." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "INSERT INTO glue_catalog1.salesdb.orders VALUES \n", " (\n", " 7, \n", " 'Harry Potter and the Chamber of Secrets - Hardcover',\n", " 'Books',\n", " 3,\n", " 18.99,\n", " current_timestamp()\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before running the following cell, replace YOUR-BUCKET-NAME with your bucket name, and if you use different prefixes, update the path as applicable. \n", "\n", "Note the date partition that you inserted the record into. You will need this in the next step." 
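] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, you can also confirm the new hourly partition directly from the `partitions` metadata table we queried earlier, without going to S3. The query below is a sketch; the exact partition column layout can vary by Iceberg version.\n", "\n", "```sql\n", "-- Each row is one partition; the newly inserted record should\n", "-- appear under an hourly partition rather than a daily one\n", "SELECT * FROM glue_catalog1.salesdb.orders.partitions\n", "```"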
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sh\n", "aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iceberg adds the new hourly partition under the day partition into which we inserted our new record. Confirm this is the case by listing the contents of that parent partition in your S3 bucket, which is a date in YYYY-MM-DD format (e.g. s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/order_datetime_day=2022-02-09/). You made a note of this earlier. Replace YOUR-BUCKET-NAME with your bucket name, and if you use different prefixes, update the path as applicable.\n", "\n", "Note the hour appended at the end of your `order_datetime_hour` value. You will use this in the next step." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sh\n", "aws s3 ls s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/salesdb.db/orders/data/order_datetime_day=2022-02-09/" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "select * from glue_catalog1.salesdb.orders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us query our table using the new hourly partition. In the cell below, replace the hour value in the `WHERE` clause with the hour you noted above. 
For example, hour(order_datetime)=21" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=3" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "spark.sql(\"SELECT * FROM glue_catalog1.salesdb.orders where hour(order_datetime)=3\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can continue to query our old data using the `day()` transform. There is only the original `order_datetime` column in the table; we don't have to store additional columns to accommodate multiple partitioning schemes. Everything is in the metadata, giving us immense flexibility and making our data lake forward-looking!\n", "\n", "In the cell below, replace 1 with a day value within the range of the timestamps inserted in your `order_datetime` column." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM glue_catalog1.salesdb.orders where day(order_datetime)>=1" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }