AWSTemplateFormatVersion: 2010-09-09
Description: >
  Automatically format and profile your CSV data. Fill in the fields below,
  click Create stack, and wait a few minutes. The formatted data set and
  profiling report are automatically saved to the same S3 bucket as your CSV
  data file. For example, if your CSV file is s3://mybucket/myfile.csv, they
  are saved in the folder s3://mybucket/afd_data_myfile/.
Outputs:
  StackArn:
    Description: >-
      Use this as the stack_arn in your cloud_formation_deployment_stack
      override.
    Value: !Ref 'AWS::StackId'
  GlueJobLink:
    Description: URL link to the Glue job.
    Value: !Join
      - ''
      - - 'https://'
        - !Ref "AWS::Region"
        - '.console.aws.amazon.com/glue/home?region='
        - !Ref "AWS::Region"
        - '#etl:tab=jobs'
  InputS3Location:
    Description: >-
      S3 path of the input CSV. The formatted data set and report.html are
      saved to the same S3 bucket.
    Value: !Ref CSVFilePath
Resources:
  S3CustomResource:
    Type: Custom::S3CustomResource
    Properties:
      ServiceToken: !GetAtt AWSLambdaFunction.Arn
      Data_Analyze_Glue_Job: !Ref DataAnalyzerGlueJob
      CSV_File_Path: !Ref CSVFilePath
      CSV_Delimiter: !Ref FileDelimiter
      Event_Timestamp_Col: !Ref EventTimestampColumn
      Label_Col: !Ref LabelColumn
  AWSLambdaFunction:
    Type: "AWS::Lambda::Function"
    Properties:
      Description: "Save the Glue script to a local S3 bucket and trigger the Glue job."
      FunctionName: !Sub '${AWS::StackName}-${AWS::Region}-lambda'
      Handler: index.handler
      Role: !GetAtt AWSLambdaExecutionRole.Arn
      Timeout: 60
      Runtime: python3.9
      Code:
        ZipFile: |
          import boto3
          import cfnresponse
          import codecs
          import csv

          def handler(event, context):
              # Read the custom resource properties passed in by CloudFormation.
              the_event = event['RequestType']
              print("The event is: ", str(the_event))
              response_data = {}
              csv_file_path = event['ResourceProperties']['CSV_File_Path']
              csv_delimiter = event['ResourceProperties']['CSV_Delimiter']
              event_timestamp_col = event['ResourceProperties']['Event_Timestamp_Col']
              label_col = event['ResourceProperties']['Label_Col']
              if the_event in ('Create', 'Update'):
                  try:
                      # Check that the CSV file exists.
                      if csv_delimiter == '':
                          csv_delimiter = ','
                      path_parts = csv_file_path.replace("s3://", "").split("/")
                      bucket = path_parts.pop(0)
                      file_name = "/".join(path_parts)
                      s3 = boto3.resource('s3')
                      data = s3.meta.client.get_object(Bucket=bucket, Key=file_name)
                  except Exception as e:
                      print("Execution failed...")
                      print(str(e))
                      response_data['Data'] = str(e)
                      reason_msg = f'Cannot load CSV file {csv_file_path}. Check if it exists.'
                      cfnresponse.send(event, context, cfnresponse.FAILED, response_data, reason=reason_msg)
                      return  # a FAILED response has already been sent
                  try:
                      # Check that the event timestamp and label columns exist.
                      reader = csv.DictReader(codecs.getreader("utf-8")(data["Body"]), delimiter=csv_delimiter)
                      headers = reader.fieldnames
                      header_count = len(headers)
                      headers_msg = '[\'' + '\',\''.join(headers) + '\']'
                      if event_timestamp_col not in headers:
                          err_msg = f'[ERROR]: column {event_timestamp_col} is not among CSV headers {headers_msg}. Detected {header_count} columns. Check your data and delimiter.'
                          raise Exception(err_msg)
                      if label_col not in headers:
                          err_msg = f'[ERROR]: column {label_col} is not among CSV headers {headers_msg}. Detected {header_count} columns. Check your data and delimiter.'
                          raise Exception(err_msg)
                  except Exception as e:
                      print("Execution failed...")
                      print(str(e))
                      response_data['Data'] = str(e)
                      cfnresponse.send(event, context, cfnresponse.FAILED, response_data, reason=str(e))
                      return  # a FAILED response has already been sent
                  try:
                      # Download the Glue script.
                      r_glue_script = s3.meta.client.get_object(Bucket='amazon-frauddetector-cfn-templates', Key='AFD_Data_Cleaner/afd_data_analyzer_glue_script.py')
                      r_glue_script = r_glue_script['Body'].read()
                      # Get the Glue script location.
                      glueJobName = event['ResourceProperties']['Data_Analyze_Glue_Job']
                      client = boto3.client('glue')
                      response = client.get_job(JobName=glueJobName)
                      script_location = response['Job']['Command']['ScriptLocation']
                      print(script_location)
                      path_parts = script_location.replace("s3://", "").split("/")
                      the_bucket = path_parts.pop(0)
                      file_name = "/".join(path_parts)
                      my_session = boto3.session.Session()
                      my_region = my_session.region_name
                      bucket = s3.Bucket(the_bucket)
                      if bucket.creation_date is None:
                          # Create the script bucket if it does not exist yet.
                          # us-east-1 does not accept a LocationConstraint.
                          if my_region == 'us-east-1':
                              s3.meta.client.create_bucket(Bucket=the_bucket)
                          else:
                              s3.meta.client.create_bucket(Bucket=the_bucket, CreateBucketConfiguration={'LocationConstraint': my_region})
                      # Save the Glue script.
                      s3.meta.client.put_object(Bucket=the_bucket, Key=file_name, Body=r_glue_script)
                      # Start the Glue job.
                      response = client.start_job_run(JobName=glueJobName)
                  except Exception as e:
                      print("Execution failed...")
                      print(str(e))
                      response_data['Data'] = str(e)
                      cfnresponse.send(event, context, cfnresponse.FAILED, response_data, reason=str(e))
                      return  # a FAILED response has already been sent
              # Everything OK (Delete events also land here): send the signal back.
              print("Execution successful!")
              cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
  AWSLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AWSLambdaExecute'
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceNotebookRole'
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: '2012-10-17'
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Effect: Allow
                Resource: arn:aws:logs:*:*:*
              - Action:
                  - s3:Get*
                  - s3:List*
                  - s3:Create*
                  - s3:Put*
                  - s3-object-lambda:Get*
                  - s3-object-lambda:List*
                  - s3-object-lambda:Put*
                Effect: Allow
                Resource: arn:aws:s3:::*
            Version: '2012-10-17'
          PolicyName: !Sub ${AWS::StackName}-${AWS::Region}-AWSLambda-CW
  DataAnalyzerGlueJob:
    Type: 'AWS::Glue::Job'
    Properties:
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: '2.0'
      WorkerType: G.1X
      NumberOfWorkers: 2
      Command:
        Name: glueetl
        ScriptLocation: !Join
          - ''
          - - 's3://aws-glue-scripts-'
            - !Ref "AWS::AccountId"
            - '-'
            - !Ref "AWS::Region"
            - '/admin/glue_data_analyzer_script.py'
      Role: !Ref AWSGlueJobRole
      MaxRetries: 0
      DefaultArguments:
        '--LabelColumn': !Ref LabelColumn
        '--EventTimestampColumn': !Ref EventTimestampColumn
        '--CSVFilePath': !Ref CSVFilePath
        '--FileDelimiter': !Ref FileDelimiter
        '--FormatCSV': !Ref FormatCSV
        '--ProfileCSV': !Ref ProfileCSV
        '--DropLabelMissingRows': !Ref DropLabelMissingRows
        '--DropTimestampMissingRows': !Ref DropTimestampMissingRows
        '--additional-python-modules': 'botocore>=1.20.12,boto3>=1.17.12,pyarrow==2,awswrangler==2.9.0,Jinja2,fsspec==0.9.0'
        '--FraudLabels': !Join
          - ''
          - - !Ref FraudLabels
            - ' '
        '--FeatureCorr': !Ref FeatureCorr
        '--ReportSuffix': !Join
          - ''
          - - !Ref ReportSuffix
            - ' '
      Description: Glue job to format the CSV and profile the data.
  AWSGlueJobRole:
    Type: 'AWS::IAM::Role'
    Properties:
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
        - 'arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess'
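      # The AssumeRolePolicyDocument below lets Glue assume this role; the inline
      # policy scopes S3 access to the bucket named in CSVFilePath, whose name
      # !Select [2, !Split ["/", !Ref CSVFilePath]] extracts from the s3:// URI.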
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action:
              - 'sts:AssumeRole'
            Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - s3:Get*
                  - s3:List*
                  - s3:Create*
                  - s3:Head*
                  - s3:Put*
                  - s3:Upload*
                  - s3-object-lambda:Get*
                  - s3-object-lambda:List*
                  - s3-object-lambda:Create*
                  - s3-object-lambda:Head*
                  - s3-object-lambda:Put*
                  - s3-object-lambda:Upload*
                Effect: Allow
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Select [2, !Split ["/", !Ref CSVFilePath]]
                      - '/*'
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Select [2, !Split ["/", !Ref CSVFilePath]]
            Version: '2012-10-17'
          PolicyName: !Sub ${AWS::StackName}-${AWS::Region}-AWSGlue-S3
Parameters:
  CSVFilePath:
    Type: String
    Description: S3 path to your CSV file, e.g. s3://mybucket/myfile.csv.
    Default: 's3://'
  FileDelimiter:
    Type: String
    Description: Delimiter of your CSV file.
    Default: ','
  EventTimestampColumn:
    Type: String
    Description: Column name of the event timestamp in your CSV file. This is a mandatory column for AFD.
    Default: 'EVENT_TIMESTAMP'
  LabelColumn:
    Type: String
    Description: Column name of the label in your CSV file. This is a mandatory column for AFD.
    Default: 'EVENT_LABEL'
  FormatCSV:
    Type: String
    Description: Do you want to format your CSV to the format AFD requires? This creates and saves a new copy of the CSV data.
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  ProfileCSV:
    Type: String
    Description: Do you want to profile your data? This generates an HTML report of your data statistics and potential AFD validation errors.
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  DropTimestampMissingRows:
    Type: String
    Description: Do you want to drop rows with a missing EVENT_TIMESTAMP? 'Yes' is recommended for all AFD models.
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  DropLabelMissingRows:
    Type: String
    Description: Do you want to drop rows with a missing EVENT_LABEL? 'Yes' is recommended for AFD models with external events.
    Default: 'Yes'
    AllowedValues:
      - 'Yes'
      - 'No'
  FraudLabels:
    Type: String
    Description: (Optional) Label values to map to FRAUD, separated by commas, e.g. suspicious, fraud. If you don't want to map labels, leave this blank and the report will show the distribution of the original label values.
    Default: ' '
  FeatureCorr:
    Type: String
    Description: Do you want to show pair-wise feature correlation in the report? The correlation shows, for each pair of features, how much one feature depends on the other. This calculation may take some time.
    Default: 'No'
    AllowedValues:
      - 'Yes'
      - 'No'
  ReportSuffix:
    Type: String
    Description: (Optional) Suffix for the profiling report name. The report will be named report_<suffix>.html.
    Default: ' '
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Information about your CSV data set:"
        Parameters:
          - CSVFilePath
          - EventTimestampColumn
          - LabelColumn
          - FileDelimiter
      - Label:
          default: "Format CSV:"
        Parameters:
          - FormatCSV
          - DropTimestampMissingRows
          - DropLabelMissingRows
      - Label:
          default: "Profile CSV:"
        Parameters:
          - ProfileCSV
          - ReportSuffix
          - FeatureCorr
          - FraudLabels
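# Example deployment from the CLI; a minimal sketch, assuming the template is
# saved as afd_data_analyzer.yaml (the stack name and S3 path below are
# hypothetical placeholders). CAPABILITY_IAM is required because the template
# creates IAM roles.
#
#   aws cloudformation create-stack \
#     --stack-name afd-data-profiler \
#     --template-body file://afd_data_analyzer.yaml \
#     --capabilities CAPABILITY_IAM \
#     --parameters ParameterKey=CSVFilePath,ParameterValue=s3://mybucket/myfile.csv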