{ "cells": [ { "cell_type": "markdown", "id": "cfcba146-0010-4a7b-b186-63614bb52299", "metadata": { "tags": [] }, "source": [ "# Build a high-performance, transactional data lake using Delta Lake on Amazon EMR" ] }, { "cell_type": "markdown", "id": "fc7ab19e-0126-4124-a250-6b4817369591", "metadata": { "tags": [] }, "source": [ "## Topics covered in this example\n", "\n", "1) [Configuring Delta Lake](#configure_deltalake)
\n", "2) [Creating an Delta Lake Table](#create_table)
\n", "3) [DML Statements](#dml)
\n", "    a) [Updates](#updates)
\n", "    b) [Deletes](#deletes)
\n", "    c) [Upserts](#upserts)
\n", "4) [Time Travel](#time_travel)
\n", "5) [Optimization with File Management](#Optimization_with_File_Management)
\n", "6) [Z-ordering](#z_ordering)
" ] }, { "cell_type": "markdown", "id": "7772e20c-049c-4249-97e9-5120cb237e87", "metadata": { "tags": [] }, "source": [ "## Introduction\n", "\n", "Delta Lake is an open source project that enables building a modern data architecture on top of data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3, and HDFS. Specifically, Delta Lake offers:\n", "* ACID transactions on Spark: Serializable isolation levels ensure that readers never see inconsistent data.\n", "* Scalable metadata handling: Leverages Spark distributed processing power to handle all the metadata for petabyte-scale tables with billions of files at ease.\n", "* Streaming and batch unification: A table in Delta Lake is a batch table as well as a streaming source and sink. Streaming data ingest, batch historic backfill, interactive queries all just work out of the box.\n", "* Schema enforcement: Automatically handles schema variations to prevent insertion of bad records during ingestion.\n", "* Time travel: Data versioning enables rollbacks, full historical audit trails, and reproducible machine learning experiments.\n", "* Upserts and deletes: Supports merge, update and delete operations to enable complex use cases like change-data-capture, slowly-changing-dimension (SCD) operations, streaming upserts, and so on." ] }, { "cell_type": "markdown", "id": "b736bf2b-1536-4876-9485-5938015b462a", "metadata": { "tags": [] }, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* This notebook was tested using EMR 6.7 cluster and Delta Lake 2.0.0\n", "* [Download](https://github.com/delta-io/delta/releases/) the delta jar file (we used delta-core_2.12:2.0.0) and store it in a S3 Bucket (e.g. s3://your bucket/jars/) in your AWS account. Create below script and store it into a S3 bucket (e.g. s3://your bucket/bootstrap/deltajarinstall.sh) to be used for bootstrap action as shown in the following example.\n", "\n", " \n", " #!/bin/bash\n", " sudo curl -O --output-dir /usr/lib/spark/jars/ https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.0.0/delta-core_2.12-2.0.0.jar \n", " sudo curl -O --output-dir /usr/lib/spark/jars/ https://repo1.maven.org/maven2/io/delta/delta-storage/2.0.0/delta-storage-2.0.0.jar\n", " sudo python3 -m pip install delta-spark==2.0.0\n", "\n", "\n", "* Here is a sample CLI command to create an EMR cluster. Replace \"your subnet\" with one of the subnets in which your EMR Studio is running and update \"your-bucket\" with the your s3 bucket:\n", "\n", " aws emr create-cluster \\\n", " --name \"emr-delta-lake-blog\" \\\n", " --release-label emr-6.7.0 \\\n", " --applications Name=Hadoop Name=Hive Name=Livy Name=Spark Name=JupyterEnterpriseGateway \\\n", " --instance-type m5.xlarge \\\n", " --instance-count 3 \\\n", " --ec2-attributes SubnetId='' \\\n", " --use-default-roles \\\n", " --bootstrap-actions Path=\"s3:///bootstrap/deltajarinstall.sh\"\n", " \n", "\n", "* This notebook uses the `PySpark` kernel. \n", "***" ] }, { "cell_type": "markdown", "id": "261e3974-0098-4966-b345-01b6fb71c50d", "metadata": {}, "source": [ "\n", "## Configuring Delta Lake on Spark session\n", "\n", "Configure your Spark session using the %%configure magic command." ] }, { "cell_type": "code", "execution_count": null, "id": "b074a69a-55f0-40b2-826d-ffd358aea9b6", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\": {\n", " \"spark.sql.extensions\": \"io.delta.sql.DeltaSparkSessionExtension\",\n", " \"spark.sql.catalog.spark_catalog\": \"org.apache.spark.sql.delta.catalog.DeltaCatalog\"\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": null, "id": "1eba3efe-249a-4f2f-9a55-5a67131e9971", "metadata": { "tags": [] }, "outputs": [], "source": [ "#import libaries\n", "from delta.tables import *\n", "from pyspark.sql.functions import *" ] }, { "cell_type": "code", "execution_count": null, "id": "a5997493-5744-4100-83cf-28d26d214ffe", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaPath = \"s3:///delta-amazon-reviews-pds/\"" ] }, { "cell_type": "markdown", "id": "3f5cfb97-abb9-4ba9-88ae-39bb3ea41c15", "metadata": { "execution": { "iopub.execute_input": "2022-08-23T23:28:44.449520Z", "iopub.status.busy": "2022-08-23T23:28:44.449292Z", "iopub.status.idle": "2022-08-23T23:29:03.782049Z", "shell.execute_reply": "2022-08-23T23:29:03.781254Z", "shell.execute_reply.started": "2022-08-23T23:28:44.449498Z" }, "tags": [] }, "source": [ "\n", "## Create Delta Lake Table\n", "\n", "**We will be using Amazon Product Reviews Dataset dataset, spend some time to get familiarized with this dataset.**\n", "\n", "We are loading just one partition for sake of simplicity" ] }, { "cell_type": "code", "execution_count": null, "id": "e8799bb5-6031-49ea-89aa-8694db1ffc31", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_parquet = spark.read.parquet(\"s3://amazon-reviews-pds/parquet/product_category=Gift_Card/*.parquet\")" ] }, { "cell_type": "code", "execution_count": null, "id": "33880d8a-115c-429b-bbea-6f908e534ca9", "metadata": { "tags": [] }, "outputs": [], "source": [ "#to check schema\n", "df_parquet.printSchema()" ] }, { "cell_type": "markdown", "id": "95d0b581-3827-46a6-a582-99c4582b36f0", "metadata": { "tags": [] }, "source": [ "Convert the parquet file and write the data to s3 in Delta Lake table format" ] }, { "cell_type": "code", "execution_count": null, "id": "f524a4b0-25eb-4c32-983c-ab7dd12fdf46", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_parquet.write.mode(\"overwrite\").format(\"delta\").partitionBy(\"year\").save(deltaPath)" ] }, { "cell_type": "code", "execution_count": null, "id": "eb06d4ad-578c-4896-b2b8-e2610ab16dc1", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_delta = spark.read.format(\"delta\").load(deltaPath)\n", "df_delta.show()" ] }, { "cell_type": "code", "execution_count": null, "id": "6935c938-9953-44c4-b1a1-64b95060a939", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM delta.`s3:///delta-amazon-reviews-pds/` LIMIT 10" ] }, { "cell_type": "markdown", "id": "1e104059-a584-4dab-9a32-120d90fe0d0f", "metadata": { "execution": { "iopub.execute_input": "2022-08-23T23:40:05.411707Z", "iopub.status.busy": "2022-08-23T23:40:05.411469Z", "iopub.status.idle": "2022-08-23T23:40:05.475477Z", "shell.execute_reply": "2022-08-23T23:40:05.474528Z", "shell.execute_reply.started": "2022-08-23T23:40:05.411683Z" }, "tags": [] }, "source": [ "\n", "## DML Operations\n", "Delta Lake supports all DML statements to add or modify data in your data lake: Inserts to add new data, Updates to modify specific columns in specific rows in your existing data, Deletes for GDPR and CCPA compliance and Upserts when you have incoming data that may have a mix of inserts and updates. Let us look at each of them now." ] }, { "cell_type": "markdown", "id": "cfd2b7c8-e781-4311-b3f6-d502d54d05fe", "metadata": { "execution": { "iopub.execute_input": "2022-08-23T23:29:22.147289Z", "iopub.status.busy": "2022-08-23T23:29:22.147059Z", "iopub.status.idle": "2022-08-23T23:29:35.466738Z", "shell.execute_reply": "2022-08-23T23:29:35.465967Z", "shell.execute_reply.started": "2022-08-23T23:29:22.147265Z" }, "tags": [] }, "source": [ "\n", "### Updates\n", "Change the `marketplace` from US to USA. Delta Lake Table allows updates using a simple `UPDATE` and`SET` clause added to your query" ] }, { "cell_type": "code", "execution_count": null, "id": "18ed020e-91c5-4fb4-bdcd-76039b906279", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable = DeltaTable.forPath(spark, deltaPath)" ] }, { "cell_type": "code", "execution_count": null, "id": "9de206d2-e697-4f81-a8f7-d5ece0b6dd3c", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Update column \n", "deltaTable.update(\"marketplace = 'US'\",{ \"marketplace\":\"'USA'\"})" ] }, { "cell_type": "markdown", "id": "0388a146-6185-40f4-b444-b0f6bea2be66", "metadata": { "execution": { "iopub.execute_input": "2022-08-23T23:29:35.467904Z", "iopub.status.busy": "2022-08-23T23:29:35.467726Z", "iopub.status.idle": "2022-08-23T23:29:37.804704Z", "shell.execute_reply": "2022-08-23T23:29:37.803959Z", "shell.execute_reply.started": "2022-08-23T23:29:35.467882Z" }, "tags": [] }, "source": [ "You can also use sparkmagic %%sql " ] }, { "cell_type": "code", "execution_count": null, "id": "da18fbb2-efef-4091-a01c-ce29e564adcc", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql\n", "update delta.`s3:///delta-amazon-reviews-pds/`\n", "set marketplace = 'US' where marketplace = 'USA' " ] }, { "cell_type": "code", "execution_count": null, "id": "45d80751-6199-47ff-a0ce-c084d977eb76", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.toDF().show()" ] }, { "cell_type": "markdown", "id": "6312d204-92ba-4266-b014-cb694097bba9", "metadata": {}, "source": [ "\n", "### Deletes\n", "GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Delta Lake Table is designed to be able to handle these trivially." ] }, { "cell_type": "code", "execution_count": null, "id": "c2cc4521-a0c5-4f63-a1de-7e7909f8d73f", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.delete(\"verified_purchase = 'N'\")" ] }, { "cell_type": "code", "execution_count": null, "id": "097a149c-08e4-45aa-aeb7-f45e4f64d785", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.toDF().show()" ] }, { "cell_type": "markdown", "id": "d082080f-58df-45ac-94cd-510a0a21f688", "metadata": {}, "source": [ "\n", "## Time Travel\n", "Let us query our table as of the previous snapshot." ] }, { "cell_type": "code", "execution_count": null, "id": "5c83ca0c-9b12-49d2-9919-b37ebdb52e91", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.history(100).select(\"version\", \"timestamp\", \"operation\", \"operationParameters\").show(truncate=False)" ] }, { "cell_type": "code", "execution_count": null, "id": "34ca10a1-2737-40a5-ae5f-fb38832f290d", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_time_travel = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(deltaPath)\n", "df_time_travel.show()" ] }, { "cell_type": "markdown", "id": "53fec7a8-9c47-4eca-8441-a8d8185ac887", "metadata": { "execution": { "iopub.execute_input": "2022-08-23T23:46:29.196377Z", "iopub.status.busy": "2022-08-23T23:46:29.196095Z", "iopub.status.idle": "2022-08-23T23:46:29.261504Z", "shell.execute_reply": "2022-08-23T23:46:29.260765Z", "shell.execute_reply.started": "2022-08-23T23:46:29.196344Z" }, "tags": [] }, "source": [ "\n", "### Upserts" ] }, { "cell_type": "code", "execution_count": null, "id": "0bbeb359-63be-4a7b-9d94-082d0d62ccdf", "metadata": { "tags": [] }, "outputs": [], "source": [ "data_upsert = [ {\"marketplace\":'US',\"customer_id\":'38602100', \"review_id\":'R315TR7JY5XODE',\"product_id\":'B00CHSWG6O',\"product_parent\":'336289302',\"product_title\" :'Amazon eGift Card', \"star_rating\":'5', \"helpful_votes\":'2',\"total_votes\":'0',\"vine\":'N',\"verified_purchase\":'Y',\"review_headline\":'GREAT',\"review_body\":'GOOD PRODUCT',\"review_date\":'2014-04-11',\"year\":'2014'},\n", " {\"marketplace\":'US',\"customer_id\":'38602103', \"review_id\":'R315TR7JY5XOA1',\"product_id\":\"B007V6EVY2\",\"product_parent\":'910961751',\"product_title\" :'Amazon eGift Card', \"star_rating\":'5', \"helpful_votes\":'2',\"total_votes\":'0',\"vine\":'N',\"verified_purchase\":'Y',\"review_headline\":'AWESOME',\"review_body\":'GREAT PRODUCT',\"review_date\":'2014-04-11',\"year\":'2014'}\n", "]" ] }, { "cell_type": "code", "execution_count": null, "id": "0a5cd733-fb4c-4108-9bfc-b222c33a48a4", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_data_upsert = spark.createDataFrame(data_upsert)" ] }, { "cell_type": "code", "execution_count": null, "id": "35c30ddd-f252-4a9d-b8e5-4bb3b0ff1c5d", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_data_upsert.show()" ] }, { "cell_type": "code", "execution_count": null, "id": "ddab76d1-c6f1-4d82-9f69-c68dc9ba204d", "metadata": { "tags": [] }, "outputs": [], "source": [ "(deltaTable\n", ".alias('t')\n", ".merge(df_data_upsert.alias('u'), 't.review_id = u.review_id')\n", ".whenMatchedUpdateAll()\n", ".whenNotMatchedInsertAll()\n", ".execute())" ] }, { "cell_type": "code", "execution_count": null, "id": "d923acef-0c3d-40ac-b103-e58d6cbedd50", "metadata": { "tags": [] }, "outputs": [], "source": [ "spark.read.format(\"delta\").load(deltaPath).createOrReplaceTempView(\"temp_product_reviews\")\n", "spark.sql(\"SELECT * FROM temp_product_reviews where review_id in ('R315TR7JY5XODE','R315TR7JY5XOA1')\").show()" ] }, { "cell_type": "markdown", "id": "ee51dd9a-d5c0-412f-bd92-eb392571b979", "metadata": {}, "source": [ "Compare this with original records using time travel" ] }, { "cell_type": "code", "execution_count": null, "id": "94a80d28-93bc-4efc-af6e-f8f10d55d01a", "metadata": { "tags": [] }, "outputs": [], "source": [ "df_time_travel.filter(\"review_id ='R315TR7JY5XODE'\").show()" ] }, { "cell_type": "markdown", "id": "5eaf4546-2ab0-430e-9bc5-24c891577e18", "metadata": { "tags": [] }, "source": [ "\n", "## Optimization with File Management\n", "To improve query speed, Delta Lake supports the ability to optimize the layout of data instorage. There are various ways to optimize the layout. You can use the following command to optimize the whole table" ] }, { "cell_type": "code", "execution_count": null, "id": "2934c3d0-46d4-48f2-bc99-7f72ce819cd3", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.optimize().executeCompaction()" ] }, { "cell_type": "markdown", "id": "5a3618fc-79ff-4133-bddd-0ff2b72f0e5c", "metadata": {}, "source": [ "If you have large amount of data, you can reduce the scope of optimization by using where clause condition as shown below:" ] }, { "cell_type": "code", "execution_count": null, "id": "5ace33ea-18c3-4e76-84d6-b9b29ba16d33", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.optimize().where(\"year='2015'\").executeCompaction()" ] }, { "cell_type": "markdown", "id": "13254844-0610-4087-a371-363adde56851", "metadata": {}, "source": [ "\n", "## Z-ordering\n", "Delta Lake used Z-Ordering for the data - skipping algorithms. This reduces the amount of data that delta lake needs to read. To perform the Z-Order of data you specify the columns to order on in the ZORDER BY clause. In the following example, we are z-ordering the table based on a low cardinality column “verified_purchase”." ] }, { "cell_type": "code", "execution_count": null, "id": "1c8a8fcd-ce16-4270-b11c-e70d5ac7a73a", "metadata": { "tags": [] }, "outputs": [], "source": [ "deltaTable.optimize().executeZOrderBy(\"verified_purchase\")" ] }, { "cell_type": "markdown", "id": "78852491-3679-4f0a-a1a7-daefb9c2e4b9", "metadata": {}, "source": [ "\n", "## Clean up\n", "To avoid ongoing charges, delete the Amazon S3 buckets, delete the Amazon EMR studio, and terminate the EMR cluster used for experimentation of this post." ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }