import logging
import sys
import time

import awswrangler as wr
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)

# Resolve the parameters passed to the Glue job
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "TARGET_DDB_TABLE",
        "S3_BUCKET",
        "S3_PREFIX_PROCESSED",
        "TABLE_HEADER_NAME",
    ],
)

target_ddb_table = args["TARGET_DDB_TABLE"]
s3_bucket = args["S3_BUCKET"]
s3_prefix_processed = args["S3_PREFIX_PROCESSED"]
table_header_name = args["TABLE_HEADER_NAME"].split(",")

logger.info("Read processed file (model pipeline output) (no header) ...")
source_s3_proc = f"s3://{s3_bucket}/{s3_prefix_processed}"
input_df_proc = wr.s3.read_csv(source_s3_proc, header=None, chunksize=1000)

# Initialize the Glue job
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

logger.info("Target DDB Table: [{}]".format(target_ddb_table))
logger.info("START: Loading data to DDB Table ...")
t2 = time.time()
rec_cnt = 0

# Read CSV file(s) in chunks of 1000 records and load each chunk to the DDB table
for input_df in input_df_proc:
    # Keep only the first and last columns, apply the header names, and cast all values to str
    input_df = input_df.iloc[:, [0, -1]]
    input_df.columns = table_header_name
    input_df = input_df.astype(str)
    rec_cnt += input_df.shape[0]
    wr.dynamodb.put_df(df=input_df, table_name=target_ddb_table)
    time.sleep(1)

output2 = time.time() - t2
logger.info("END  : Loading data to DDB Table ...")
logger.info("Loading time: [{}] seconds".format(output2))
logger.info("No. of records loaded: [{}]".format(rec_cnt))

job.commit()