{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Query `Apache Hudi` dataset using Spark SQL\n", "\n", "#### Topics covered in this example\n", "* Hudi operations like Insert, Update, MergeInto, Read, Delete, Insert Overwrite, Alter Table and Virtual Keys using Spark SQL DMLs and DDLs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***\n", "\n", "## Prerequisites\n", "
\n", "NOTE : In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.
\n", "\n", "* To use Hudi with Amazon EMR Notebooks, you must first copy the Hudi jar files from the local file system to HDFS, present on the master node of the EMR cluster. You then use the notebook to configure your EMR notebook to use Hudi. Follow the `Setup` steps.\n", "* EMR 6.5.0 cluster should be attached to this notebook and should have the `Spark` and `Hive` applications installed. At the time of this article writing, Hudi 0.9.0 is available as default on EMR 6.5.0\n", "* This example uses a [Amazon Customer reviews](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) public dataset, hence the EMR cluster attached to this notebook must have internet connectivity.\n", "* This notebook uses the `Spark` kernel.\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "Hudi is a data management framework used to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert,merge and delete capabilities. By efficiently managing how data is laid out in Amazon S3, Hudi allows data to be ingested and updated in near real time. Hudi carefully maintains metadata of the actions performed on the dataset to help ensure that the actions are atomic and consistent.\n", "\n", "You can use Hive, Spark, Presto, Athena and Redshift to query a Hudi dataset interactively or build data processing pipelines using incremental pull. Incremental pull refers to the ability to pull only the data that changed between two actions.\n", "\n", "Hudi now supports for DDL/DMLs using Spark SQL, taking a huge step towards making Hudi more easily accessible and operable by all personas (non-engineers, analysts etc) enabling seamless migration of existing data set to Hudi seamless, taking a step toward less code pradigm. \n", "\n", "The Quick start guide to use : Spark SQL DML and DDL capabilities should be referenced for further detailed information.\n", "\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "\n", "1. Create an S3 bucket location to save your hudi dataset. For example: s3://EXAMPLE-BUCKET/my-hudi-dataset/\n", "\n", "2. Connect to the master node of the cluster using SSH and then copy the jar files from the local filesystem to HDFS as shown in the following examples. In the example, we create a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired. \n", "\n", "```\n", "hdfs dfs -mkdir -p /apps/hudi/lib \n", "hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar\n", "hdfs dfs -ls /apps/hudi/lib/hudi-spark-bundle.jar\n", "```\n", "\n", "\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Spark Configuration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "The Notebook has been tested with EMR 6.5.0 having Spark 3.1.2 installed \n", "\n", "Important links for Spark and Hudi congiurations are \n", "* EMR [app version Doc](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-6.x.html)\n", "* [Apache Hudi Github](https://github.com/apache/hudi)\n", "* Apache Hudi SparkSQL [Quick start guide ](https://hudi.apache.org/docs/quick-start-guide)\n", "* Apache Hudi [Configuration](https://hudi.apache.org/docs/configurations)\n", "\n", "\n", "**Before running the above configure statement, Please ensure the hudi jars are located in HDFS** \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\" : {\n", " \"spark.jars\":\"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar\", \n", " \"spark.serializer\":\"org.apache.spark.serializer.KryoSerializer\",\n", " \"spark.sql.extensions\":\"org.apache.spark.sql.hudi.HoodieSparkSessionExtension\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create an External Table over Input data set \n", "\n", "Amazon Customer Reviews is used as source input dataset and will be migrated into a Hudi Table \n", "\n", "* Input Date set : [Amazon Customer Reviews Data set](https://s3.amazonaws.com/amazon-reviews-pds/readme.html)\n", "* [S3](s3://amazon-reviews-pds/parquet/) location for [Amazon Customer Reviews Data set](https://s3.amazonaws.com/amazon-reviews-pds/readme.html)\n", "* Lets extract columns of Interest from Amazon Customer Reviews Data set\n", "\n", "#### Querying Amazon Customer Reviews source dataset " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Input Data set used : Amazons Review (https://s3.amazonaws.com/amazon-reviews-pds/readme.html)\n", "// Dataset available at s3://amazon-reviews-pds/parquet/. \n", "\n", "// Lets look at Schema of the Amazon Customer Reviews Data set.\n", "spark.read.format(\"parquet\").load(\"s3://amazon-reviews-pds/parquet/*\").printSchema()\n", "\n", "\n", "// Show columns of interest from Amazon Customer Reviews Dataset\n", "\n", "(spark.read.parquet(\"s3://amazon-reviews-pds/parquet/*\")\n", " .select(\"marketplace\", \"review_id\", \"customer_id\", \"product_title\", \"star_rating\", \"review_date\")\n", " .show(10))\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create an External Table amazon_customer_review_parquet\n", "\n", "External table amazon_customer_review_parquet is created over Amazon Customer Reviews as input data source " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/****************************\n", "Create a external table over Amazon customer reviews table\n", "Path = s3://amazon-reviews-pds/parquet/product_category=Home_Improvement/\n", "*****************************/\n", "\n", "\n", "create external table if not exists amazon_customer_review_parquet \n", " (\n", " marketplace string, \n", " review_id string, \n", " customer_id string,\n", " product_title string,\n", " star_rating int,\n", " review_date date\n", " )\n", " STORED AS PARQUET\n", " LOCATION 's3://amazon-reviews-pds/parquet/product_category=Home_Improvement'\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/****************************\n", "Read from external table Amazon customercustomer reviews table\n", "created over the Path = s3://amazon-reviews-pds/parquet/product_category=Home_Improvement/\n", "*****************************/\n", "\n", "select * from amazon_customer_review_parquet limit 10 " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Apache Hudi Table \n", "\n", "\n", "* Lets create an Apache hudi table **amazon_customer_review_hudi** partitioned by (year,month,date) where year, month and date will be extracted from **review_date** column of amazon_customer_review_parquet table\n", "* External **amazon_customer_review_hudi** table will be populated with data from **amazon_customer_review_parquet** table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/****************************\n", "Create a HUDI table for Amazon customer reviews table containing selected columns \n", "*****************************/\n", "\n", "-- Hoodi 9 configuration https://hudi.apache.org/docs/configurations\n", "-- Hoodie configurations can be set in options as hoodie.datasource.hive_sync.assume_date_partitioning = 'false',\n", "\n", "\n", "create table if not exists amazon_customer_review_hudi\n", " ( marketplace string, \n", " review_id string, \n", " customer_id string,\n", " product_title string,\n", " star_rating int,\n", " timestamp long ,\n", " review_date date,\n", " year string,\n", " month string ,\n", " day string\n", " )\n", " using hudi\n", " location 's3://EXAMPLE-BUCKET/my-hudi-dataset/'\n", " options ( \n", " type = 'cow', \n", " primaryKey = 'review_id', \n", " preCombineField = 'timestamp'\n", " )\n", " partitioned by (year,month,day);\n", " \n", "\n", "-- Change Location to the S3 location that you created in Step 1. s3://EXAMPLE-BUCKET/my-hudi-dataset/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Copy Data from Amazon Review (parquet) to Hudi table\n", "\n", "\n", "Lets now begin with copying data from **amazon_customer_review_parquet** to **amazon_customer_review_hudi** table using SQL INSERT INTO statement \n", "* It is possible to copy large datasets from S3 over to Hudi tables using a simple INSERT INTO command. \n", "* In our notebook, we will be referring to the previously defined **EXTERNAL amazon_customer_review_parquet** as **source table**, and \n", " **amazon_customer_review_hudi** as **target table.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/*************************************\n", "Copy data from amazon_customer_review_parquet to amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "\n", "INSERT INTO amazon_customer_review_hudi\n", " select \n", " marketplace , \n", " review_id , \n", " customer_id,\n", " product_title,\n", " star_rating,\n", " unix_timestamp(current_timestamp()) as timestamp,\n", " review_date,\n", " date_format(review_date, \"yyyy\") as year, \n", " date_format(review_date, \"MM\") as month,\n", " date_format(review_date, \"dd\") as day \n", " from amazon_customer_review_parquet limit 20 \n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Show Records in amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "\n", "select * from amazon_customer_review_hudi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Insert a record in Hudi Table \n", "\n", "Lets now see how easy it is to insert a record in Hudi table. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_hudi table\n", "*****************************/\n", "\n", "-- Spark SQL date time functions https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html\n", "-- Spark SQL date time functions https://spark.apache.org/docs/latest/api/sql/index.html#date_add\n", "-- TO_DATE function returns date in YYYY-MM-DD format \n", "\n", "insert into amazon_customer_review_hudi\n", " select \n", " 'US',\n", " 'Q1WWG70WK9VUCH365',\n", " '15444933',\n", " 'Standing Qigong',\n", " 5,\n", " 123455,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " date_format(date '2015-05-02', \"yyyy\") as year, \n", " date_format(date '2015-05-02', \"MM\") as month,\n", " date_format(date '2015-05-02', \"dd\") as day \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Pay Special attention to use of date_format to extract year, month and day\n", "* Hudi is smart enough to pad it with 0 in s3 partition path . Example date_format(date '2015-05-02', \"MM\") = 5 but the S3 parition path will have month=05 \n", "* Use of UDF like string_split, extract etc will result into creation of S3 parition path without 0 being padded. example month=5 " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/****************************\n", " Read the inserted record into amazon_customer_review_hudi table\n", "*****************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365' \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update Data \n", "\n", "After insert, lets now try to accomplish an update, Hudi 0.9.0 supports two kinds of DML to update hudi table . \n", "\n", "* **Update**\n", "* **Merge-Into** \n", "\n", "\n", "#### Update\n", "Lets Update a pre-existing record. \n", "\n", "* Update refers to the ability to insert records into an existing dataset if they do not already exist or to update a specific pre-existing column. \n", "* The following example demonstrates how to update star_rating ( condition star_rating = 0 where star_rating = 5) by using Spark SQL statements." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Lets take a look at our data in amazon_customer_review_hudi. \n", "Lets say someone says there is something odd going on with star ratings.\n", "**************************************/\n", "\n", "select star_rating, count(*) from amazon_customer_review_hudi group by star_rating order by star_rating ASC\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "\n", "Update The records where Start rating = 5 \n", "**************************************/\n", "\n", "\n", "update amazon_customer_review_hudi set star_rating = 0 where star_rating = 5\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Lets take a re-look at our data in amazon_customer_review_hudi after the update operation\n", "**************************************/\n", "\n", "\n", "select star_rating, count(*) from amazon_customer_review_hudi group by star_rating order by star_rating ASC\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### MergeInto\n", "\n", "Now Lets create dummy **amazon_customer_review_parquet_merge_source** table and insert data into the table which will eventually be **Merged Into** amazon_customer_review_hudi table.\n", "Following steps will be required to perform a merge and understand various possible options of Merge command. \n", "\n", "* Create amazon_customer_review_parquet_merge_source table with an additonal column for tracking deletion\n", "* Insert Records into amazon_customer_review_parquet_merge_source to showcase update, delete and insert \n", "* Insert Records into amazon_customer_review_hudi to showcase update, delete and insert \n", "* Perform a merge from amazon_customer_review_parquet_merge_source into amazon_customer_review_hudi" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "/****************************\n", "Create a table to be used for merging into amazon_customer_review_hudi \n", "*****************************/\n", "\n", "\n", "create table if not exists amazon_customer_review_parquet_merge_source \n", " (\n", " marketplace string, \n", " review_id string, \n", " customer_id string,\n", " product_title string,\n", " star_rating int,\n", " review_date date,\n", " deleteRecord string\n", " )\n", " STORED AS PARQUET\n", " LOCATION 's3://EXAMPLE-BUCKET-1/toBeMergeData/'\n", "\n", "\n", "-- Change Location ('s3://EXAMPLE-BUCKET-1/toBeMergeData/') to appropriate S3 bucket you have created in your AWS account" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Insert record into amazon_customer_review_parquet_merge_source table\n", "\n", "Lets Insert Records into **amazon_customer_review_parquet_merge_source** table for update, delete and insert " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_parquet_merge_source for Deletion \n", "*****************************/\n", "\n", "-- The record will be deleted from amazon_customer_review_hudi after merge as deleteRecord is set to yes\n", "\n", "insert into amazon_customer_review_parquet_merge_source\n", " select\n", " 'italy',\n", " '11',\n", " '1111',\n", " 'table',\n", " 5,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " 'yes' \n", "\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_parquet_merge_source used for Update\n", "*****************************/\n", "\n", "-- The record will be updated from amazon_customer_review_hudi with new Star rating and product_title after merge\n", "\n", "insert into amazon_customer_review_parquet_merge_source\n", " select\n", " 'spain',\n", " '22',\n", " '2222',\n", " 'Relaxing chair',\n", " 4,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " 'no' \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_parquet_merge_source for Insert \n", "*****************************/\n", "\n", "-- The record will be inserted into amazon_customer_review_hudi after merge \n", "\n", "\n", "insert into amazon_customer_review_parquet_merge_source\n", " select\n", " 'uk',\n", " '33',\n", " '3333',\n", " 'hanger',\n", " 3,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " 'no' \n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Read inserted records from amazon_customer_review_parquet_merge_source\n", "**************************************/\n", "\n", "select * from amazon_customer_review_parquet_merge_source " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Insert record into amazon_customer_review_hudi table\n", "\n", "Lets go ahead and Insert Records into **amazon_customer_review_hudi** for update and delete. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_hudi table for deletion after merge \n", "*****************************/\n", "\n", "-- Spark SQL date time functions https://spark.apache.org/docs/latest/api/sql/index.html#date_add\n", "\n", "\n", "insert into amazon_customer_review_hudi\n", " select \n", " 'italy',\n", " '11',\n", " '1111',\n", " 'table',\n", " 5,\n", " unix_timestamp(current_timestamp()) as timestamp,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " date_format(date '2015-05-02', \"yyyy\") as year, \n", " date_format(date '2015-05-02', \"MM\") as month,\n", " date_format(date '2015-05-02', \"dd\") as day \n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/****************************\n", " Insert a record into amazon_customer_review_hudi table for update after merge \n", "*****************************/\n", "\n", "\n", "insert into amazon_customer_review_hudi\n", " select \n", " 'spain',\n", " '22',\n", " '2222',\n", " 'chair ',\n", " 5,\n", " unix_timestamp(current_timestamp()) as timestamp,\n", " TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n", " date_format(date '2015-05-02', \"yyyy\") as year, \n", " date_format(date '2015-05-02', \"MM\") as month,\n", " date_format(date '2015-05-02', \"dd\") as day " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Perform Merge operation\n", "\n", "Lets go ahead and perform the **merge** from **amazon_customer_review_parquet_merge_source** into **amazon_customer_review_hudi**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/*************************************\n", "MergeInto : Merge Source Into Traget \n", "**************************************/\n", "\n", "-- Source amazon_customer_review_parquet_merge_source \n", "-- Taget amazon_customer_review_hudi\n", "\n", "\n", "merge into amazon_customer_review_hudi as target\n", "using ( \n", " select\n", " marketplace, \n", " review_id, \n", " customer_id,\n", " product_title,\n", " star_rating,\n", " review_date,\n", " deleteRecord,\n", " date_format(review_date, \"yyyy\") as year,\n", " date_format(review_date, \"MM\") as month,\n", " date_format(review_date, \"dd\") as day\n", " from amazon_customer_review_parquet_merge_source ) source\n", "\n", "on target.review_id = source.review_id \n", "\n", "when matched and deleteRecord != 'yes' then update set target.timestamp = unix_timestamp(current_timestamp()), target.star_rating = source.star_rating, target.product_title = source.product_title\n", "\n", "when matched and deleteRecord = 'yes' then delete\n", "\n", "when not matched then insert \n", " ( target.marketplace, \n", " target.review_id, \n", " target.customer_id,\n", " target.product_title,\n", " target.star_rating,\n", " target.timestamp ,\n", " target.review_date,\n", " target.year ,\n", " target.month ,\n", " target.day\n", " ) \n", " values\n", " (\n", " source.marketplace,\n", " source.review_id, \n", " source.customer_id,\n", " source.product_title,\n", " source.star_rating,\n", " unix_timestamp(current_timestamp()),\n", " source.review_date,\n", " source.year , \n", " source.month ,\n", " source.day \n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lets Quicky verify record with review_id == '11' is deleted , review_id == '22' is updated and review_id == '33' is inserted " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "After Merge Operation Read the deleted record from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == '11'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "After Merge Operation Read the updated record from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == '22' " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "After Merge Operation Read the inserted record from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == '33' " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### MergInto Constraints \n", "\n", "* The merge-on condition can only be applied on primary key as of now. \n", " example **on target.review_id = source.review_id**\n", "\n", "* Support for partial updates is supported for COW table but not supported for MOR tables. \n", "\n", "* Target table's fields cannot be the right-value of the update expression for Merge-On-Read table. \n", "\n", " The update will result in an error as taget columns are present on right handside of the expression\n", " **update set target.star_rating = target.star_rating +1**\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read from a Hudi Table\n", "\n", "Retrieving data from Hudi table is as simple as reading from any other table.\n", "\n", "* To retrieve data, Hudi Spark SQL DML performs a Snapshot Query for Read operations.\n", "* Lets now go ahead and query the Hudi table amazon_customer_review_hudi\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Query records data from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Query a sepcific record \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete a Record\n", "\n", "Lets go ahead and perform a delete operation " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Perform a delete over the inserted record" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Delete the inserted record from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "Delete from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Query the deleted record from amazon_customer_review_hudi table \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Insert Overwrite\n", "\n", "After Insert, Quey, Delete operations lets go ahead and execute Insert Overwrite comand\n", "\n", "* This operation can be faster than upsert for batch ETL jobs, that are recomputing entire target partitions at once \n", "(as opposed to incrementally updating the target tables). \n", "This is because Hudi is able to bypass indexing, precombining and other repartitioning steps \n", "in the upsert write path completely." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "\n", "/*************************************\n", "Insert Overwrite amazon_customer_review_hudi table\n", "**************************************/\n", "\n", "-- Insert Record into Apache Hudi table : amazon_customer_review_hudi from amazon_customer_review_parquet\n", "\n", "\n", "INSERT Overwrite table amazon_customer_review_hudi\n", " select \n", " marketplace , \n", " review_id , \n", " customer_id,\n", " product_title,\n", " star_rating,\n", " unix_timestamp(current_timestamp()) as timestamp,\n", " review_date,\n", " date_format(review_date, \"yyyy\") as year, \n", " date_format(review_date, \"MM\") as month,\n", " date_format(review_date, \"dd\") as day \n", " from amazon_customer_review_parquet limit 100 " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Read the inserted records \n", "**************************************/\n", "\n", "select * from amazon_customer_review_hudi limit 10 \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Alter table\n", "\n", "Finally lets see how alter our Hudi table and perform operation of Table rename and Column addition. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Show tables from default databases\n", "**************************************/\n", "\n", "show tables from default " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Table rename is NOT supported.\n", "**************************************/\n", "\n", "ALTER TABLE amazon_customer_review_hudi rename to amazon_customer_review_hudi_alter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "\n", "/*************************************\n", "Column addition in an existing table \n", "**************************************/\n", "\n", " ALTER TABLE amazon_customer_review_hudi add columns (name string) " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "describe the new table with additional column \n", "**************************************/\n", "\n", "\n", "DESCRIBE TABLE amazon_customer_review_hudi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Virtual Keys\n", "\n", "Apache Hudi 0.9.0, has introduced support for Virtual Keys which allows users to disable generation of these metadata columns, and instead depend on actual data columns to construct the record key/partition paths dynamically using appropriate key generators.\n", "\n", "* Virtual key feature is enabled on Hudi table by setting hoodie.populate.meta.fields = 'true'\n", "* When enabled, populates all meta fields. When disabled, no meta fields are populated and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Create external table amazon_customer_review_hudi_vir_key\n", "**************************************/\n", "\n", "create table if not exists amazon_customer_review_hudi_vir_key \n", " ( \n", " marketplace string, \n", " review_id string, \n", " customer_id string,\n", " product_title string,\n", " star_rating int,\n", " timestamp long ,\n", " review_date date,\n", " year string,\n", " month string ,\n", " day string\n", " )\n", " using hudi\n", " location 's3://EXAMPLE-BUCKET-2/my-hudi-dataset-vir-key/'\n", " options ( \n", " type = 'cow', \n", " primaryKey = 'review_id', \n", " preCombineField = 'timestamp'\n", " )\n", " partitioned by (year,month,day);\n", " \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql \n", "\n", "/*************************************\n", "Enabling Virtual Key by setting serdeproperties hoodie.populate.meta.fields = 'true'\n", "**************************************/\n", "\n", "alter table amazon_customer_review_hudi_vir_key set serdeproperties (hoodie.populate.meta.fields = 'true') " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Lets go ahead and Insert a record into amazon_customer_review_hudi_vir_key hudi table " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "\n", "spark.sql(\n", " \"\"\"INSERT INTO amazon_customer_review_hudi_vir_key\n", " select marketplace , \n", " review_id , \n", " customer_id,\n", " product_title,\n", " star_rating,\n", " unix_timestamp(current_timestamp()) as timestamp,\n", " review_date,\n", " date_format(review_date, \"yyyy\") as year, \n", " date_format(review_date, \"MM\") as month,\n", " date_format(review_date, \"dd\") as day \n", " from amazon_customer_review_parquet limit 20 \n", "\"\"\")\n" ] }, { "cell_type": "markdown", "metadata": { "execution": { "iopub.execute_input": "2022-03-04T21:10:41.527582Z", "iopub.status.busy": "2022-03-04T21:10:41.527227Z", "iopub.status.idle": "2022-03-04T21:10:42.604057Z", "shell.execute_reply": "2022-03-04T21:10:42.603350Z", "shell.execute_reply.started": "2022-03-04T21:10:41.527553Z" } }, "source": [ "We have arrive at the end of this notebook and accomplished the following task in this Notebook using simple SQL statements \n", "\n", "* Create an external table ( amazon_customer_review_parquet) over Amazon Customer reviews Public Dataset (https://s3.amazonaws.com/amazon-reviews-pds/readme.html) \n", "* Create an Apache Hudi Table (amazon_customer_review_hudi ) with partitions \n", "* Copy data from amazon_customer_review_parquet to amazon_customer_review_hudi table. \n", "* Perform insert record , query table, delete record, update, mergeInto ,Insert overwrite table and alter table over the Hudi table amazon_customer_review_hudi\n", "\n", "Support for Spark SQL DML and DDL by Hudi makes it super easy for Creating Table, Inserting,Querying, Updating, Merging and Deleting records. \n", "For deep dive Apache [Hudi 0.9.0 Qucik Start](https://hudi.apache.org/docs/quick-start-guide/) can be further referenced. \n", " " ] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }