{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a8916e1b-6de5-4096-964d-87e2f7b5b746",
   "metadata": {
    "deletable": false,
    "editable": true,
    "id": "a8916e1b-6de5-4096-964d-87e2f7b5b746",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%session_id_prefix hudi-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%connections \n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
   "metadata": {
    "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "# Fill in the S3 bucket (required) and an optional key prefix before running.\n",
    "bucket_name = \"\"\n",
    "bucket_prefix = \"\"\n",
    "database_name = \"hudi_sql\"\n",
    "table_name = \"product_cow\"\n",
    "\n",
    "# Fail here, before any Glue or S3 resource is touched, if the bucket is unset.\n",
    "if not bucket_name:\n",
    "    raise ValueError(\"Set bucket_name to the S3 bucket that will hold the Hudi table\")\n",
    "\n",
    "# Join only the non-empty parts so an empty bucket_prefix does not produce a\n",
    "# key that starts with '/' (and a '//' in the S3 URI).\n",
    "table_prefix = \"/\".join(\n",
    "    part.strip(\"/\")\n",
    "    for part in (bucket_prefix, database_name, table_name)\n",
    "    if part.strip(\"/\")\n",
    ")\n",
    "table_location = f\"s3://{bucket_name}/{table_prefix}\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750",
   "metadata": {
    "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750"
   },
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
   "metadata": {
    "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "# Create the Glue client once, outside the try blocks: the except clauses\n",
    "# look up glue.exceptions, so the name must exist even when a call fails.\n",
    "glue = boto3.client('glue')\n",
    "\n",
    "## Create a database with the name hudi_sql to host hudi tables if not exists.\n",
    "try:\n",
    "    glue.create_database(DatabaseInput={'Name': database_name})\n",
    "    print(f\"New database {database_name} created\")\n",
    "except glue.exceptions.AlreadyExistsException:\n",
    "    print(f\"Database {database_name} already exist\")\n",
    "\n",
    "## Delete files in S3\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n",
    "\n",
    "## Drop table in Glue Data Catalog\n",
    "try:\n",
    "    glue.delete_table(DatabaseName=database_name, Name=table_name)\n",
    "except glue.exceptions.EntityNotFoundException:\n",
    "    print(f\"Table {database_name}.{table_name} does not exist\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "08706080-9af8-4721-bdfa-0872e0407c68",
   "metadata": {
    "id": "08706080-9af8-4721-bdfa-0872e0407c68"
   },
   "source": [
    "## Create Hudi table with sample data using catalog sync"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
   "metadata": {
    "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
    "outputId": "a5d61103-8120-4869-ca80-e42f8f282055",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "# One shared timestamp for the initial load; it is the table's preCombineField.\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame([Row(**x) for x in product])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4c76d90-7137-40ec-9577-837e4716404b",
   "metadata": {
    "id": "a4c76d90-7137-40ec-9577-837e4716404b",
    "outputId": "fc98feed-2b27-4ee5-cf80-5f948863d44c",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "df_products.createOrReplaceTempView(\"tmp_product_cow\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e639391d-6195-4f99-83ee-87d0a5c21e61",
   "metadata": {
    "id": "e639391d-6195-4f99-83ee-87d0a5c21e61"
   },
   "source": [
    "The following query creates an external Hudi table with the configuration specified in the options. For more information, see https://hudi.apache.org/docs/table_management/#create-table-for-an-external-hudi-table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4b53f9a9-fe7a-456f-aca9-7d5d80ab1b01",
   "metadata": {
    "id": "4b53f9a9-fe7a-456f-aca9-7d5d80ab1b01",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "# CTAS: create the table at table_location and load it from the temp view.\n",
    "query = f\"\"\"\n",
    "create table if not exists {database_name}.{table_name} using hudi\n",
    "options (\n",
    "  type = 'cow',\n",
    "  primaryKey = 'product_id',\n",
    "  preCombineField = 'updated_at',\n",
    "  path = '{table_location}',\n",
    "  hoodie.table.name = '{table_name}',\n",
    "  hoodie.datasource.write.operation = 'upsert',\n",
    "  hoodie.datasource.hive_sync.enable = 'true',\n",
    "  hoodie.datasource.hive_sync.database = '{database_name}',\n",
    "  hoodie.datasource.hive_sync.table = '{table_name}',\n",
    "  hoodie.datasource.hive_sync.partition_fields = 'category',\n",
    "  hoodie.datasource.hive_sync.partition_extractor_class = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',\n",
    "  hoodie.datasource.hive_sync.use_jdbc = 'false',\n",
    "  hoodie.datasource.write.hive_style_partitioning = 'true'\n",
    ")\n",
    "partitioned by (category)\n",
    "AS SELECT * FROM tmp_product_cow\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fda19a10-4b69-4272-99a2-1b1156e937c2",
   "metadata": {
    "id": "fda19a10-4b69-4272-99a2-1b1156e937c2"
   },
   "source": [
    "## Read from Hudi table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "777215c7-272c-44a8-84e3-e799c0e7852f",
   "metadata": {
    "id": "777215c7-272c-44a8-84e3-e799c0e7852f",
    "outputId": "a3acaf5e-2ad2-4193-bcbe-e68e64879624",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c013d49c-da63-4910-b423-4ebd0e346e1f",
   "metadata": {
    "id": "c013d49c-da63-4910-b423-4ebd0e346e1f"
   },
   "source": [
    "## Upsert records into Hudi table\n",
    "\n",
    "The table was created with a `primaryKey`, a `preCombineField` and `hoodie.datasource.write.operation = 'upsert'`, so `INSERT INTO` is expected to update rows whose `product_id` already exists and insert the others. `INSERT OVERWRITE` would instead replace the existing data in the partitions being written."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
   "metadata": {
    "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
    "outputId": "57a02e9f-9c41-4009-c7eb-c626ec6442d1",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "# A newer timestamp than the initial load, so these rows win on preCombineField.\n",
    "ut = time.time()\n",
    "\n",
    "# The INSERT ... SELECT * below matches columns by position, so the partition\n",
    "# column `category` is listed last here.\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'updated_at': ut, 'category': 'Electronics'}, # Update\n",
    "    {'product_id': '00006', 'product_name': 'Desk', 'price': 50, 'updated_at': ut, 'category': 'Furniture'} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame([Row(**x) for x in product_updates])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "77e3e8f8-8dce-4c92-97e3-a5ffc942950b",
   "metadata": {
    "id": "77e3e8f8-8dce-4c92-97e3-a5ffc942950b",
    "outputId": "6cb0dd20-281a-424b-de06-8f38c099260e",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "df_product_updates.createOrReplaceTempView(\"tmp_product_cow_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a59c8932-d1c7-493c-b800-bff9d204c7ef",
   "metadata": {
    "id": "a59c8932-d1c7-493c-b800-bff9d204c7ef",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "INSERT INTO hudi_sql.product_cow SELECT * FROM tmp_product_cow_updates"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dd563164-fa7d-4166-b171-46d7a1a623f8",
   "metadata": {
    "id": "dd563164-fa7d-4166-b171-46d7a1a623f8",
    "outputId": "b8edcea0-910e-4648-ca37-e1feb3d444d3",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5f3b9c1a",
   "metadata": {},
   "source": [
    "## Update records in Hudi table\n",
    "\n",
    "The `UPDATE` below has no `WHERE` clause, so it applies to every row in the table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "66730a3a-2242-4575-b431-3f670178de6d",
   "metadata": {
    "id": "66730a3a-2242-4575-b431-3f670178de6d",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "UPDATE hudi_sql.product_cow SET price = price * 1.2, updated_at = unix_timestamp(current_timestamp())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ed8df6c-b8b2-42fc-8d5a-cdaeba62e7b2",
   "metadata": {
    "id": "6ed8df6c-b8b2-42fc-8d5a-cdaeba62e7b2",
    "outputId": "70ce1a6a-4d17-462a-f22e-1cf8bd8a95f4",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM hudi_sql.product_cow"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c200714e",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fbe71f6b-6cec-48be-9e00-28ceae60026f",
   "metadata": {
    "id": "fbe71f6b-6cec-48be-9e00-28ceae60026f",
    "outputId": "5b210a0a-32c5-4567-8c21-551d337e6e77",
    "trusted": true
   },
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "hudi_sql.ipynb",
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}