spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

// Set up various input values as variables
val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/18/"
val hudiTableName = "order_hudi_cow"
val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date",
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write
  .format("org.apache.hudi")
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(hudiTablePath)
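
To confirm the write landed, you can query the Hudi dataset from the same spark-shell session. The following is a minimal sketch under a few assumptions: the write above completed, Hive sync registered the order_hudi_cow table, and your Hudi version requires a partition glob when loading the table directly from S3 (newer releases also accept the bare base path).

// Read the Hudi dataset back directly from S3; the trailing "/*/*" glob covers
// the transaction_date partition directories and the data files beneath them.
val hudiDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*")
hudiDF.select("order_id", "time_stamp", "transaction_date").show(10)

// Alternatively, query the Hive-synced table that the write registered.
spark.sql("SELECT COUNT(*) FROM " + hudiTableName).show()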