# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import sys
from datetime import datetime
from awsglue.transforms import ApplyMapping, SelectFields, ResolveChoice
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# System arguments
args_list = [
    "job_type",
    "ddb_issues_table_name",
    "ddb_data_hierarchy_table_name",
    "glue_db_name",
    "glue_issues_table_name",
    "glue_data_hierarchy_table_name",
    "glue_output_bucket"
]
args = getResolvedOptions(sys.argv, args_list)  # NOSONAR: python:S4823

JOB_TYPE = args["job_type"]
DDB_ISSUES_TABLE_NAME = args["ddb_issues_table_name"]
DDB_DATA_HIERARCHY_TABLE_NAME = args["ddb_data_hierarchy_table_name"]
GLUE_ISSUES_TABLE_NAME = args["glue_issues_table_name"]
GLUE_DATA_HIERARCHY_TABLE_NAME = args["glue_data_hierarchy_table_name"]
GLUE_DB_NAME = args["glue_db_name"]
GLUE_OUTPUT_BUCKET = args["glue_output_bucket"]

# Sets Glue context and logging
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
job = Job(glue_context)


class JobInputException(Exception):
    """Raised when input to the job is not valid"""
    pass


def log_message(msg):
    """Prints a timestamped message (or list of messages) to the Glue job's output logs"""
    msg_arr = [f'****** LOG_MSG {datetime.now()} ******']

    if not isinstance(msg, list):
        msg = [msg]

    # Add some preceding whitespace to each line of the log message.
    # This makes it easier to read in the Glue logs on CloudWatch.
    msg = list(map(lambda x: f'  {x}', msg))
    msg_arr.extend(msg)
    msg_arr.append('')  # empty line

    # Glue sends Python logging messages (using logger) to the error logs in CloudWatch.
    # Instead, we use print statements so messages appear in the normal Logs section
    # of the Glue job.
    print('\n'.join(msg_arr))


def get_column_mapping(column_name):
    """
    Maps a column from the Glue Data Catalog table that was generated by crawling the
    DynamoDB table to the corresponding column in the table that was created in Glue.
    Defaults to string if no other data type is specified.
    """
    bigint_const = "bigint##bigint"

    if (JOB_TYPE == "issues"):
        data_types = {
            "version": bigint_const,
            "createddateutc": "date##date",
            "acknowledgedtime": bigint_const,
            "resolutiontime": bigint_const
        }
    elif (JOB_TYPE == "hierarchy"):
        data_types = {
            "version": bigint_const,
            "rootcauses": "array##string",
            "filterPolicy": "struct##string"
        }
    else:
        # Defensive default so an unexpected job type falls back to string mappings
        data_types = {}

    if column_name in data_types:
        # Mapping values are encoded as "<source type>##<target type>"
        split_type = data_types[column_name].split("##")
        return (column_name, split_type[0], column_name, split_type[1])
    else:
        return (column_name, "string", column_name, "string")


def main():
    """This script loads data from the supplied DynamoDB table to S3 so it can be analyzed with Athena"""
    if (JOB_TYPE == "issues"):
        DDB_TABLE_NAME = DDB_ISSUES_TABLE_NAME
        GLUE_TABLE_NAME = GLUE_ISSUES_TABLE_NAME
        FIELD_PATHS = [
            "eventid", "acknowledged", "created", "sitename", "issuesource", "priority",
            "areaname#status#processname#eventdescription#stationname#devicename#created",
            "version", "devicename", "devicename#eventid", "createdat", "areaname",
            "processname", "createddateutc", "eventdescription",
            "areaname#status#processname#stationname#devicename#created", "stationname",
            "id", "acknowledgedtime", "status", "updatedat", "closed", "resolutiontime",
            "createdby", "acknowledgedby", "closedby", "rejectedby", "additionaldetails"
        ]
    elif (JOB_TYPE == "hierarchy"):
        DDB_TABLE_NAME = DDB_DATA_HIERARCHY_TABLE_NAME
        GLUE_TABLE_NAME = GLUE_DATA_HIERARCHY_TABLE_NAME
        FIELD_PATHS = [
            "createdat", "name", "description", "id", "devicestationid", "type", "version",
            "parentid", "updatedat", "areasiteid", "eventprocessid", "eventtype", "priority",
            "rootcauses", "sms", "eventimgkey", "email", "protocol", "endpoint",
            "filterpolicy", "subscriptionarn", "stationareaid", "processareaid", "alias"
        ]
    else:
        raise JobInputException(f"JOB_TYPE was invalid ({JOB_TYPE}). Expecting either \"issues\" or \"hierarchy\"")

    log_message([
        "Running with the following context:",
        f"DDB_TABLE_NAME: {DDB_TABLE_NAME}",
        f"GLUE_TABLE_NAME: {GLUE_TABLE_NAME}",
        f"GLUE_DB_NAME: {GLUE_DB_NAME}",
        f"GLUE_OUTPUT_BUCKET: {GLUE_OUTPUT_BUCKET}"
    ])

    # The crawler-generated catalog table name is the DynamoDB table name, lowercased with hyphens replaced
    DDB_TABLE_NAME_FORMATTED = DDB_TABLE_NAME.lower().replace('-', '_')

    log_message("Mapping columns")
    COLUMN_MAPPINGS = list(map(lambda x: get_column_mapping(x), FIELD_PATHS))

    log_message("Creating a Dynamic Frame from the DynamoDB table schema")
    datasource0 = glue_context.create_dynamic_frame.from_catalog(
        database = GLUE_DB_NAME,
        table_name = DDB_TABLE_NAME_FORMATTED,
        transformation_ctx = "datasource0"
    )

    log_message("Applying column mappings")
    applymapping1 = ApplyMapping.apply(
        frame = datasource0,
        mappings = COLUMN_MAPPINGS,
        transformation_ctx = "applymapping1"
    )

    log_message("Selecting fields")
    selectfields2 = SelectFields.apply(
        frame = applymapping1,
        paths = FIELD_PATHS,
        transformation_ctx = "selectfields2"
    )

    log_message("Resolving choice types")
    # MATCH_CATALOG casts any ambiguous (choice) columns to the types defined in the target catalog table
    resolvechoice3 = ResolveChoice.apply(
        frame = selectfields2,
        choice = "MATCH_CATALOG",
        database = GLUE_DB_NAME,
        table_name = GLUE_TABLE_NAME,
        transformation_ctx = "resolvechoice3"
    )
    # make_struct resolves any remaining choice columns by wrapping the possible types in a struct
    resolvechoice4 = ResolveChoice.apply(
        frame = resolvechoice3,
        choice = "make_struct",
        transformation_ctx = "resolvechoice4"
    )

    log_message("Persisting data in S3")
    # Writes to the S3 location configured on the target Glue catalog table
    glue_context.write_dynamic_frame.from_catalog(
        frame = resolvechoice4,
        database = GLUE_DB_NAME,
        table_name = GLUE_TABLE_NAME,
        transformation_ctx = "datasink5"
    )

    job.commit()
    log_message("Done")


if __name__ == '__main__':
    main()
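
# --- Example invocation (illustrative only) ---------------------------------------
# A hypothetical set of job arguments this script expects when started as a Glue job,
# e.g. with `aws glue start-job-run --job-name <job-name> --arguments '{...}'`.
# All table, database, and bucket names below are placeholders, not values defined by
# this script:
#
#   --job_type issues
#   --ddb_issues_table_name example-issues-table
#   --ddb_data_hierarchy_table_name example-data-hierarchy-table
#   --glue_db_name example_glue_db
#   --glue_issues_table_name example_issues
#   --glue_data_hierarchy_table_name example_data_hierarchy
#   --glue_output_bucket example-output-bucket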