# Query `Hudi` dataset using Spark SQL

#### Topics covered in this example
* Hudi operations like Insert, Upsert, Delete, Read and Incremental querying.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE :</b> In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.</div>

* To use Hudi with Amazon EMR Notebooks, you must first copy the Hudi jar files from the local file system to HDFS on the master node of the EMR cluster. You then use the notebook to configure your EMR notebook to use Hudi. Follow the `Setup` steps.
* With Amazon EMR release version 5.28.0 and later, Amazon EMR installs Hudi components by default when Spark, Hive, or Presto is installed. The EMR cluster attached to this notebook should have the `Spark` and `Hive` applications installed.
* This example uses a public dataset, hence the EMR cluster attached to this notebook must have internet connectivity.
* This notebook uses the `Spark` kernel.
***

## Introduction
Hudi is a data management framework used to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. By efficiently managing how data is laid out in Amazon S3, Hudi allows data to be ingested and updated in near real time. Hudi carefully maintains metadata of the actions performed on the dataset to help ensure that the actions are atomic and consistent.

You can use Hive, Spark, or Presto to query a Hudi dataset interactively or build data processing pipelines using incremental pull. Incremental pull refers to the ability to pull only the data that changed between two actions.

The post: <a href="https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html" target="_blank">Work with a Hudi Dataset</a> provides detailed information.

***

## Setup
1. Create an S3 bucket location to save your hudi dataset. For example: s3://EXAMPLE-BUCKET/my-hudi-dataset/

2. Connect to the master node of the cluster using SSH and then copy the jar files from the local filesystem to HDFS as shown in the following examples. In the example, we create a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired.

```
hdfs dfs -mkdir -p /apps/hudi/lib hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
```
***

## Example

In [None]:
%%configure
{ 
    "conf": 
        {
            "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false"
        }
}

Initialize a Spark Session for Hudi.  
When using Scala, make sure you import the following classes in your Spark session. This needs to be done once per Spark session.

In [None]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

// Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

// Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
"hoodie.table.name": "my_hudi_table",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.partitionpath.field": "creation_date",
"hoodie.datasource.write.precombine.field": "last_update_time",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.table": "my_hudi_table",
"hoodie.datasource.hive_sync.partition_fields": "creation_date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
}

// Write a DataFrame as a Hudi dataset to the S3 location that you created in Step 1.
inputDF.write
.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "insert")
.options(**hudiOptions)
.mode("overwrite")
.save("s3://EXAMPLE-BUCKET/my-hudi-dataset/")    // Change this to the S3 location that you created in Step 1.

#### Upsert Data 
Upsert refers to the ability to insert records into an existing dataset if they do not already exist or to update them if they do.  
The following example demonstrates how to upsert data by writing a DataFrame.  
Unlike the previous insert example, the `OPERATION_OPT_KEY` value is set to `UPSERT_OPERATION_OPT_VAL`.  
In addition, `.mode(SaveMode.Append)` is specified to indicate that the record should be appended.

In [None]:
// Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

updateDF.write
.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.options(**hudiOptions)
.mode("append")
.save("s3://EXAMPLE-BUCKET/my-hudi-dataset/")  // Change this to the S3 location that you created in Step 1.

#### Delete a Record
To hard delete a record, you can upsert an empty payload. In this case, the `PAYLOAD_CLASS_OPT_KEY` option specifies the `EmptyHoodieRecordPayload` class.  
The example uses the same DataFrame, updateDF, used in the upsert example to specify the same record.

In [None]:
updateDF.write
.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
.options(**hudiOptions)
.mode("append")
.save("s3://EXAMPLE-BUCKET/my-hudi-dataset/")  // Change this to the S3 location that you created in Step 1.

#### Read from a Hudi Dataset
To retrieve data at the present point in time, Hudi performs snapshot queries by default.  
Following is an example of querying the dataset written to S3 in Write to a Hudi Dataset.  
Replace `s3://EXAMPLE-BUCKET/my-hudi-dataset` with your table path, and add wildcard asterisks for each partition level, plus one additional asterisk.  
In this example, there is one partition level, so weâ€™ve added two wildcard symbols.

In [None]:
snapshotQueryDF = spark.read
    .format("org.apache.hudi")
    .load("s3://EXAMPLE-BUCKET/my-hudi-dataset" + "/*/*")   // Change this to the S3 location that you created in Step 1.
    
snapshotQueryDF.show()

#### Incremental Queries
You can also perform incremental queries with Hudi to get a stream of records that have changed since a given commit timestamp.  
To do so, set the `QUERY_TYPE_OPT_KEY` field to `QUERY_TYPE_INCREMENTAL_OPT_VAL`. Then, add a value for `BEGIN_INSTANTTIME_OPT_KEY` to obtain all records written since the specified time.  
Incremental queries are typically ten times more efficient than their batch counterparts since they only process changed records.

When you perform incremental queries, use the root (base) table path without the wildcard asterisks used for Snapshot queries.

In [None]:
readOptions = {
  "hoodie.datasource.query.type": "incremental",
  "hoodie.datasource.read.begin.instanttime": <beginInstantTime>,
}

incQueryDF = spark.read
    .format("org.apache.hudi")
    .options(**readOptions)
    .load("s3://EXAMPLE-BUCKET/my-hudi-dataset")  // Change this to the S3 location that you created in Step 1.
    
incQueryDF.show()