In [None]:
%session_id_prefix hudi-sql-
%glue_version 3.0
%idle_timeout 60
%connections 
%%configure 
{
 "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}

In [None]:
# S3 bucket (required) and optional key prefix that will hold the Hudi table files.
# bucket_name must be filled in before running the rest of the notebook.
bucket_name = ""
bucket_prefix = ""  # may be empty; leading/trailing "/" are ignored
database_name = "hudi_sql"
table_name = "product_cow"

# Join only the non-empty path segments. A plain f"{bucket_prefix}/..." yields a
# leading "/" when bucket_prefix is empty, so the S3 cleanup prefix would not
# match the keys the table files are actually written under.
table_prefix = "/".join(
    segment.strip("/")
    for segment in (bucket_prefix, database_name, table_name)
    if segment.strip("/")
)
table_location = f"s3://{bucket_name}/{table_prefix}"

## Create the database and clean up existing resources

In [None]:
import boto3

# Fail fast on the placeholder configuration before touching the catalog or S3:
# every step below needs a real bucket.
if not bucket_name:
    raise ValueError("bucket_name is empty; set it in the configuration cell before running")

# One Glue client for the whole cell, created outside any try block so the
# except clauses below can always resolve glue.exceptions.
glue = boto3.client('glue')

## Create a database with the name hudi_sql to host hudi tables if not exists.
try:
    glue.create_database(DatabaseInput={'Name': database_name})
    print(f"New database {database_name} created")
except glue.exceptions.AlreadyExistsException:
    print(f"Database {database_name} already exist")

## Delete files in S3 left under the table prefix by a previous run.
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.objects.filter(Prefix=f"{table_prefix}/").delete()

## Drop table in Glue Data Catalog (a missing table is not an error).
try:
    glue.delete_table(DatabaseName=database_name, Name=table_name)
except glue.exceptions.EntityNotFoundException:
    print(f"Table {database_name}.{table_name} does not exist")


## Create Hudi table with sample data using catalog sync

In [None]:
from pyspark.sql import Row
import time

# One timestamp shared by every row of the initial load; updated_at is the
# column the CREATE TABLE cell declares as the Hudi preCombineField.
ut = time.time()

# Initial catalogue: five Electronics products, one record per product.
product = [
    dict(product_id='00001', product_name='Heater', price=250, category='Electronics', updated_at=ut),
    dict(product_id='00002', product_name='Thermostat', price=400, category='Electronics', updated_at=ut),
    dict(product_id='00003', product_name='Television', price=600, category='Electronics', updated_at=ut),
    dict(product_id='00004', product_name='Blender', price=100, category='Electronics', updated_at=ut),
    dict(product_id='00005', product_name='USB charger', price=50, category='Electronics', updated_at=ut),
]

df_products = spark.createDataFrame([Row(**record) for record in product])

In [None]:
# Expose the sample rows to Spark SQL so the CTAS below can SELECT from them.
df_products.createOrReplaceTempView(name="tmp_product_cow")

The following query creates an external Hudi table with the configuration specified in the options and populates it from the `tmp_product_cow` view (CREATE TABLE AS SELECT). For more information, see https://hudi.apache.org/docs/table_management/#create-table-for-an-external-hudi-table

In [None]:
# CREATE TABLE ... AS SELECT: create the Hudi table at table_location (only if it
# does not exist yet) and populate it from the tmp_product_cow view in one step.
# Because the cleanup cell drops any previous table, this normally runs the CTAS.
#  - type = 'cow': Copy-on-Write table type.
#  - primaryKey / preCombineField: records are keyed by product_id; updated_at is
#    the field Hudi uses to choose between records sharing a key.
#  - hoodie.datasource.hive_sync.*: sync the table into the catalog as
#    database_name.table_name, partitioned on category, with use_jdbc disabled.
#  - hive_style_partitioning: partition folders are named category=<value>.
query = f"""
create table if not exists {database_name}.{table_name} using hudi
options (
 type = 'cow',
 primaryKey = 'product_id',
 preCombineField = 'updated_at',
 path = '{table_location}',
 hoodie.table.name = '{table_name}',
 hoodie.datasource.write.operation = 'upsert',
 hoodie.datasource.hive_sync.enable = 'true',
 hoodie.datasource.hive_sync.database = '{database_name}',
 hoodie.datasource.hive_sync.table = '{table_name}',
 hoodie.datasource.hive_sync.partition_fields = 'category',
 hoodie.datasource.hive_sync.partition_extractor_class = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
 hoodie.datasource.hive_sync.use_jdbc = 'false',
 hoodie.datasource.write.hive_style_partitioning = 'true'
)
partitioned by (category)
AS SELECT * FROM tmp_product_cow
"""
spark.sql(query)

## Read from Hudi table

In [None]:
%%sql
-- Initial table contents, ordered by key so later snapshots are easy to compare.
SELECT * FROM hudi_sql.product_cow ORDER BY product_id

## Upsert records into Hudi table

In [None]:
# Fresh timestamp for this batch, later than the initial load, so these rows
# carry a newer updated_at (the table's preCombineField).
ut = time.time()

product_updates = [
    # Update: existing product 00001 gets a new price.
    dict(product_id='00001', product_name='Heater', price=400, updated_at=ut, category='Electronics'),
    # Insert: product 00006 is new, in a new Furniture partition.
    dict(product_id='00006', product_name='Desk', price=50, updated_at=ut, category='Furniture'),
]
df_product_updates = spark.createDataFrame([Row(**record) for record in product_updates])


In [None]:
# Stage the update batch as a SQL view for the upsert statement below.
df_product_updates.createOrReplaceTempView(name="tmp_product_cow_updates")

In [None]:
%%sql
-- Upsert the staged batch keyed on product_id: 00001 exists and is updated,
-- 00006 is new and is inserted. (INSERT OVERWRITE would instead replace each
-- partition it writes, dropping the other Electronics products.)
MERGE INTO hudi_sql.product_cow AS target
USING tmp_product_cow_updates AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [None]:
%%sql
-- Contents after the upsert, ordered by key for comparison with the initial load.
SELECT * FROM hudi_sql.product_cow ORDER BY product_id

In [None]:
%%sql
-- No WHERE clause: every row gets a 20% price increase and a new updated_at
-- (current time as Unix epoch seconds).
UPDATE hudi_sql.product_cow
SET price = price * 1.2,
    updated_at = unix_timestamp(current_timestamp())

In [None]:
%%sql
-- Contents after the price update, ordered by key for comparison with earlier output.
SELECT * FROM hudi_sql.product_cow ORDER BY product_id

## Stop Session

In [None]:
%stop_session