# Query `Apache Hudi` dataset using Spark SQL

#### Topics covered in this example
* Hudi operations like Insert, Update, MergeInto, Read, Delete, Insert Overwrite, Alter Table and Virtual Keys using Spark SQL DMLs and DDLs.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE :</b> In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.</div>

* To use Hudi with Amazon EMR Notebooks, you must first copy the Hudi jar files from the local file system of the EMR cluster's master node to HDFS. You then configure the notebook to use Hudi. Follow the `Setup` steps below.
* An <b>EMR 6.5.0</b> cluster should be attached to this notebook, with the `Spark` and `Hive` applications installed. At the time of writing, Hudi 0.9.0 is the default version on EMR 6.5.0.
* This example uses the [Amazon Customer Reviews](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) public dataset, so the EMR cluster attached to this notebook must have internet connectivity.
* This notebook uses the `Spark` kernel.
***

## Introduction
Hudi is a data management framework that simplifies incremental data processing and data pipeline development by providing record-level insert, update, upsert, merge and delete capabilities. By efficiently managing how data is laid out in Amazon S3, Hudi allows data to be ingested and updated in near real time. Hudi carefully maintains metadata of the actions performed on the dataset to help ensure that the actions are atomic and consistent.

You can use Hive, Spark, Presto, Athena and Redshift to query a Hudi dataset interactively or build data processing pipelines using incremental pull. Incremental pull refers to the ability to pull only the data that changed between two actions.
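
To make incremental pull concrete, here is a minimal sketch of the reader options an incremental query might use. The helper name `incrementalReadOptions` and the instant value `20211201000000` are hypothetical, chosen only for illustration; the option keys are Hudi datasource read configurations.

```scala
// Builds the reader options for a Hudi incremental query.
// beginInstant / endInstant are commit instants (yyyyMMddHHmmss in Hudi 0.9.0).
// The helper itself is a hypothetical convenience, not part of Hudi.
def incrementalReadOptions(beginInstant: String,
                           endInstant: Option[String] = None): Map[String, String] = {
  val base = Map(
    "hoodie.datasource.query.type" -> "incremental",
    "hoodie.datasource.read.begin.instanttime" -> beginInstant
  )
  // Only add the upper bound when the caller supplies one.
  endInstant.fold(base)(end => base + ("hoodie.datasource.read.end.instanttime" -> end))
}

val opts = incrementalReadOptions("20211201000000")
opts.foreach { case (k, v) => println(s"$k = $v") }

// In a Spark session with the Hudi bundle loaded, the options could be passed as:
// val changed = spark.read.format("hudi").options(opts).load("s3://EXAMPLE-BUCKET/my-hudi-dataset/")
```

Only records committed after the begin instant (and up to the end instant, if given) would be returned, which is what distinguishes incremental pull from a full snapshot read.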

Hudi now supports DDL and DML statements through Spark SQL, a significant step toward making Hudi accessible and operable by all personas (non-engineers, analysts, etc.). It also enables migration of existing datasets to Hudi with less code.

For further details, refer to the <a href="https://hudi.apache.org/docs/quick-start-guide/">Spark SQL DML and DDL</a> sections of the Quick Start Guide.

***

## Setup

1. Create an S3 bucket location to save your Hudi dataset. For example: s3://EXAMPLE-BUCKET/my-hudi-dataset/

2. Connect to the master node of the cluster using SSH, then copy the jar files from the local filesystem to HDFS as shown in the following example. The example creates a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired.

```
hdfs dfs -mkdir -p /apps/hudi/lib 
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -ls /apps/hudi/lib/hudi-spark-bundle.jar
```


***

## Example

## Spark Configuration



The notebook has been tested with <b>EMR 6.5.0</b>, which has Spark 3.1.2 installed.

Important links for Spark and Hudi configurations:
* EMR [app version Doc](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-6.x.html)
* [Apache Hudi Github](https://github.com/apache/hudi)
* Apache Hudi SparkSQL [Quick start guide ](https://hudi.apache.org/docs/quick-start-guide)
* Apache Hudi [Configuration](https://hudi.apache.org/docs/configurations)


**Before running the following configure statement, please ensure the Hudi jars are located in HDFS**


In [None]:
%%configure -f
{
    "conf" : {
        "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",            
        "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
        "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}

## Create an External Table over Input data set 

The Amazon Customer Reviews dataset is used as the source input dataset and will be migrated into a Hudi table.

* Input dataset: [Amazon Customer Reviews Data set](https://s3.amazonaws.com/amazon-reviews-pds/readme.html)
* [S3](s3://amazon-reviews-pds/parquet/) location for the [Amazon Customer Reviews Data set](https://s3.amazonaws.com/amazon-reviews-pds/readme.html)
* Let's extract the columns of interest from the Amazon Customer Reviews dataset

#### Querying Amazon Customer Reviews source dataset 

In [None]:
// Input dataset used: Amazon Customer Reviews (https://s3.amazonaws.com/amazon-reviews-pds/readme.html)
// Dataset available at s3://amazon-reviews-pds/parquet/.

// Let's look at the schema of the Amazon Customer Reviews dataset.
spark.read.format("parquet").load("s3://amazon-reviews-pds/parquet/*").printSchema()


// Show columns of interest from Amazon Customer Reviews Dataset

(spark.read.parquet("s3://amazon-reviews-pds/parquet/*")
           .select("marketplace", "review_id", "customer_id", "product_title", "star_rating", "review_date")
           .show(10))



#### Create an External Table amazon_customer_review_parquet

External table <b>amazon_customer_review_parquet</b> is created over Amazon Customer Reviews as input data source 

In [None]:
%%sql 

/****************************
Create an external table over the Amazon Customer Reviews dataset
Path = s3://amazon-reviews-pds/parquet/product_category=Home_Improvement/
*****************************/


create external table if not exists amazon_customer_review_parquet 
       (
        marketplace string, 
        review_id string, 
        customer_id string,
        product_title string,
        star_rating int,
        review_date date
       )
       STORED AS PARQUET
       LOCATION 's3://amazon-reviews-pds/parquet/product_category=Home_Improvement'


In [None]:
%%sql 

/****************************
Read from the external table over the Amazon Customer Reviews dataset
created over the Path = s3://amazon-reviews-pds/parquet/product_category=Home_Improvement/
*****************************/

select * from amazon_customer_review_parquet limit 10 

## Create Apache Hudi Table 


* Let's create an Apache Hudi table **amazon_customer_review_hudi** partitioned by (year, month, day), where year, month and day will be extracted from the **review_date** column of the amazon_customer_review_parquet table
* The external **amazon_customer_review_hudi** table will be populated with data from the **amazon_customer_review_parquet** table
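
The table definition in this section declares `primaryKey = 'review_id'` and `preCombineField = 'timestamp'`. As a conceptual model only (plain Scala, not Hudi's implementation), the sketch below shows the usual meaning of a precombine field: when several incoming records share a key, the one with the largest precombine value is kept. The `Review` case class and the sample values are hypothetical.

```scala
// Hypothetical record type mirroring three columns of amazon_customer_review_hudi.
case class Review(reviewId: String, starRating: Int, timestamp: Long)

// Keep one record per primary key: the one with the largest preCombine value.
def preCombine(batch: Seq[Review]): Map[String, Review] =
  batch.groupBy(_.reviewId).map { case (id, versions) => id -> versions.maxBy(_.timestamp) }

val batch = Seq(
  Review("R1", 3, 100L),
  Review("R1", 5, 200L), // same key as above, newer timestamp
  Review("R2", 4, 150L)
)

val deduped = preCombine(batch)
println(deduped("R1").starRating) // 5: the version with timestamp 200 is kept
println(deduped.size)             // 2: one record per review_id
```

This is why the insert statements in this notebook populate `timestamp` with `unix_timestamp(current_timestamp())`: later writes for the same `review_id` carry a larger value.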

In [None]:
%%sql 

/****************************
Create a HUDI table for  Amazon customer reviews table containing selected columns 
*****************************/

-- Hudi 0.9.0 configuration reference: https://hudi.apache.org/docs/configurations
-- Hudi configurations can be set in options, for example hoodie.datasource.hive_sync.assume_date_partitioning = 'false'


create table if not exists amazon_customer_review_hudi
    ( marketplace string, 
      review_id string, 
      customer_id string,
      product_title string,
      star_rating int,
      timestamp long ,
      review_date date,
      year string,
      month string ,
      day string
      )
      using hudi
      location 's3://EXAMPLE-BUCKET/my-hudi-dataset/'
      options ( 
      type = 'cow',  
      primaryKey = 'review_id', 
      preCombineField = 'timestamp'
      )
      partitioned by (year,month,day);
      

-- Change Location to the S3 location that you created in Step 1. s3://EXAMPLE-BUCKET/my-hudi-dataset/

## Copy Data from Amazon Review (parquet) to Hudi table


Let's now copy data from **amazon_customer_review_parquet** to the **amazon_customer_review_hudi** table using a SQL INSERT INTO statement.
* Large datasets can be copied from S3 into Hudi tables with a simple INSERT INTO command.
* In this notebook, the previously defined external table **amazon_customer_review_parquet** is the **source table** and **amazon_customer_review_hudi** is the **target table**.

In [None]:
%%sql 


/*************************************
Copy data from  amazon_customer_review_parquet to amazon_customer_review_hudi table 
**************************************/


INSERT INTO amazon_customer_review_hudi
        select 
        marketplace , 
        review_id , 
        customer_id,
        product_title,
        star_rating,
        unix_timestamp(current_timestamp()) as timestamp,
        review_date,
        date_format(review_date, "yyyy") as year, 
        date_format(review_date, "MM") as month,
        date_format(review_date, "dd") as day  
        from amazon_customer_review_parquet limit 20   



In [None]:
%%sql 

/*************************************
Show Records in amazon_customer_review_hudi table 
**************************************/


select * from amazon_customer_review_hudi

## Insert a record in Hudi Table 

Let's now insert a single record into the Hudi table.

In [None]:
%%sql

/****************************
 Insert a record into amazon_customer_review_hudi table
*****************************/

-- Spark SQL datetime patterns https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
-- Spark SQL date time functions https://spark.apache.org/docs/latest/api/sql/index.html#date_add
-- TO_DATE returns a date, displayed in yyyy-MM-dd format

insert into  amazon_customer_review_hudi
    select 
    'US',
    'Q1WWG70WK9VUCH365',
    '15444933',
    'Standing Qigong',
     5,
    123455,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  


* Pay special attention to the use of date_format to extract year, month and day.
* date_format with the patterns "MM" and "dd" returns zero-padded strings. For example, date_format(date '2015-05-02', "MM") returns 05, so the S3 partition path contains month=05.
* Other approaches, such as extract, return the month or day as a number and produce S3 partition paths without zero padding, for example month=5.
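
The difference between padded and unpadded partition values can be reproduced outside Spark. The sketch below uses `java.time` pattern letters, which Spark 3 datetime patterns closely follow; the `key=value` path layout is assumed from the `month=05` example above.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val reviewDate = LocalDate.parse("2015-05-02")

// Pattern letters "MM" and "dd" always produce two digits.
val year  = reviewDate.format(DateTimeFormatter.ofPattern("yyyy"))
val month = reviewDate.format(DateTimeFormatter.ofPattern("MM"))
val day   = reviewDate.format(DateTimeFormatter.ofPattern("dd"))

// Numeric accessors return plain integers, so no padding is applied.
val paddedPath   = s"year=$year/month=$month/day=$day"
val unpaddedPath =
  s"year=${reviewDate.getYear}/month=${reviewDate.getMonthValue}/day=${reviewDate.getDayOfMonth}"

println(paddedPath)   // year=2015/month=05/day=02
println(unpaddedPath) // year=2015/month=5/day=2
```

Mixing the two styles across writes would spread one logical partition over two different paths, so it helps to pick one derivation and use it in every insert.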

In [None]:
%%sql

/****************************
 Read the inserted record from amazon_customer_review_hudi table
*****************************/

select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365' 


## Update Data 

After the insert, let's now perform an update. Hudi 0.9.0 supports two kinds of DML to update a Hudi table:

* **Update**
* **Merge-Into** 


#### Update
Let's update a pre-existing record.

* Update modifies column values of existing records that match a condition; it does not insert new records.
* The following example demonstrates how to update star_rating (setting star_rating = 0 where star_rating = 5) by using Spark SQL statements.

In [None]:
%%sql

/*************************************
Lets take a look at our data in amazon_customer_review_hudi. 
Lets say someone says there is something odd going on with star ratings.
**************************************/

select star_rating, count(*) from amazon_customer_review_hudi  group by star_rating order by star_rating ASC


In [None]:
%%sql

/*************************************

Update the records where star_rating = 5
**************************************/


update amazon_customer_review_hudi  set star_rating = 0 where star_rating = 5


In [None]:
%%sql

/*************************************
Let's take another look at our data in amazon_customer_review_hudi after the update operation
**************************************/


select star_rating, count(*) from amazon_customer_review_hudi group by star_rating order by star_rating ASC


#### MergeInto

Now let's create a dummy **amazon_customer_review_parquet_merge_source** table and insert data into it; this data will eventually be **merged into** the amazon_customer_review_hudi table.
The following steps perform a merge and illustrate the options of the MERGE INTO command.

* Create the amazon_customer_review_parquet_merge_source table with an additional column for tracking deletion
* Insert records into amazon_customer_review_parquet_merge_source to showcase update, delete and insert
* Insert records into amazon_customer_review_hudi to showcase update and delete
* Perform a merge from amazon_customer_review_parquet_merge_source into amazon_customer_review_hudi
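
Before running the SQL, the expected outcome of these steps can be modeled in plain Scala. This is a conceptual sketch of the matched/not-matched rules, not Hudi's implementation; the `Source` and `Target` case classes are hypothetical and omit the timestamp, date and partition columns for brevity.

```scala
// Hypothetical, simplified row types for the merge source and target.
case class Source(reviewId: String, productTitle: String, starRating: Int, deleteRecord: String)
case class Target(reviewId: String, productTitle: String, starRating: Int)

def mergeInto(target: Map[String, Target], source: Seq[Source]): Map[String, Target] =
  source.foldLeft(target) { (acc, s) =>
    acc.get(s.reviewId) match {
      // when matched and deleteRecord = 'yes' then delete
      case Some(_) if s.deleteRecord == "yes" => acc - s.reviewId
      // when matched and deleteRecord != 'yes' then update
      case Some(t) =>
        acc + (s.reviewId -> t.copy(productTitle = s.productTitle, starRating = s.starRating))
      // when not matched then insert
      case None => acc + (s.reviewId -> Target(s.reviewId, s.productTitle, s.starRating))
    }
  }

// The same sample rows used in the cells of this section.
val target = Map("11" -> Target("11", "table", 5), "22" -> Target("22", "chair ", 5))
val source = Seq(
  Source("11", "table", 5, "yes"),
  Source("22", "Relaxing chair", 4, "no"),
  Source("33", "hanger", 3, "no")
)

val merged = mergeInto(target, source)
println(merged.keys.toSeq.sorted.mkString(", ")) // 22, 33
println(merged("22"))                            // Target(22,Relaxing chair,4)
```

After the merge, review_id '11' is gone, '22' carries the new title and rating, and '33' is newly present, which is what the verification queries at the end of this section check.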

In [None]:
%%sql 

/****************************
Create a table to be used for merging into amazon_customer_review_hudi 
*****************************/


create table if not exists amazon_customer_review_parquet_merge_source 
       (
        marketplace string, 
        review_id string, 
        customer_id string,
        product_title string,
        star_rating int,
        review_date date,
        deleteRecord string
       )
       STORED AS PARQUET
       LOCATION 's3://EXAMPLE-BUCKET-1/toBeMergeData/'


-- Change Location ('s3://EXAMPLE-BUCKET-1/toBeMergeData/') to appropriate S3 bucket you have created in your AWS account

##### Insert record into amazon_customer_review_parquet_merge_source table

Lets Insert Records into **amazon_customer_review_parquet_merge_source** table for update, delete and insert 

In [None]:
%%sql 


/****************************
 Insert a record into amazon_customer_review_parquet_merge_source for Deletion 
*****************************/

-- The record will be deleted from amazon_customer_review_hudi after merge as deleteRecord  is set to yes

insert into  amazon_customer_review_parquet_merge_source
    select
    'italy',
    '11',
    '1111',
    'table',
     5,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'yes' 

 

In [None]:
%%sql 


/****************************
 Insert a record into amazon_customer_review_parquet_merge_source used for Update
*****************************/

-- The record will be updated in amazon_customer_review_hudi with the new star_rating and product_title after merge

insert into  amazon_customer_review_parquet_merge_source
    select
    'spain',
    '22',
    '2222',
    'Relaxing chair',
     4,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'no' 


In [None]:
%%sql 


/****************************
 Insert a record into amazon_customer_review_parquet_merge_source for Insert 
*****************************/

-- The record will be inserted into amazon_customer_review_hudi after merge 


insert into  amazon_customer_review_parquet_merge_source
    select
    'uk',
    '33',
    '3333',
    'hanger',
     3,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    'no' 



In [None]:
%%sql 

/*************************************
Read inserted records from amazon_customer_review_parquet_merge_source
**************************************/

select * from amazon_customer_review_parquet_merge_source 

##### Insert record into amazon_customer_review_hudi table

Lets go ahead and Insert Records into **amazon_customer_review_hudi** for update and delete. 

In [None]:
%%sql

/****************************
 Insert a record into amazon_customer_review_hudi table for deletion after merge 
*****************************/

-- Spark SQL date time functions https://spark.apache.org/docs/latest/api/sql/index.html#date_add


insert into  amazon_customer_review_hudi
    select 
    'italy',
    '11',
    '1111',
    'table',
     5,
    unix_timestamp(current_timestamp()) as timestamp,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  



In [None]:
%%sql

/****************************
 Insert a record into amazon_customer_review_hudi table for update after merge 
*****************************/


insert into  amazon_customer_review_hudi
    select 
    'spain',
    '22',
    '2222',
    'chair ',
     5,
    unix_timestamp(current_timestamp()) as timestamp,
    TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as  review_date,
    date_format(date '2015-05-02', "yyyy") as year, 
    date_format(date '2015-05-02', "MM") as month,
    date_format(date '2015-05-02', "dd") as day  

##### Perform Merge operation

Lets go ahead and perform the **merge** from  **amazon_customer_review_parquet_merge_source** into **amazon_customer_review_hudi**

In [None]:
%%sql 


/*************************************
MergeInto : Merge Source Into Target 
**************************************/

-- Source amazon_customer_review_parquet_merge_source 
-- Target amazon_customer_review_hudi


merge into amazon_customer_review_hudi as target
using ( 
        select
        marketplace, 
        review_id, 
        customer_id,
        product_title,
        star_rating,
        review_date,
        deleteRecord,
        date_format(review_date, "yyyy") as year,
        date_format(review_date, "MM") as month,
        date_format(review_date, "dd") as day
        from amazon_customer_review_parquet_merge_source ) source

on target.review_id = source.review_id 

when matched and deleteRecord != 'yes' then update set target.timestamp = unix_timestamp(current_timestamp()),  target.star_rating = source.star_rating, target.product_title = source.product_title

when matched and deleteRecord = 'yes' then delete

when not matched then insert 
      ( target.marketplace, 
        target.review_id, 
        target.customer_id,
        target.product_title,
        target.star_rating,
        target.timestamp ,
        target.review_date,
        target.year ,
        target.month  ,
        target.day
      ) 
      values
      (
        source.marketplace,
        source.review_id, 
        source.customer_id,
        source.product_title,
        source.star_rating,
        unix_timestamp(current_timestamp()),
        source.review_date,
        source.year , 
        source.month ,
        source.day 
       )

Let's quickly verify that the record with review_id == '11' is deleted, review_id == '22' is updated and review_id == '33' is inserted.

In [None]:
%%sql 

/*************************************
After Merge Operation Read the deleted record from amazon_customer_review_hudi table 
**************************************/

select * from amazon_customer_review_hudi where review_id == '11'

In [None]:
%%sql 

/*************************************
After Merge Operation Read the updated record from amazon_customer_review_hudi table 
**************************************/

select * from amazon_customer_review_hudi where review_id == '22' 

In [None]:
%%sql 

/*************************************
After Merge Operation Read the inserted record from amazon_customer_review_hudi table 
**************************************/

select * from amazon_customer_review_hudi where review_id == '33' 

##### MergeInto Constraints 

* The merge-on condition can currently be applied only on the primary key. 
  Example: **on target.review_id = source.review_id**

* Partial updates are supported for Copy-on-Write (COW) tables but not for Merge-on-Read (MOR) tables. 

* For Merge-on-Read tables, the target table's fields cannot appear on the right-hand side of the update expression. 

  For example, the following update results in an error because a target column appears on the right-hand side of the expression:
  **update set target.star_rating =  target.star_rating +1**


## Read from a Hudi Table

Retrieving data from a Hudi table is as simple as reading from any other table.

* To retrieve data, Hudi Spark SQL performs a snapshot query for read operations.
* Let's now query the Hudi table amazon_customer_review_hudi



In [None]:
%%sql

/*************************************
Query records data from amazon_customer_review_hudi table 
**************************************/

select * from amazon_customer_review_hudi

In [None]:
%%sql 

/*************************************
Query a specific record 
**************************************/

select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'

## Delete a Record

Lets go ahead and perform a delete operation 

* Perform a delete over the inserted record

In [None]:
%%sql

/*************************************
Delete the inserted record  from amazon_customer_review_hudi table 
**************************************/

Delete from  amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'


In [None]:
%%sql 

/*************************************
Query the deleted record from amazon_customer_review_hudi table 
**************************************/

select * from amazon_customer_review_hudi where review_id == 'Q1WWG70WK9VUCH365'

## Insert Overwrite

After the insert, query and delete operations, let's execute the Insert Overwrite command.

* This operation can be faster than upsert for batch ETL jobs that recompute entire target partitions at once (as opposed to incrementally updating the target tables). This is because Hudi is able to bypass indexing, precombining and other repartitioning steps in the upsert write path completely.

In [None]:
%%sql 


/*************************************
Insert Overwrite amazon_customer_review_hudi table
**************************************/

-- Insert Record into Apache Hudi table : amazon_customer_review_hudi from amazon_customer_review_parquet


INSERT Overwrite table amazon_customer_review_hudi
        select 
        marketplace , 
        review_id , 
        customer_id,
        product_title,
        star_rating,
        unix_timestamp(current_timestamp()) as timestamp,
        review_date,
        date_format(review_date, "yyyy") as year, 
        date_format(review_date, "MM") as month,
        date_format(review_date, "dd") as day  
        from amazon_customer_review_parquet limit 100 

In [None]:
%%sql  

/*************************************
Read the inserted records 
**************************************/

select *  from amazon_customer_review_hudi limit 10 


## Alter table

Finally, let's see how to alter our Hudi table by attempting a table rename and adding a column.


In [None]:
%%sql 

/*************************************
Show tables from default databases
**************************************/

show tables from default 

In [None]:
%%sql

/*************************************
Table rename is NOT supported.
**************************************/

ALTER TABLE amazon_customer_review_hudi rename to amazon_customer_review_hudi_alter

In [None]:
%%sql

/*************************************
Column addition in an existing table 
**************************************/

 ALTER TABLE amazon_customer_review_hudi add columns (name string) 

In [None]:
%%sql 

/*************************************
describe the new table with additional column 
**************************************/


DESCRIBE TABLE amazon_customer_review_hudi

## Virtual Keys

Apache Hudi 0.9.0 introduced support for virtual keys, which lets users disable population of the Hudi metadata columns and instead rely on actual data columns to construct the record key and partition path dynamically, using appropriate key generators.

* The behavior is controlled by the property hoodie.populate.meta.fields. The default value, 'true', populates all meta fields; setting it to 'false' enables virtual keys.
* When the property is disabled, no meta fields are populated and incremental queries will not be functional. This is only meant to be used for append-only/immutable data for batch processing.

In [None]:
%%sql 

/*************************************
Create external table amazon_customer_review_hudi_vir_key
**************************************/

create table if not exists amazon_customer_review_hudi_vir_key 
      ( 
      marketplace string, 
      review_id string, 
      customer_id string,
      product_title string,
      star_rating int,
      timestamp long ,
      review_date date,
      year string,
      month string ,
      day string
      )
      using hudi
      location 's3://EXAMPLE-BUCKET-2/my-hudi-dataset-vir-key/'
      options ( 
      type = 'cow',  
      primaryKey = 'review_id', 
      preCombineField = 'timestamp'
      )
      partitioned by (year,month,day);
     


In [None]:
%%sql 

/*************************************
Enabling virtual keys by setting serdeproperties hoodie.populate.meta.fields = 'false'
**************************************/

alter table amazon_customer_review_hudi_vir_key set serdeproperties (hoodie.populate.meta.fields = 'false') 

* Let's insert records into the amazon_customer_review_hudi_vir_key Hudi table 

In [None]:

spark.sql(
    """INSERT INTO amazon_customer_review_hudi_vir_key
        select marketplace , 
        review_id , 
        customer_id,
        product_title,
        star_rating,
        unix_timestamp(current_timestamp()) as timestamp,
        review_date,
        date_format(review_date, "yyyy") as year, 
        date_format(review_date, "MM") as month,
        date_format(review_date, "dd") as day  
       from amazon_customer_review_parquet limit 20   
""")


We have arrived at the end of this notebook and accomplished the following tasks using simple SQL statements:

* Created an external table (amazon_customer_review_parquet) over the Amazon Customer Reviews public dataset (https://s3.amazonaws.com/amazon-reviews-pds/readme.html)
* Created an Apache Hudi table (amazon_customer_review_hudi) with partitions
* Copied data from amazon_customer_review_parquet to the amazon_customer_review_hudi table
* Performed insert, query, delete, update, merge into, insert overwrite and alter table operations on the Hudi table amazon_customer_review_hudi

Hudi's support for Spark SQL DML and DDL makes it easy to create tables and to insert, query, update, merge and delete records.
For a deeper dive, refer to the Apache [Hudi 0.9.0 Quick Start](https://hudi.apache.org/docs/quick-start-guide/) guide.
    