{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2", "metadata": {}, "outputs": [], "source": [ "%session_id_prefix delta-dataframe-\n", "%glue_version 3.0\n", "%idle_timeout 60\n", "%connections \n", "%%configure\n", "{\n", " \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " \"--extra-py-files\": \"/tmp/etl-delta-core_2.12-1.0.0.jar\" # for custom connector\n", " # \"--extra-py-files\": \"/tmp/delta-core_2.12-1.0.0.jar\" # for marketplace connector\n", "}" ] }, { "cell_type": "code", "execution_count": null, "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192", "metadata": {}, "outputs": [], "source": [ "bucket_name = \"\"\n", "bucket_prefix = \"\"\n", "database_name = \"delta_dataframe\"\n", "database_prefix = f\"{bucket_prefix}/{database_name}\"\n", "database_location = f\"s3://{bucket_name}/{database_prefix}/\"\n", "table_name = \"products\"\n", "table_prefix = f\"{database_prefix}/{table_name}\"\n", "table_location = f\"s3://{bucket_name}/{table_prefix}/\"" ] }, { "cell_type": "markdown", "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750", "metadata": {}, "source": [ "## Clean up existing resources" ] }, { "cell_type": "code", "execution_count": null, "id": "62dc44d1-4c48-4f24-bfce-60637972914b", "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "## Delete files in S3\n", "s3 = boto3.resource('s3')\n", "bucket = s3.Bucket(bucket_name)\n", "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n", "\n", "## Drop tables in Glue Data Catalog\n", "glue = boto3.client('glue')\n", "try:\n", " glue.delete_table(DatabaseName=database_name, Name=table_name)\n", "except glue.exceptions.EntityNotFoundException:\n", " print(f\"Table {database_name}.{table_name} does not exist\")\n", "try:\n", " glue.delete_table(DatabaseName=database_name, Name='testTable')\n", "except glue.exceptions.EntityNotFoundException:\n", " print(f\"Table {database_name}.testTable does not exist\")\n" ] }, { "cell_type": "markdown", "id": "08706080-9af8-4721-bdfa-0872e0407c68", "metadata": {}, "source": [ "## Create Delta table with sample data" ] }, { "cell_type": "code", "execution_count": null, "id": "a2d32110", "metadata": {}, "outputs": [], "source": [ "try:\n", " glue = boto3.client('glue')\n", " res = glue.get_database(Name=database_name)\n", " print(f\"Database {database_name} exists.\")\n", " if 'LocationUri' not in res['Database']:\n", " print(f\"Warning: Database {database_name} does not have Location. 
{ "cell_type": "code", "execution_count": null, "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Row\n", "import time\n", "\n", "ut = time.time()\n", "\n", "product = [\n", "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n", "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n", "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n", "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n", "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n", "]\n", "\n", "df_products = spark.createDataFrame(Row(**x) for x in product)" ] }, { "cell_type": "code", "execution_count": null, "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a", "metadata": {}, "outputs": [], "source": [ "# Create a table in the metastore using the DataFrame's schema and write data to it\n", "df_products.write.format(\"delta\").mode(\"overwrite\").option(\"path\", table_location).saveAsTable(f\"{database_name}.{table_name}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "a0a38c88", "metadata": {}, "outputs": [], "source": [ "# Create a Delta table by path (not registered in the metastore) using the DataFrame's schema and overwrite its data\n", "df_products.write.format(\"delta\").mode(\"overwrite\").save(table_location)" ] }, { "cell_type": "code", "execution_count": null, "id": "d46e5251", "metadata": {}, "outputs": [], "source": [ "# Create a Delta table with the DeltaTableBuilder API, reusing the DataFrame's schema\n", "from delta.tables import DeltaTable\n", "\n", "deltaTable = DeltaTable.create(spark) \\\n", "    .tableName(f\"{database_name}.testTable\") \\\n", "    .addColumns(df_products.schema) \\\n", "    .location(table_location + \"_testTable\") \\\n", "    .execute()" ] }, { "cell_type": "markdown", "id": "fda19a10-4b69-4272-99a2-1b1156e937c2", "metadata": {}, "source": [ "## Read from Delta table via DataFrame" ] }, { "cell_type": "code", "execution_count": null, "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd", "metadata": {}, "outputs": [], "source": [ "# Query the table registered in the metastore\n", "df_products_read = spark.table(f\"{database_name}.{table_name}\")\n", "df_products_read.show()" ] },
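{ "cell_type": "markdown", "id": "8d4e5f60-7182-4c9d-8e9f-2a3b4c5d6e70", "metadata": {}, "source": [ "The same metastore table can also be queried with Spark SQL; the statement below is equivalent to the `spark.table` call above." ] }, { "cell_type": "code", "execution_count": null, "id": "9e5f6071-8293-4dae-9fa0-3b4c5d6e7f81", "metadata": {}, "outputs": [], "source": [ "# Equivalent Spark SQL query against the metastore table\n", "spark.sql(f\"SELECT * FROM {database_name}.{table_name}\").show()" ] },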
\\\n", " load(table_location)\n", "df_products_read.show()" ] }, { "cell_type": "markdown", "id": "86dc4db0-ce1d-40d7-bbc8-b32e0e769d43", "metadata": {}, "source": [ "## Read from Delta table via DeltaLake library" ] }, { "cell_type": "code", "execution_count": null, "id": "7057cb91-d80e-4807-a3fd-faf4575adba1", "metadata": {}, "outputs": [], "source": [ "from delta.tables import *\n", "\n", "deltaTable = DeltaTable.forPath(spark, table_location) #query table from path\n", "deltaTable.toDF().show()" ] }, { "cell_type": "code", "execution_count": null, "id": "ab642097", "metadata": {}, "outputs": [], "source": [ "deltaTable = DeltaTable.forName(spark,f\"{database_name}.{table_name}\") #query table from metastore\n", "deltaTable.toDF().show()" ] }, { "cell_type": "code", "execution_count": null, "id": "59bddfcf", "metadata": {}, "outputs": [], "source": [ "deltaTabletest = DeltaTable.forName(spark,f\"{database_name}.testTable\").toDF().show()" ] }, { "cell_type": "markdown", "id": "28f357f7", "metadata": {}, "source": [ "## Modify schema/properties of DeltaLake table" ] }, { "cell_type": "code", "execution_count": null, "id": "ffa9b669", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import lit\n", "\n", "df_products.withColumn(\"Currency\",lit(\"USD\")).write \\\n", " .format(\"delta\") \\\n", " .mode(\"overwrite\") \\\n", " .option(\"overwriteSchema\", \"true\") \\\n", " .option(\"path\", table_location) \\\n", " .saveAsTable(f\"{database_name}.{table_name}\") # External table" ] }, { "cell_type": "markdown", "id": "0b80b1cb", "metadata": {}, "source": [ "## Insert records" ] }, { "cell_type": "code", "execution_count": null, "id": "8550f5ff", "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "product_insert = [\n", " {'product_id': '00006', 'product_name': 'Book', 'price': 500, 'category': 'Stationery','Currency': 'INR', 'updated_at': ut}, # Insert\n", " {'product_id': '00007', 'product_name': 'Pen', 'price': 50, 'category': 'Stationery','Currency': 'USD', 'updated_at': ut} # Insert\n", "]\n", "df_product_insert = spark.createDataFrame(Row(**x) for x in product_insert)\n", "\n", "df_product_insert.write.format(\"delta\").mode(\"append\").saveAsTable(f\"{database_name}.{table_name}\")" ] }, { "cell_type": "markdown", "id": "9cc12691", "metadata": {}, "source": [ "## Update records" ] }, { "cell_type": "code", "execution_count": null, "id": "50483876", "metadata": {}, "outputs": [], "source": [ "from delta.tables import *\n", "from pyspark.sql.functions import *\n", "\n", "deltaTable = DeltaTable.forName(spark, f\"{database_name}.{table_name}\")\n", "\n", "# Declare the predicate by using a SQL-formatted string.\n", "deltaTable.update(\n", " condition = \"product_id = '00006'\",\n", " set = { \"Currency\": \"'USD'\" }\n", ")\n" ] }, { "cell_type": "markdown", "id": "011c5d64", "metadata": {}, "source": [ "## Delete records" ] }, { "cell_type": "code", "execution_count": null, "id": "7e2f08a8", "metadata": {}, "outputs": [], "source": [ "# delete product_id 00007\n", "\n", "deltaTable = DeltaTable.forName(spark, f\"{database_name}.{table_name}\")\n", "deltaTable.delete(\"product_id = '00007'\")" ] }, { "cell_type": "markdown", "id": "c013d49c-da63-4910-b423-4ebd0e346e1f", "metadata": {}, "source": [ "## Upsert records into Delta table" ] }, { "cell_type": "code", "execution_count": null, "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97", "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "\n", "product_updates = [\n", " {'product_id': '00001', 
{ "cell_type": "markdown", "id": "c013d49c-da63-4910-b423-4ebd0e346e1f", "metadata": {}, "source": [ "## Upsert records into Delta table" ] }, { "cell_type": "code", "execution_count": null, "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97", "metadata": {}, "outputs": [], "source": [ "ut = time.time()\n", "\n", "product_updates = [\n", "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'Currency': 'INR', 'updated_at': ut},  # Update\n", "    {'product_id': '00007', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'Currency': 'INR', 'updated_at': ut}  # Insert\n", "]\n", "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n", "\n", "deltaTable.alias(\"products\").merge(\n", "    df_product_updates.alias(\"updates\"),\n", "    \"products.product_id = updates.product_id\") \\\n", "  .whenMatchedUpdate(set = {\n", "    \"product_name\": \"updates.product_name\",\n", "    \"price\": \"updates.price\",\n", "    \"category\": \"updates.category\",\n", "    \"updated_at\": \"updates.updated_at\",\n", "    \"Currency\": \"updates.Currency\"\n", "  }) \\\n", "  .whenNotMatchedInsert(values = {\n", "    \"product_id\": \"updates.product_id\",\n", "    \"product_name\": \"updates.product_name\",\n", "    \"price\": \"updates.price\",\n", "    \"category\": \"updates.category\",\n", "    \"updated_at\": \"updates.updated_at\",\n", "    \"Currency\": \"updates.Currency\"\n", "  }) \\\n", "  .execute()\n", "deltaTable.toDF().show()" ] }, { "cell_type": "markdown", "id": "885a8de2", "metadata": {}, "source": [ "## View History" ] }, { "cell_type": "code", "execution_count": null, "id": "a646a330", "metadata": {}, "outputs": [], "source": [ "from delta.tables import *\n", "\n", "deltaTable = DeltaTable.forPath(spark, table_location)\n", "\n", "fullHistoryDF = deltaTable.history()\n", "fullHistoryDF.show()" ] }, { "cell_type": "markdown", "id": "7ac51f2b", "metadata": {}, "source": [ "## Query with time travel" ] }, { "cell_type": "code", "execution_count": null, "id": "1b5861f9", "metadata": {}, "outputs": [], "source": [ "# Replace the sample timestamp with one taken from the history output above;\n", "# reading a timestamp earlier than the first commit raises an error\n", "df1 = spark.read.format(\"delta\").option(\"timestampAsOf\", \"2022-04-28 12:44:07\").load(table_location)  # timestamp in YYYY-MM-DD HH:MM:SS\n", "df2 = spark.read.format(\"delta\").option(\"versionAsOf\", 3).load(table_location)\n", "df1.show()\n", "df2.show()" ] }, { "cell_type": "markdown", "id": "4a9a0ae7", "metadata": {}, "source": [ "## Roll Back" ] }, { "cell_type": "code", "execution_count": null, "id": "422e870d", "metadata": {}, "outputs": [], "source": [ "# Restore to version 2 by overwriting the table with that snapshot via DataFrame\n", "spark.read.format(\"delta\") \\\n", "    .option(\"versionAsOf\", 2) \\\n", "    .load(table_location) \\\n", "    .write \\\n", "    .mode(\"overwrite\") \\\n", "    .format(\"delta\") \\\n", "    .save(table_location)" ] }, { "cell_type": "markdown", "id": "dee36fbb", "metadata": {}, "source": [ "## Stop Session" ] }, { "cell_type": "code", "execution_count": null, "id": "e4fd69d3", "metadata": {}, "outputs": [], "source": [ "%stop_session" ] } ], "metadata": { "kernelspec": { "display_name": "Glue PySpark", "language": "python", "name": "glue_pyspark" }, "toc-autonumbering": true, "toc-showcode": true, "toc-showmarkdowntxt": true, "toc-showtags": true }, "nbformat": 4, "nbformat_minor": 5 }