import sys

from pyspark.sql import SparkSession

# sys.argv[1] is the base path passed on the command line; the Hudi Spark
# bundle jar is expected to be supplied externally (e.g. via spark-submit).
spark = SparkSession \
    .builder \
    .config("spark.sql.warehouse.dir", sys.argv[1] + "/warehouse/") \
    .enableHiveSupport() \
    .getOrCreate()

# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

# Specify common DataSourceWriteOptions in the single hudiOptions variable:
# records are keyed on 'id', partitioned by 'creation_date', deduplicated on
# 'last_update_time', and the table is synced to the Hive metastore.
test_tableName = "hudi_tbl"
hudiOptions = {
    'hoodie.table.name': test_tableName,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': test_tableName,
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.mode': 'hms'
}

# Write the DataFrame as a Hudi dataset using a bulk insert
inputDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .options(**hudiOptions) \
    .mode('overwrite') \
    .save(sys.argv[1] + "/hudi_hive_insert")

print("After {}".format(spark.catalog.listTables()))
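
# --- Optional follow-up sketch (not part of the original job) ---------------
# Read the dataset back with a snapshot query to verify the bulk insert.
# Recent Hudi releases accept the table base path directly; older releases may
# require a partition glob such as sys.argv[1] + "/hudi_hive_insert/*".
snapshotDF = spark.read \
    .format('org.apache.hudi') \
    .load(sys.argv[1] + "/hudi_hive_insert")
snapshotDF.select("id", "creation_date", "last_update_time").show()

# With the same hudiOptions, an upsert keyed on 'id' replaces the matching
# record, keeping the row with the greater 'last_update_time' (the precombine
# field). The row below is illustrative only.
updateDF = spark.createDataFrame(
    [("100", "2015-01-01", "2015-01-02T00:00:00.000000Z")],
    ["id", "creation_date", "last_update_time"]
)
updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(sys.argv[1] + "/hudi_hive_insert")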