AWSTemplateFormatVersion: "2010-09-09" Description: "CloudFormation Template to deploy an Serverless Data Loader for Redshift" Parameters: RedshiftClusterIdentifier: Description: Cluster Identifier for your redshift cluster Type: String Default: 'YourRedshiftClusterIdentiifer' DatabaseUserName: Description: Redshift database user name which has access to run SQL Script. Type: String AllowedPattern: "([a-z])([a-z]|[0-9])*" Default: 'awsuser' DatabaseName: Description: Name of the Redshift primary database where SQL Script would be run. Type: String Default: 'dev' DatabaseSchemaName: Description: Name of the Redshift schema name where the tables are created. Type: String Default: 'public' # RedshiftIAMRoleName: # Description: AWS IAM Role Name associated with the Redshift cluster. Default is default Iam role # Type: String # #Default: 'default' # Default: 'default' RedshiftIAMRoleARN: Description: AWS IAM Role ARN associated with the Redshift cluster. If default IAM role is set for cluster and has access to your s3 bucket leave it to default Type: String Default: 'YourClusterRoleARN' # NotificationEmailId: # Type: String # Description: EmailId to send event notifications through Amazon SNS # AllowedPattern: '^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$' # ConstraintDescription: provide a valid email address. # #Default: 'test@test.com' # Default: 'bhanuaws@amazon.com' CopyCommandOptions: Description: Provide the additional COPY command data format parameters Type: String Default: "delimiter '|' gzip" CopyCommandSchedule: Type: String Description: Cron expression to schedule S3 loader process through EventBridge rule Default: 'cron(0/5 * ? * * *)' InitiateSchemaDetection: Description: 'BETA: Dynamically detect schema prior to file load' Type: String Default: 'Yes' AllowedValues: - 'Yes' - 'No' # SqlText: # Type: String # Description: SQL Text to run at the start up # Default: 'call run_elt_process();' SourceS3Bucket: Type: String Description: S3 bucket name where data is stored. 
Make sure IAM Role that is associated Redshift cluster has access to this bucket Default: 'source-s3-bucket-name' Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: "Input Parameters" Parameters: - RedshiftClusterIdentifier - DatabaseUserName - DatabaseName - DatabaseSchemaName - RedshiftIAMRoleARN - CopyCommandOptions - CopyCommandSchedule - SourceS3Bucket Mappings: RegionMap: us-east-1: # Virginia "S3BucketMap" : "redshift-blogs" "S3KeyMap" : "redshift-s3-auto-loader/awswrangler-layer-2.15.1-py3.9.zip" us-west-2: # Oregon "S3BucketMap" : "redshift-immersionday-labs" "S3KeyMap" : "awswrangler-layer-2.15.1-py3.9.zip" Resources: # RedshiftNotificationTopicSNS: # Type: AWS::SNS::Topic # Properties: # KmsMasterKeyId: alias/aws/sns # Subscription: # - Endpoint: !Ref NotificationEmailId # Protocol: email #Policies #Roles #Role for Lambda functions s3LoaderLambdaRole: Type: AWS::IAM::Role Properties: Description : IAM Role for lambda functions to access Redshift, SQS and DynamoDB ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: RedshiftAccessPolicy PolicyDocument : Version: 2012-10-17 Statement: - Effect: Allow Action: - redshift:GetClusterCredentials - redshift:DescribeClusters Resource: - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${RedshiftClusterIdentifier} - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${RedshiftClusterIdentifier}/${DatabaseName} - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${RedshiftClusterIdentifier}/${DatabaseUserName} - Effect: "Allow" Action: - redshift-data:ExecuteStatement - redshift-data:ListStatements - redshift-data:GetStatementResult - redshift-data:DescribeStatement Resource: "*" - PolicyName: DynamoDBAccessPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams - dynamodb:GetItem - dynamodb:PutItem - dynamodb:Scan - dynamodb:Query - dynamodb:UpdateItem - dynamodb:BatchWriteItem - dynamodb:DeleteItem Resource: "arn:aws:dynamodb:*:*:table/s3_data_loader*" - PolicyName: SQSAccessPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - sqs:ReceiveMessage - sqs:SendMessage - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:SetQueueAttributes - sqs:GetQueueUrl Resource: - !Sub arn:aws:sqs:${AWS::Region}:${AWS::AccountId}:s3-loader-utility.fifo - PolicyName: s3EventNotification PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:GetBucketNotification' - 's3:PutBucketNotification' - 's3:ListObjectsV2' - 's3:GetObject' - 's3:PutObject' Resource: - !Sub 'arn:aws:s3:::${SourceS3Bucket}' - !Sub 'arn:aws:s3:::${SourceS3Bucket}/*' - Effect: Allow Action: - 'logs:CreateLogGroup' - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: 'arn:aws:logs:*:*:*' - Effect: Allow Action: - 'lambda:InvokeFunction' Resource: - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*s3LoaderUtil* #- # PolicyName: SNSPublishPolicy # PolicyDocument : # Version: 2012-10-17 # Statement: # - # Effect: Allow # Action: sns:Publish # Resource: !Ref RedshiftNotificationTopicSNS #Event Bridge Rules #First Event bridge rule to trigger Lambda 2 s3LoaderUtilKickoffFileProcessing 
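  # Illustrative end-to-end flow of the rules and functions defined below (comment only, not
  # used by CloudFormation). The table name "orders" and the object keys are hypothetical:
  #   1. An object lands under s3://<SourceS3Bucket>/s3-redshift-loader-source/orders/ and S3
  #      invokes s3LoaderUtilLogFileMetadata, which registers "orders" in
  #      s3_data_loader_table_config and logs the file in s3_data_loader_file_metadata.
  #   2. On the CopyCommandSchedule cron, s3LoaderUtilKickoffFileProcessing invokes
  #      s3LoaderUtilProcessPendingFiles asynchronously once per registered table.
  #   3. s3LoaderUtilProcessPendingFiles writes a manifest under s3-redshift-loader-manifest/
  #      and queues a COPY command on s3-loader-utility.fifo, for example:
  #        COPY public.orders FROM 's3://<SourceS3Bucket>/s3-redshift-loader-manifest/orders/orders#<ts>.json'
  #        iam_role '<RedshiftIAMRoleARN>' manifest delimiter '|' gzip
  #   4. s3LoaderUtilProcessQueueLoadRS drains the queue and runs the COPY through the Redshift
  #      Data API; s3LoaderUtilUpdateLogTable records the final status in s3_data_loader_log
  #      when the Data API status-change event fires.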
  KickoffFileProcessingSchedule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: First EventBridge rule, triggered periodically based on the cron expression. This scheduled rule invokes the s3LoaderUtilKickoffFileProcessing Lambda function.
      ScheduleExpression: !Ref CopyCommandSchedule
      State: "ENABLED"
      Targets:
        - Arn:
            Fn::GetAtt:
              - "s3LoaderUtilKickoffFileProcessing"
              - "Arn"
          Id: KickoffFileProcessingSchedule
          Input: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DatabaseUserName}\",\"action\":\"run_sql\"}}"
  PermissionForKickoffFileProcessingSchedule:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "s3LoaderUtilKickoffFileProcessing"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "KickoffFileProcessingSchedule"
          - "Arn"
  #Second EventBridge rule to trigger Lambda 5 (s3LoaderUtilUpdateLogTable). This rule is pattern based and is triggered by the Redshift Data API events emitted for s3LoaderUtilProcessQueueLoadRS.
  UpdateLogTableEventBased:
    Type: "AWS::Events::Rule"
    Properties:
      EventPattern: !Sub "{\"source\": [\"aws.redshift-data\"],\"detail-type\": [\"Redshift Data Statement Status Change\"]}"
      Description: Second EventBridge rule (pattern based). Responds to Redshift Data API ETL events coming from the s3LoaderUtilProcessQueueLoadRS Lambda and triggers s3LoaderUtilUpdateLogTable.
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt 's3LoaderUtilUpdateLogTable.Arn'
          Id: UpdateLogTableEventBased
  PermissionForUpdateLogTableEventBased:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Ref: "s3LoaderUtilUpdateLogTable"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn:
        Fn::GetAtt:
          - "UpdateLogTableEventBased"
          - "Arn"
  #EventBridge rule to trigger Lambda 4 s3LoaderUtilProcessQueueLoadRS to process queue messages and invoke Redshift COPY commands
  QueueRSProcessingSchedule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: EventBridge rule triggered periodically; it invokes the s3LoaderUtilProcessQueueLoadRS Lambda function to kick off the Redshift COPY commands.
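      # Note (derived from the step 4 Lambda code further down): each scheduled run polls up to
      # 5 messages from s3-loader-utility.fifo and submits at most 5 COPY statements per cycle
      # (see the ScheduleExpression below), skipping the run entirely if the cluster is not in
      # "Available" status.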
ScheduleExpression: "rate(5 minutes)" State: "ENABLED" Targets: - Arn: Fn::GetAtt: - "s3LoaderUtilProcessQueueLoadRS" - "Arn" Id: QueueRSProcessingSchedule PermissionForQueueRSProcessingSchedule: Type: AWS::Lambda::Permission Properties: FunctionName: Ref: "s3LoaderUtilProcessQueueLoadRS" Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: Fn::GetAtt: - "QueueRSProcessingSchedule" - "Arn" #Lambda Functions #step 1 lambda function s3LoaderUtilLogFileMetadata : Type: "AWS::Lambda::Function" #DependsOn: SetupRedshiftObjectsLambdaRole Properties: Description: Step 1 Lambda Function log Metadata to Amazon DynamoDB Table Handler: index.lambda_handler Role: !GetAtt 's3LoaderLambdaRole.Arn' Runtime: python3.9 Layers: - !Ref DataWranglerLayer EphemeralStorage: Size: 5120 MemorySize: 10240 Timeout: 900 Environment: Variables: redshift_cluster_id: !Ref RedshiftClusterIdentifier redshift_database: !Ref DatabaseName redshift_user: !Ref DatabaseUserName redshift_schema: !Ref DatabaseSchemaName redshift_iam_role: !Ref RedshiftIAMRoleARN source_s3_bucket: !Ref SourceS3Bucket copy_default_options: !Ref CopyCommandOptions initiate_schema_detection: !Ref InitiateSchemaDetection Code: ZipFile: | import json import urllib.parse import boto3 import csv import os import time from pyarrow.parquet import ParquetFile import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import awswrangler as wr from dateutil.parser import parse from datetime import date, datetime, timedelta dynamodb = boto3.resource('dynamodb') global_table_name = '' global_schema_name ='' global_file_type = 'csv' global_additional_copy_parameters = '' def AddTableMetadata(s3_table_name_config,rs_iam_role ,redshift_schema,copy_default_options): status = 0 dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') response = dynamoTable_config.get_item(Key={'s3_table_name': s3_table_name_config}) #response = dynamoTable_config.query(KeyConditionExpression=Key('s3_table_name').eq(s3_table_name_config)) #print('Response from dynamoDb is:',response) item_key='Item' if item_key in response: #print('Its already added') item_key='Item' else: event_time = datetime.now() event_time = event_time - timedelta (days=1000) file_last_processed_timestamp = event_time.strftime('%Y-%m-%d %H:%M:%S.%f') #copy_command_additional_options = "delimiter '|' gzip" #if (copy_default_options=='delimiter pipe gzip'): # copy_command_additional_options = "delimiter '|' gzip" #elif (copy_default_options=='delimiter pipe'): # copy_command_additional_options = "delimiter '|'" #if (copy_default_options=='json auto'): # copy_command_additional_options = "FORMAT JSON 'auto'" #elif (copy_default_options=='json noshred'): # copy_command_additional_options = "FORMAT JSON 'noshred'" #elif (copy_default_options=='parquet'): # copy_command_additional_options = "format as parquet" #elif (copy_default_options=='csv'): # copy_command_additional_options = "csv" #elif (copy_default_options=='csv gzip'): # copy_command_additional_options = "csv gzip" #else: # copy_command_additional_options = "delimiter '|' gzip" ### change to support schema creation ##### redshift_tbl_name = s3_table_name_config chck_schema_table_name = s3_table_name_config.split('.',1) if len(chck_schema_table_name) >1: redshift_schema = chck_schema_table_name[0] redshift_tbl_name = chck_schema_table_name[1] create_schema_ddl = 'CREATE SCHEMA IF NOT EXISTS ' + redshift_schema + ';' print(create_schema_ddl) create_redshift_table(create_schema_ddl) time.sleep(3) #### schema 
creation end ######### dynamoTable_config.put_item( Item={'s3_table_name': s3_table_name_config, 'redshift_table_name': redshift_tbl_name,'max_file_proccessed_timestamp': file_last_processed_timestamp , 'load_status': 'inactive' , 'iam_role':rs_iam_role ,'additional_copy_options':copy_default_options,'redshift_schema':redshift_schema,'schema_detection_completed': 'No'}) status =1 return status def DetectSchemaUpdateMetadata(s3_table_name_config, rs_iam_role, full_s3_file_path): status = 0 global global_table_name global global_schema_name global global_additional_copy_parameters global global_file_type global global_s3_table_name_config dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') response = dynamoTable_config.get_item(Key={'s3_table_name': s3_table_name_config}) item_key='Item' global_table_name = response[item_key]['redshift_table_name'] global_schema_name = response[item_key]['redshift_schema'] print('DetectSchemaUpdateMetadata : DynamoDB table info') print(response) global_s3_table_name_config = s3_table_name_config print('Global S3 global_s3_table_name_config') print(global_s3_table_name_config) if not full_s3_file_path.endswith('/'): if os.environ['initiate_schema_detection'] == 'No': dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set load_status = :label", ExpressionAttributeValues={ ":label": "active" },ReturnValues="UPDATED_NEW") elif response[item_key]['schema_detection_completed'] == 'No': copy_command_additional_options = read_file_type_from_dynamodb_source(full_s3_file_path) if copy_command_additional_options[0] != 'format as json \'auto\'': dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set schema_detection_completed = :label", ExpressionAttributeValues={ ":label": "Yes" },ReturnValues="UPDATED_NEW") dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set additional_copy_options = :label", ExpressionAttributeValues={ ":label": copy_command_additional_options[0]},ReturnValues="UPDATED_NEW") dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set load_status = :label", ExpressionAttributeValues={ ":label": "active" },ReturnValues="UPDATED_NEW") if copy_command_additional_options[1] == 'false': dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set schema_detection_failed = :label", ExpressionAttributeValues={ ":label": "Yes" } ,ReturnValues="UPDATED_NEW") dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set failed_reason = :label", ExpressionAttributeValues={ ":label": copy_command_additional_options[3] } ,ReturnValues="UPDATED_NEW") #print('schema detection failed') #dynamoTable_config.update_item(Key={ 's3_table_name': s3_table_name_config}, UpdateExpression="set schema_detection_failed = :label", ExpressionAttributeValues={ ":label": "Yes" },ReturnValues="UPDATED_NEW") return status def lambda_handler(event, context): # TODO implement Number_of_messages = len(event['Records']) my_file_list =[] s3_base_path ='s3-redshift-loader-source/' s3_file_path_prefix = os.environ['source_s3_bucket'] # Get rest of environment variables control_table_name = 's3_data_loader_file_metadata' iam_role_redshift = os.environ['redshift_iam_role'] copy_file_format ='delimiter pipe gzip' redshift_schema = os.environ['redshift_schema'] copy_default_options = os.environ['copy_default_options'] for i in 
range(0,Number_of_messages): bucket = event['Records'][i]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][i]['s3']['object']['key'], encoding='utf-8') print('S3 key : ',key) table_name_path = key.replace(s3_base_path,'') tbl_split = table_name_path.split('/',1) table_name = tbl_split[0] complete_file_path= 's3://'+ s3_file_path_prefix+ '/' + key my_file_list.append(table_name) print("Table Name : ",table_name) print('S3 file path : ',complete_file_path) event_time = datetime.now() #format = "%Y-%m-%d %H:%M:%S.%f" #date_object = datetime.strptime(event_time, format) file_created_timestamp = event_time.strftime('%Y-%m-%d %H:%M:%S.%f') # Adding code to capture the content length## s3 = boto3.resource('s3') object = s3.Object(bucket,key) file_content_length = object.content_length # Adding code to capture the content length## config_status = AddTableMetadata(table_name,iam_role_redshift,redshift_schema,copy_default_options) dynamoTable = dynamodb.Table('s3_data_loader_file_metadata') dynamoTable.put_item( Item={'s3_table_name': table_name, 'file_created_timestamp': file_created_timestamp,'s3_file_path': complete_file_path,'file_content_length':file_content_length}) ## detect schema and update the dynamoDb metadata schema_status = DetectSchemaUpdateMetadata(table_name,iam_role_redshift,complete_file_path) return { 'statusCode': 200, 'Num of files': json.dumps(Number_of_messages) } def read_file_type_from_dynamodb_source(table_name_path): s3_client = boto3.client('s3') s3 = boto3.resource('s3') path_split = split_s3_path(table_name_path) s3.Object(bucket_name=path_split[0], key=path_split[1]).download_file('/tmp/data') json_flag = False file_type='Not Identified' is_list = False try: df = wr.s3.read_parquet_metadata(path=table_name_path) file_type='Parquet' df = "" except: print('not parquet') try: #df = pd.read_csv('/tmp/data', keep_default_na=False) #table_headers = df.columns.values.tolist() with open('/tmp/data') as f: if f.read(1) in '{[': print('likely JSON') json_flag = True raise elif ' 1: print('likely CSV') # except StopIteration: # pass # print('TABLE HEADERS_______') # print(table_headers) file_type='CSV' df = "" except: print('not csv') try: with open('/tmp/data') as f: lines = f.read() firstline = lines.split('\n', 1)[0] jsonformatted = json.loads(firstline.replace('\'', '"')) # response = s3_client.get_object(Bucket=path_split[0], Key=path_split[1]) # content = response['Body'].read().decode('utf-8') # firstline = content.split('\n')[0] # jsonformatted = json.loads(firstline.replace('\'', '"')) # content = '' file_type='JSON' json_flag = False except: print('JSON enhanced check 1 failed') json_flag = True if json_flag == True: try: with open('/tmp/data') as f: lines = f.read() #result = [json.dumps(record) for record in json.load(lines)] firstobj = lines.split('},', 1)[0] + '}' print('FIRST CHECK ______') print(firstobj) json.loads(firstobj.replace('\'', '"')) json_flag = False is_list = False file_type='JSON' except: print('JSON enhanced check 2 failed') # LOGIC TO CONVERT JSON FILE EMBEDDED IN A LIST INTO REDSHIFT LOAD FORMAT # if json_flag == True: # enhanced_check_success = False # try: # with open('/tmp/data') as f: # lines = f.read() # firstobj = lines.split('},', 1)[0] + '}' # firstobjCopy = firstobj # bracket_occurence_pos = len(firstobjCopy) - len(firstobjCopy.lstrip()) # if firstobj[bracket_occurence_pos] == '[': # firstobj = firstobj[bracket_occurence_pos+1:] # print('SECOND CHECK ______') # print(firstobj) # json.loads(firstobj.replace('\'', 
'"')) # is_list = True # json_flag = False # enhanced_check_success = True # file_type='JSON' # except: # print('JSON enhanced check 2 failed') # if enhanced_check_success: # new_file='' # data={} # try: # with open('/tmp/data') as f: # data = json.load(f) # # firstline = new_file.split('\n', 1)[0] # # jsonformatted = json.loads(firstline.replace('\'', '"')) # result = [json.dumps(record) for record in data] # print('JSON REWRITE PRE-FORMATTING SUCCESSFUL') # with open('/tmp/data', 'w') as modified: # for i in result: # modified.write(i+'\n') # print('JSON REWRITE SUCCESSFUL') # is_list=False # with open('/tmp/data', 'r') as f: # print(f.read(256)) # except: # print('JSON rewrite failed, redshift does not support list of jsons') # file_type='Not Identified' print('FILE TYPE: ' + file_type) global global_file_type if (file_type == 'CSV'): global_file_type='csv' result = csv_parser(table_name_path) if result[0] == 'csv' or result[0] == 'csv delimiter \'|\'': parse_csv(False, result[2]) else: if result[1] == 'false': parse_csv(False, result[2]) else: parse_csv(True, result[2]) return result elif file_type == 'JSON': global_file_type='json' read_json(is_list) return 'format as json \'auto\'', 'true', is_list elif file_type == 'Parquet': global_file_type='parquet' read_parquet(table_name_path) return 'format as parquet', 'true', table_name_path else: global_file_type='csv' return os.environ['copy_default_options'], 'false', '', 'Could not detect file type' def csv_parser(table_name_path): copy_command_additional_options='csv ignoreheader 1' s3_client = boto3.client('s3') s3 = boto3.resource('s3') sniffer = csv.Sniffer() has_header = False delimeter=',' with open('/tmp/data', 'rt') as csvfile: dialect = csv.Sniffer().sniff(csvfile.read(2048)) delimeter=dialect.delimiter print('PRINTING DELIMITER _________') print(delimeter) csvfile.seek(0) has_header=csv.Sniffer().has_header(csvfile.read(2048)) print('HAS CSV HEADER PRINT____') print(has_header) if has_header and delimeter == ',': print("HEADER WITH DELIMETER AS COMMA") copy_command_additional_options='csv ignoreheader 1' return copy_command_additional_options, 'true', delimeter elif not has_header and delimeter == ',': print("NO HEADER WITH DELIMETER AS COMMA") copy_command_additional_options='csv' return copy_command_additional_options, 'true', delimeter elif has_header and delimeter == '|': print('HEADER WITH DELIMETER AS PIPE') copy_command_additional_options = 'csv delimiter \'|\' ignoreheader 1' return copy_command_additional_options, 'true', delimeter elif not has_header and delimeter == '|': print('NO HEADER WITH DELIMETER AS PIPE') copy_command_additional_options = 'csv delimiter \'|\'' return copy_command_additional_options, 'true', delimeter else: print('DELIMIER NOT DETECTED IN CSV FILE - ISSUING STANDARD DELIMITER AS COMMA') return copy_command_additional_options, 'false', ',', 'delimiter not detected in csv file - issuing standard delimiter as comma' def split_s3_path(s3_path): path_parts=s3_path.replace("s3://","").split("/") bucket=path_parts.pop(0) key="/".join(path_parts) return bucket, key def parse_csv(has_header, delimeter): try: df = pd.read_csv('/tmp/data', sep = delimeter, keep_default_na=False, nrows=200000) except: dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') dynamoTable_config.update_item(Key={ 's3_table_name': global_s3_table_name_config}, UpdateExpression="set failed_reason = :label", ExpressionAttributeValues={ ":label": 'unsupported delimeter type' } ,ReturnValues="UPDATED_NEW") 
dynamoTable_config.update_item(Key={ 's3_table_name': global_s3_table_name_config}, UpdateExpression="set load_status = :label", ExpressionAttributeValues={ ":label": "active" },ReturnValues="UPDATED_NEW") table_headers = df.columns.values.tolist() data_types = df.dtypes.astype(str).values.tolist() if has_header == False: table_headers_string='' count=0 for i in table_headers: col_name = 'col_' + str(count+1) table_headers[count] = col_name if count != len(table_headers) - 1: table_headers_string+=col_name + delimeter else: table_headers_string+=col_name count+=1 file_data='' with open('/tmp/data', 'r') as original: file_data = original.read() with open('/tmp/data', 'w') as modified: modified.write(table_headers_string + "\n" + file_data) # with open('/tmp/data', 'r') as f: # print(f.read(2048)) df = pd.read_csv('/tmp/data', sep = delimeter, keep_default_na=False, nrows=200000) len_headers = len(table_headers) len_data = len(data_types) result = dict() for x in range(0, len_data, len_headers): for key, val in zip(table_headers, data_types[x:x+len_headers]): if key not in result: result[key] = [] result[key].append(val) csv_file_confirmation = 'csv' create_csv_ddl(json.loads(json.dumps(result)), df, csv_file_confirmation) def read_parquet(table_name_path): df = wr.s3.read_parquet_metadata(path=table_name_path) print('I AM HERE') pf = ParquetFile('/tmp/data') limit_rows = next(pf.iter_batches(batch_size = 200000)) df_table = pa.Table.from_batches([limit_rows]).to_pandas() cols = df[0].keys() datatypes = df[0].values() print(df[0]) parquet_file_confirmation = 'parquet' create_csv_ddl(df[0], df_table, parquet_file_confirmation) def read_json(is_list): x = {} if is_list: df = pd.read_json('/tmp/data', precise_float=True) else: df = pd.read_json('/tmp/data', lines=True, precise_float=True, nrows=200000) table_headers = df.columns.values.tolist() data_types = df.dtypes.astype(str).values.tolist() integer_columns = [] counter=0 for i in data_types: if 'int' in i: integer_columns.append(table_headers[counter]) counter+=1 print(integer_columns) try: if len(integer_columns) > 0: for i in integer_columns: with open('/tmp/data', 'rb') as f: data = f.readline() data_obj = json.loads(data) if is_float(data_obj[i]): df[i] = df[i].astype(float) print('ITS HAPPENING HERE') print(i) break except: print('enhanced float check failed') data_types = df.dtypes.astype(str).values.tolist() len_headers = len(table_headers) len_data = len(data_types) result = dict() for x in range(0, len_data, len_headers): for key, val in zip(table_headers, data_types[x:x+len_headers]): if key not in result: result[key] = [] result[key].append(val) json_file_confirmation = 'json' create_csv_ddl(json.loads(json.dumps(result)), df, json_file_confirmation) def create_csv_ddl(keyanddatatypes, df, file_confirmation): buf = [] #buf.append('CREATE TABLE IF NOT EXISTS ' + os.environ['redshift_database'] + '.public.' + global_table_name + ' (') buf.append('CREATE TABLE IF NOT EXISTS ' + '\"' + os.environ['redshift_database'] + '\"' + '.' + '\"' + global_schema_name + '\"' + '.' 
+ '\"' + global_table_name + '\"' + ' (') # Tahir : added the change to remove hard code public schema and use the one which is in dynamoDB metadata table sizeof = len(keyanddatatypes) count=1; for key in keyanddatatypes: value = keyanddatatypes[key] if file_confirmation == 'parquet' else keyanddatatypes[key][0] if count == sizeof: value = csv_data_type_mapping(key, value, df, file_confirmation) total = '\"' + key + '\"' + ' ' + str(value) else: value = csv_data_type_mapping(key, value, df, file_confirmation) total = '\"' + key + '\"' + ' ' + str(value) + ', ' buf.append(total) count = count + 1 buf.append(');') ##partition by pt tabledef = ''.join(buf) print("---------print definition ---------") print(tabledef) df = "" global_additional_copy_parameters = '' create_redshift_table(tabledef) def csv_data_type_mapping(key, value, df, file_confirmation): global global_additional_copy_parameters if 'FLOAT' in value or 'float' in value or 'double' in value: return 'FLOAT' elif 'INT' in value or 'int' in value: if global_file_type == 'parquet': print('IN THE GLOBAL PARQUET FILE TYPE L') return value col_bytes_len = int(df[key].astype(bytes).str.len().max()) if col_bytes_len <= 4: return 'SMALLINT' elif col_bytes_len > 9: return 'BIGINT' else: return 'INT' elif 'string' in value or 'object' in value: try: if file_confirmation == 'parquet': print('FILE CONFIRMATION') print(file_confirmation) raise datekey = str(df[key].iloc[0]) print('PRINTING DATEKEY______') print(datekey) #datetime_obj = parse(datekey) if hasDateTime(datekey)[0] == False: if global_additional_copy_parameters == ' timeformat \'auto\'': global_additional_copy_parameters = global_additional_copy_parameters + ' dateformat \'auto\'' else: global_additional_copy_parameters = ' dateformat \'auto\'' print('PRINTING UPDATED COPY COMMAND') print(global_additional_copy_parameters) return 'DATE' elif hasDateTime(datekey)[0]: if global_additional_copy_parameters == ' dateformat \'auto\'': global_additional_copy_parameters = global_additional_copy_parameters + ' timeformat \'auto\'' else: global_additional_copy_parameters = ' timeformat \'auto\'' print('PRINTING UPDATED COPY COMMAND') print(global_additional_copy_parameters) return 'TIMESTAMP' else: raise except: print(key + ' is NOT a datetime object. 
value is ' + value) try: col_bytes_len = 3 * int(df[key].astype(bytes).str.len().max()) if col_bytes_len < 62535: return 'varchar(' + str(col_bytes_len) + ')' else: return 'varchar(65535)' except: return 'varchar(65535)' else: return value def create_redshift_table(CREATE): redshift_cluster_id = os.environ['redshift_cluster_id'] redshift_database = os.environ['redshift_database'] redshift_user = os.environ['redshift_user'] redshift_cluster_iam_role = os.environ['redshift_iam_role'] default_json_copy_parameter = 'format as json \'auto\'' global global_additional_copy_parameters global global_table_name global global_file_type try: response = boto3.client("redshift-data").execute_statement( ClusterIdentifier = redshift_cluster_id, Database = redshift_database, DbUser = redshift_user, Sql = CREATE ) print('redshift_ddl_response: ') print(response) except Exception as e: print('Invalid response from client') if global_file_type == 'json': dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') dynamoTable_config.update_item(Key={ 's3_table_name': global_s3_table_name_config}, UpdateExpression="set schema_detection_completed = :label", ExpressionAttributeValues={ ":label": "Yes" },ReturnValues="UPDATED_NEW") dynamoTable_config.update_item(Key={ 's3_table_name': global_s3_table_name_config}, UpdateExpression="set additional_copy_options = :label", ExpressionAttributeValues={ ":label": default_json_copy_parameter + global_additional_copy_parameters},ReturnValues="UPDATED_NEW") dynamoTable_config.update_item(Key={ 's3_table_name': global_s3_table_name_config}, UpdateExpression="set load_status = :label", ExpressionAttributeValues={ ":label": "active" },ReturnValues="UPDATED_NEW") def head(filename: str, n: int): try: with open(filename) as f: head_lines = [next(f).rstrip() for x in range(n)] except StopIteration: with open(filename) as f: head_lines = f.read().splitlines() return def is_float(string): try: return float(string) and '.' in string # True if string is a number contains a dot except ValueError: # String is not a number return False def hasDateTime(s): try: return False, datetime.combine(date.fromisoformat(s), time.min) except ValueError: return True, datetime.fromisoformat(s) # def is_date(string, fuzzy=False): # """ # Return whether the string can be interpreted as a date. 
# :param string: str, string to check for date # :param fuzzy: bool, ignore unknown tokens in string if True # """ # try: # parse(string, fuzzy=fuzzy) # return True # except ValueError: # return False DataWranglerLayer: Type: AWS::Lambda::LayerVersion Properties: CompatibleRuntimes: - python3.9 Content: S3Bucket: !FindInMap [RegionMap, !Ref "AWS::Region", S3BucketMap] S3Key: !FindInMap [RegionMap, !Ref "AWS::Region", S3KeyMap] Description: Packages in AWS Data Wrangler LayerName: AwsDataWrangler-Python39 LicenseInfo: Apache #step 3 lambda function s3LoaderUtilProcessPendingFiles : DependsOn: - s3LoaderLambdaRole Type: AWS::Lambda::Function Properties: Description: Step 3 Lambda function prepare copy commands and process the files Handler: index.lambda_handler Runtime: python3.9 Role: !GetAtt 's3LoaderLambdaRole.Arn' Timeout: 30 Environment: Variables: redshift_cluster_id: !Ref RedshiftClusterIdentifier redshift_database: !Ref DatabaseName redshift_user: !Ref DatabaseUserName redshift_iam_role: !Ref RedshiftIAMRoleARN source_s3_bucket: !Ref SourceS3Bucket Code: ZipFile: | import json import boto3 from datetime import datetime from collections import OrderedDict import time import os from boto3.dynamodb.conditions import Key def lambda_handler(event, context): dynamodb = boto3.resource('dynamodb') sqs = boto3.resource('sqs') table_name = event['TableName'] print('Table Name:',table_name) ret = dict() ret['success'] = True ret['results'] = ['0'] statement_name = 's3-loader-utility' s3_bucket_name = os.environ['source_s3_bucket'] s3_file_path_prefix ='s3://' + s3_bucket_name + '/' s3_bucket_manifest_key='s3-redshift-loader-manifest/'+table_name dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') dynamoTable_file = dynamodb.Table('s3_data_loader_file_metadata') response = dynamoTable_config.get_item(Key={'s3_table_name': table_name}) item_key='Item' amazon_sqs_name ='s3-loader-utility.fifo' if item_key in response: print(response['Item']) copy_iam_role =response['Item']['iam_role'] copy_additional_option=response['Item']['additional_copy_options'] copy_redshift_table_name=response['Item']['redshift_table_name'] copy_load_status = response['Item']['load_status'] max_file_proccessed_timestamp = response['Item']['max_file_proccessed_timestamp'] redshift_schema = response['Item']['redshift_schema'] if(copy_load_status=='active'): # get all files from the control table which are not processed yet. 
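            # (Descriptive note) s3_data_loader_file_metadata uses s3_table_name as the hash key
            # and file_created_timestamp as the range key, so this query picks up only files newer
            # than max_file_proccessed_timestamp; that watermark is advanced further below once
            # the manifest has been queued.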
response = dynamoTable_file.query(KeyConditionExpression=Key('s3_table_name').eq(table_name) & Key('file_created_timestamp').gt(max_file_proccessed_timestamp)) print(response['Items']) num_rows = len(response['Items']) s3_objects = [] date_object_list=[] #code to handle content length parquet content_length_dic_file ={} #### end content length change if(num_rows>0): for record in response['Items']: print ('file path :',record['s3_file_path']) file_path = record['s3_file_path'] if(file_path[-1]=='/'): skip=1 else: s3_objects.append(record['s3_file_path']) dt_str = record['file_created_timestamp'] format = "%Y-%m-%d %H:%M:%S.%f" date_object = datetime.strptime(dt_str, format) date_object_list.append(date_object) ## content changes for parquet content_length_item ={} content_length_item['content_length'] = int(record['file_content_length']) content_length_dic_file[record['s3_file_path']] = content_length_item ## end content length change #entries_dict = dict(entries = [dict(url=x, mandatory=True) for x in s3_objects]) entries_dict = dict(entries = [dict(url=x, mandatory=True,meta =content_length_dic_file[x] ) for x in s3_objects]) s3 = boto3.client('s3') current_timestamp = datetime.utcnow().isoformat().replace(':','') s3_manifest_file_name = '{}/{}#{}.json'.format(s3_bucket_manifest_key,table_name,current_timestamp) s3.put_object(Body=json.dumps(entries_dict),Bucket=s3_bucket_name,Key= s3_manifest_file_name) max_processed_date = max(date_object_list) print('Max file for the selected table :',max_processed_date) sql_copy_command = 'COPY ' sql_copy_command = sql_copy_command + redshift_schema + "." + copy_redshift_table_name +" FROM \'" +s3_file_path_prefix+ s3_manifest_file_name if (copy_iam_role=='default'): sql_copy_command = sql_copy_command+ "\' iam_role " + copy_iam_role + " manifest " + copy_additional_option else: sql_copy_command = sql_copy_command + "\' iam_role '" + copy_iam_role + "' manifest " + copy_additional_option print('Copy command :',sql_copy_command) #copy_response = execute_sql_data_api(redshift_client,redshift_database,'COPY',sql_copy_command,redshift_user,redshift_cluster_id,False,statement_name,True) queue = sqs.get_queue_by_name(QueueName=amazon_sqs_name) MSG_ATTR ={ 's3_table_name': { 'DataType': 'String', 'StringValue': table_name }, 'redshift_table_name': { 'DataType': 'String', 'StringValue': copy_redshift_table_name }, 's3_manifest_file_path': { 'DataType': 'String', 'StringValue': s3_file_path_prefix+ s3_manifest_file_name }, 's3_manifest_file_path': { 'DataType': 'String', 'StringValue': s3_file_path_prefix+ s3_manifest_file_name }, 'redshift_schema': { 'DataType': 'String', 'StringValue': redshift_schema } } MSG_COPY_COMMAND = sql_copy_command dynamoTable_config.update_item(Key={ 's3_table_name': table_name,}, UpdateExpression="set max_file_proccessed_timestamp = :g", ExpressionAttributeValues={ ':g': max_processed_date.strftime('%Y-%m-%d %H:%M:%S.%f') },ReturnValues="UPDATED_NEW") resp_queu = queue.send_message(MessageBody=MSG_COPY_COMMAND , MessageAttributes=MSG_ATTR , MessageGroupId=table_name) json_msg = json.dumps(resp_queu, indent=4) print(f'''Message sent to the queue {amazon_sqs_name}.Message attributes: \n{json_msg}''') return { 'statusCode': 200, 'body': json.dumps('S3 Loader Util Process Files') } #step 2 lambda function s3LoaderUtilKickoffFileProcessing : DependsOn: - s3LoaderLambdaRole - s3LoaderUtilProcessPendingFiles Type: AWS::Lambda::Function Properties: Description: Step 2 Lambda function trigger the process file function based on the 
Metadata from DynamoDB Handler: index.lambda_handler Runtime: python3.9 Role: !GetAtt 's3LoaderLambdaRole.Arn' Timeout: 30 Environment: Variables: lambda_processing_files_function : !Sub ${s3LoaderUtilProcessPendingFiles} Code: ZipFile: | import json import boto3 from datetime import datetime from collections import OrderedDict import time import os # initialize redshift-data client in boto3 dynamodb_client = boto3.resource('dynamodb') lambda_client = boto3.client('lambda') def lambda_handler(event, context): # TODO implement config_table ='s3_data_loader_table_config' table = dynamodb_client.Table(config_table) response = table.scan() data = response['Items'] lambda_processing_files = os.environ['lambda_processing_files_function'] while 'LastEvaluatedKey' in response: response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) data.extend(response['Items']) print('All Contents of Config table ',data) for i in range(0,len(data)): sql = "Select " + lambda_processing_files +"('"+data[i]['s3_table_name']+"')" print(sql) lambda_payload = {"TableName":data[i]['s3_table_name']} lambda_client.invoke(FunctionName=lambda_processing_files, InvocationType='Event', Payload=json.dumps(lambda_payload)) print('Lambda Payload :',lambda_payload) #update_response = execute_sql_data_api(redshift_client,redshift_database,'KICK OFF FILE LOAD FUNCTION',sql,redshift_user,redshift_cluster_id,False,'Kick off the Lambda Function',False) return { 'statusCode': 200, 'body': json.dumps('S3 Loader Util Kicked Off') } #step 4 lamdba function s3LoaderUtilProcessQueueLoadRS : DependsOn: - s3LoaderLambdaRole Type: AWS::Lambda::Function Properties: Description: Step 4 Lambda function consume Queue messages and fire Redshift DATA API. This function has Env variables where Cluster Id, UserName or stored Handler: index.lambda_handler Runtime: python3.9 Role: !GetAtt 's3LoaderLambdaRole.Arn' Timeout: 30 Environment: Variables: redshift_cluster_id: !Ref RedshiftClusterIdentifier redshift_database: !Ref DatabaseName redshift_user: !Ref DatabaseUserName redshift_iam_role: !Ref RedshiftIAMRoleARN #s3bucket: !Ref s3bukectURI Code: ZipFile: | import json import json import boto3 from datetime import datetime from collections import OrderedDict import time from boto3.dynamodb.conditions import Key import os # initialize redshift-data client in boto3 redshift_client = boto3.client("redshift-data") dynamodb = boto3.resource('dynamodb') sqs = boto3.resource('sqs') rs_db_client = boto3.client('redshift') def execute_sql_data_api(redshift_data_api_client, redshift_database_name, command, query, redshift_user, redshift_cluster_id, isSynchronous,statement_name,with_event_flag): MAX_WAIT_CYCLES = 20 attempts = 0 # Calling Redshift Data API with executeStatement() res = redshift_data_api_client.execute_statement( Database=redshift_database_name, DbUser=redshift_user, Sql=query, ClusterIdentifier=redshift_cluster_id, StatementName=statement_name ,WithEvent=with_event_flag) query_id = res["Id"] desc = redshift_data_api_client.describe_statement(Id=query_id) query_status = query_id print("Query status: {} .... for query-->{}".format(query_status, query)) done = False # Wait until query is finished or max cycles limit has been reached. 
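            # (Descriptive note) The COPY path calls this helper with isSynchronous=False and
            # WithEvent=True, so the loop below is skipped and the Data API statement id is
            # returned right away; completion is then reported asynchronously through the
            # "Redshift Data Statement Status Change" event handled by s3LoaderUtilUpdateLogTable.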
while not done and isSynchronous: #and attempts < MAX_WAIT_CYCLES: #attempts += 1 #time.sleep(1) desc = redshift_data_api_client.describe_statement(Id=query_id) query_status = desc["Status"] if query_status == "FAILED": raise Exception('SQL query failed:' + query_id + ": " + desc["Error"]) elif query_status == "FINISHED": print("query status is: {} for query id: {} and command: {}".format( query_status, query_id, command)) done = True # print result if there is a result (typically from Select statement) if desc['HasResultSet']: response = redshift_data_api_client.get_statement_result( Id=query_id) print("Response of {} query --> {}".format(command, response['Records'])) return response else: print("Current working... query status is: {} ".format(query_status)) # Timeout Precaution if done == False and attempts >= MAX_WAIT_CYCLES and isSynchronous: print("Limit for MAX_WAIT_CYCLES has been reached before the query was able to finish. We have exited out of the while-loop. You may increase the limit accordingly. \n") raise Exception("query status is: {} for query id: {} and command: {}".format( query_status, query_id, command)) return query_status def lambda_handler(event, context): # TODO implement redshift_cluster_id = os.environ['redshift_cluster_id'] redshift_database = os.environ['redshift_database'] redshift_user = os.environ['redshift_user'] redshift_cluster_iam_role = os.environ['redshift_iam_role'] statement_name = 's3-loader-utility' dynamoTable_config = dynamodb.Table('s3_data_loader_table_config') dynamoTable_log = dynamodb.Table('s3_data_loader_log') amazon_sqs_name ='s3-loader-utility.fifo' queue = sqs.get_queue_by_name(QueueName=amazon_sqs_name) max_no_of_copy_commands = 5 copy_counter=1 max_message_to_poll = 5 if max_no_of_copy_commands>10: max_message_to_poll=10 else: max_message_to_poll = max_no_of_copy_commands cluster_status='Not Available' myClusters = rs_db_client.describe_clusters(ClusterIdentifier=redshift_cluster_id)['Clusters'] if len(myClusters) > 0: cluster_status = myClusters[0]['ClusterAvailabilityStatus'] print('Redshift cluster {} status : {}'.format(redshift_cluster_id,cluster_status) ) if cluster_status !='Available': print('Redshift Cluster {} is not available !'.format(redshift_cluster_id)) while True and cluster_status=='Available': messages = queue.receive_messages(MessageAttributeNames=['All'], MaxNumberOfMessages=max_message_to_poll, VisibilityTimeout=30) if len(messages) == 0 or copy_counter >=max_no_of_copy_commands: print('The S3 Loader Que is Empty or has reached max copy command limit') break for msg in messages: copy_counter +=1 print('Object Number----') print('Message object :',msg.body) sql_copy_command = msg.body s3_table_name = msg.message_attributes.get('s3_table_name').get('StringValue') redshift_table_name = msg.message_attributes.get('redshift_table_name').get('StringValue') s3_manifest_file_path = msg.message_attributes.get('s3_manifest_file_path').get('StringValue') redshift_schema = msg.message_attributes.get('redshift_schema').get('StringValue') print('Copy command :',sql_copy_command) copy_response = execute_sql_data_api(redshift_client,redshift_database,'COPY',sql_copy_command,redshift_user,redshift_cluster_id,False,statement_name,True) # purge the meessage from the msg.delete() event_time = datetime.now() log_timestamp = event_time.strftime('%Y-%m-%d %H:%M:%S.%f') dynamoTable_log.put_item(Item={'data_api_queryId': copy_response,'redshift_schema':redshift_schema, 's3_table_name': s3_table_name,'redshift_table_name': 
redshift_table_name , 's3_manifest_file_path': s3_manifest_file_path , 'copy_command_status':'Execution' ,'copy_command_sql': sql_copy_command,'redshift_query_Id':'','log_timestamp':log_timestamp,'finish_timestamp':''}) return { 'statusCode': 200, 'body': json.dumps('Processed Queue') } #step 5 lamdba function s3LoaderUtilUpdateLogTable : DependsOn: - s3LoaderLambdaRole Type: AWS::Lambda::Function Properties: Description: Step 5 Lambda function to update DynamoDB table with final status for table load. Handler: index.lambda_handler Runtime: python3.9 Role: !GetAtt 's3LoaderLambdaRole.Arn' Timeout: 30 Environment: Variables: redshift_cluster_id: !Ref RedshiftClusterIdentifier redshift_database: !Ref DatabaseName redshift_user: !Ref DatabaseUserName redshift_iam_role: !Ref RedshiftIAMRoleARN Code: ZipFile: | import json import boto3 from datetime import datetime from collections import OrderedDict import time # initialize redshift-data client in boto3 dynamodb = boto3.resource('dynamodb') def lambda_handler(event, context): # TODO implement dynamoTable_log = dynamodb.Table('s3_data_loader_log') data_api_detail = event['detail'] statement_name = data_api_detail['statementName'] if(statement_name=='s3-loader-utility'): data_api_query_id = data_api_detail['statementId'] redshift_query_id = str(int(data_api_detail['redshiftQueryId'])) Query_state = data_api_detail['state'] print('Value of data_api_query_id :',data_api_query_id) print('Value of redshift_query_id :',redshift_query_id) print('Value of items Query_state :',Query_state) event_time = datetime.now() log_timestamp = event_time.strftime('%Y-%m-%d %H:%M:%S.%f') dynamoTable_log.update_item(Key={ 'data_api_queryId': data_api_query_id,}, UpdateExpression="set redshift_query_Id = :g ,copy_command_status = :st , finish_timestamp = :ft", ExpressionAttributeValues={ ':g': redshift_query_id ,':st':Query_state,':ft':log_timestamp},ReturnValues="UPDATED_NEW") return { 'statusCode': 200, 'body': json.dumps('Logging Data to Redshift Done!') } DynamoDBTables3loadeFileMetadata: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "s3_table_name" AttributeType: "S" - AttributeName: "file_created_timestamp" AttributeType: "S" KeySchema: - AttributeName: "s3_table_name" KeyType: "HASH" - AttributeName: "file_created_timestamp" KeyType: "RANGE" ProvisionedThroughput: ReadCapacityUnits: "5" WriteCapacityUnits: "5" TableClass: "STANDARD" TableName: "s3_data_loader_file_metadata" DynamoDBTables3DataLoaderLog: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "data_api_queryId" AttributeType: "S" KeySchema: - AttributeName: "data_api_queryId" KeyType: "HASH" ProvisionedThroughput: ReadCapacityUnits: "5" WriteCapacityUnits: "5" TableClass: "STANDARD" TableName: "s3_data_loader_log" DynamoDBTables3DataLoaderTableConfig: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "s3_table_name" AttributeType: "S" KeySchema: - AttributeName: "s3_table_name" KeyType: "HASH" ProvisionedThroughput: ReadCapacityUnits: "5" WriteCapacityUnits: "5" TableClass: "STANDARD" TableName: "s3_data_loader_table_config" S3LoaderSQSQueue: Type: AWS::SQS::Queue Properties: ContentBasedDeduplication: True FifoQueue: True MessageRetentionPeriod: 345600 KmsMasterKeyId: alias/aws/sqs QueueName: s3-loader-utility.fifo ReceiveMessageWaitTimeSeconds: 1 Tags: - Key: "keyname1" Value: "value1" s3LoaderUtilLogFileMetadataInvokePermission: Type: 'AWS::Lambda::Permission' DependsOn: s3LoaderUtilLogFileMetadata Properties: 
FunctionName: !GetAtt s3LoaderUtilLogFileMetadata.Arn Action: 'lambda:InvokeFunction' Principal: s3.amazonaws.com SourceAccount: !Ref 'AWS::AccountId' SourceArn: !Sub 'arn:aws:s3:::${SourceS3Bucket}' CustomResourceLambdaFunction: Type: 'AWS::Lambda::Function' DependsOn: DynamoDBTables3DataLoaderTableConfig Properties: Handler: index.lambda_handler Role: !GetAtt s3LoaderLambdaRole.Arn Code: ZipFile: | from __future__ import print_function import json import boto3 import cfnresponse import os SUCCESS = "SUCCESS" FAILED = "FAILED" print('Loading function') s3 = boto3.resource('s3') s3_client = boto3.client('s3') def lambda_handler(event, context): print("Received event: " + json.dumps(event, indent=2)) responseData={} try: if event['RequestType'] == 'Delete': print("Request Type:",event['RequestType']) Bucket=event['ResourceProperties']['Bucket'] delete_notification(Bucket) print("Sending response to custom resource after Delete") elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update': print("Request Type:",event['RequestType']) LambdaArn=event['ResourceProperties']['LambdaArn'] Bucket=event['ResourceProperties']['Bucket'] add_notification(LambdaArn, Bucket) responseData={'Bucket':Bucket} print("Sending response to custom resource") responseStatus = 'SUCCESS' except Exception as e: print('Failed to process:', e) responseStatus = 'FAILED' responseData = {'Failure': 'Something bad happened.'} cfnresponse.send(event, context, responseStatus, responseData) def add_notification(LambdaArn, Bucket): s3_client.put_object(Bucket=Bucket,Key='s3-redshift-loader-source/') bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={ 'LambdaFunctionConfigurations': [ { 'LambdaFunctionArn': LambdaArn, 'Events': [ 's3:ObjectCreated:*' ], "Filter": { "Key": { "FilterRules": [ { "Name": "prefix", "Value": "s3-redshift-loader-source" } ] } } } ] } ) print("Put request completed....") def delete_notification(Bucket): bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={} ) print("Delete request completed....") Runtime: python3.9 Timeout: 60 s3LoaderUtilLogFileMetadataLambdaTrigger: Type: 'Custom::LambdaTrigger' DependsOn: s3LoaderUtilLogFileMetadataInvokePermission Properties: ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn LambdaArn: !GetAtt s3LoaderUtilLogFileMetadata.Arn Bucket: !Ref SourceS3Bucket
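# Illustrative reference (comment only, not used by CloudFormation): shape of the manifest file
# that s3LoaderUtilProcessPendingFiles writes under s3-redshift-loader-manifest/<table>/ before
# queueing the COPY command. The object key and content_length value are made-up examples:
#   {"entries": [{"url": "s3://<SourceS3Bucket>/s3-redshift-loader-source/orders/part-000.gz",
#                 "mandatory": true, "meta": {"content_length": 1048576}}]}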