{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%session_id_prefix native-delta-sql-s3path\n", "%glue_version 3.0\n", "%idle_timeout 60\n", "%%configure \n", "{\n", " \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " \"--datalake-formats\": \"delta\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket_name = \"\"\n", "bucket_prefix = \"\"\n", "database_name = \"delta_sql_s3path\"\n", "database_prefix = f\"{bucket_prefix}/{database_name}\"\n", "database_location = f\"s3://{bucket_name}/{database_prefix}/\"\n", "table_name = \"products\"\n", "table_prefix = f\"{database_prefix}/{table_name}\"\n", "table_location = f\"s3://{bucket_name}/{table_prefix}/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up existing resources" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "## Delete files in S3\n", "s3 = boto3.resource('s3')\n", "bucket = s3.Bucket(bucket_name)\n", "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Delta table with sample data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Row\n", "import time\n", "\n", "ut = time.time()\n", "\n", "product = [\n", " {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n", "]\n", "\n", "df_products = spark.createDataFrame(Row(**x) for x in product)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_products.write.format(\"delta\"). \\\n", " mode(\"overwrite\"). 
\\\n", " save(table_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read from Delta Lake table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Insert records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "query=f\"\"\"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}` VALUES('00006', 'Pen', 30,'Stationery',{ut}), ('00007', 'Book', 500,'Stationery',{ut})\"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "query=f\"\"\"UPDATE delta.`s3://{bucket_name}/{table_prefix}` SET price=300, updated_at={ut} WHERE product_id == '00007'\"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Upsert records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "product_updates = [\n", " {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut}, # Update\n", " {'product_id': '00008', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut} # Insert\n", "]\n", "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n", "\n", "df_product_updates.createOrReplaceTempView(\"tmp_products_updates\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` AS old \n", "USING tmp_products_updates AS new \n", "ON old.product_id=new.product_id \n", "WHEN MATCHED THEN \n", "UPDATE SET \n", " old.product_name=new.product_name,\n", " old.price=new.price,\n", " old.category=new.category,\n", " old.updated_at=new.updated_at\n", "WHEN NOT MATCHED \n", "THEN INSERT (product_id, product_name, price,category,updated_at) \n", "VALUES ( \n", " new.product_id, \n", " new.product_name, \n", " new.price, \n", " new.category, \n", " new.updated_at \n", ")\"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Alter DeltaLake table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"ALTER TABLE delta.`s3://{bucket_name}/{table_prefix}` ADD COLUMNS (CURRENCY STRING AFTER PRICE)\"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", 
"spark.sql(query).show(truncate=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query=f\"\"\"UPDATE delta.`s3://{bucket_name}/{table_prefix}` SET CURRENCY =\"INR\" \"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"DELETE FROM delta.`s3://{bucket_name}/{table_prefix}` WHERE product_name == \"Pen\" \"\"\"\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## View History" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"DESCRIBE HISTORY delta.`s3://{bucket_name}/{table_prefix}` \"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query with time travel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from delta import *\n", "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`\"\"\" #Using a version number\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You need timestamp value in yyyyMMddHHmmssSSS format, and replace the folloiwng timestamp with the value." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@20220414145923000`\"\"\" #by passing the timestamp in yyyyMMddHHmmssSSS format to the path\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: Spark SQL does not support TIMESTAMP AS OF/VERSION keywords as of now https://github.com/delta-io/delta/issues/128 #https://issues.apache.org/jira/browse/SPARK-34978" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Roll Back" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "##Fix accidental deletes to product_name pen\n", "query = f\"\"\"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}`\n", " SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`\n", " WHERE product_id = 00006\"\"\"\n", "\n", "spark.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "##Fix accidental updates to product_id 00005. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Roll Back" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Fix the accidental delete of product_name 'Pen'\n", "query = f\"\"\"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}`\n", "    SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`\n", "    WHERE product_id = '00006'\"\"\"\n", "\n", "spark.sql(query)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Fix an accidental update to product_id '00005'. First, update the price of product_id '00005'\n", "\n", "ut = time.time()\n", "query = f\"\"\"UPDATE delta.`s3://{bucket_name}/{table_prefix}` SET price=100, updated_at={ut} WHERE product_id = '00005'\"\"\"\n", "\n", "spark.sql(query)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"DESCRIBE HISTORY delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Roll back the update just made\n", "query = f\"\"\"MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` dest\n", "USING delta.`s3://{bucket_name}/{table_prefix}@v7` src\n", "ON src.product_id = dest.product_id\n", "WHEN MATCHED THEN UPDATE SET *\"\"\"\n", "spark.sql(query)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n", "spark.sql(query).show(truncate=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Stop Session" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%stop_session" ] } ], "metadata": { "kernelspec": { "display_name": "Glue PySpark", "language": "python", "name": "glue_pyspark" } }, "nbformat": 4, "nbformat_minor": 4 }