# Event Dashboard solution on QuickSight dashboard file
#
# Template for the Event Dashboard solution with a QuickSight dashboard.
# **DO NOT DELETE**
#
# author: satyaso@
AWSTemplateFormatVersion: "2010-09-09"
Description: (V-01) - Event dashboard implementation over Event-database-solution

Parameters:
  EventAthenaDatabaseName:
    Type: String
    Description: Provide Digital User Engagement Events Database in Amazon Athena (EventAthenaDatabaseName)
    Default: "due_eventdb"
  S3DataLogBucket:
    Type: String
    Description: Provide S3 Bucket Name where the S3 Data Lake and Events are stored; this would have been created during the Event database implementation

Mappings:
  LambdaRuntime:
    Language:
      # python3.8 has reached end of support in AWS Lambda; the inline helper
      # code only relies on boto3, cfnresponse and the standard library.
      Python: python3.12

Resources:
  # ---------------------------------------------------------------------------
  # Athena named queries. Each one holds a CREATE OR REPLACE VIEW statement over
  # the all_events table; the statements are executed by the SetupSampleFiles
  # custom resource at the bottom of this section.
  #
  # Notes that apply to every query below:
  #   * In LIKE, "_" matches any single character. The event_type prefixes start
  #     with a literal underscore, so it is escaped with ESCAPE '\'.
  #   * Timestamps are divided by 1000 before from_unixtime(), i.e. the source
  #     values are treated as epoch milliseconds.
  #   * The query text contains no ${...} placeholders, so no !Sub is needed.
  # ---------------------------------------------------------------------------

  # View over email events (event_type starting with "_email.").
  EventTableAllEmailNamedQuery:
    Type: AWS::Athena::NamedQuery
    DeletionPolicy: Delete
    Properties:
      Database: !Ref EventAthenaDatabaseName
      Description: "Create the Email AllMail View"
      QueryString: |
        CREATE OR REPLACE VIEW email_all_events AS
        SELECT
          event_type
        , from_unixtime((event_timestamp / 1000)) AS event_timestamp
        , from_unixtime((arrival_timestamp / 1000)) AS arrival_timestamp
        , application.app_id AS application_id
        , client.client_id AS endpoint_id
        , attributes['campaign_id'] AS pinpoint_campaign_id
        , attributes['treatment_id'] AS pinpoint_treatment_id
        , awsaccountid AS aws_account_id
        , facets.email_channel.mail_event.mail.message_id AS message_id
        , from_unixtime((facets.email_channel.mail_event.mail.message_send_timestamp / 1000)) AS message_send_timestamp
        , facets.email_channel.mail_event.mail.from_address AS from_address
        , element_at(facets.email_channel.mail_event.mail.destination, 1) AS destination
        , facets.email_channel.mail_event.mail.common_headers.subject AS subject
        , MAP_CONCAT(COALESCE(client_context.custom, CAST(JSON '{}' AS MAP(varchar,varchar))), attributes) AS message_tags
        , ingest_timestamp
        FROM all_events
        WHERE event_type LIKE '\_email.%' ESCAPE '\'

  # View over SMS events (event_type starting with "_SMS.").
  EventTableAllSMSNamedQuery:
    Type: AWS::Athena::NamedQuery
    DeletionPolicy: Delete
    Properties:
      Database: !Ref EventAthenaDatabaseName
      Description: "Create the All SMS View"
      QueryString: |
        CREATE OR REPLACE VIEW sms_all_events AS
        SELECT
          event_type
        , from_unixtime((event_timestamp / 1000)) AS event_timestamp
        , from_unixtime((arrival_timestamp / 1000)) AS arrival_timestamp
        , application.app_id AS application_id
        , client.client_id AS endpoint_id
        , attributes['campaign_id'] AS pinpoint_campaign_id
        , attributes['treatment_id'] AS pinpoint_treatment_id
        , awsaccountid AS aws_account_id
        , attributes['sender_request_id'] AS sender_request_id
        , attributes['destination_phone_number'] AS destination_phone_number
        , attributes['record_status'] AS record_status
        , attributes['iso_country_code'] AS iso_country_code
        , attributes['number_of_message_parts'] AS number_of_message_parts
        , attributes['message_id'] AS message_id
        , attributes['message_type'] AS message_type
        , attributes['origination_phone_number'] AS origination_phone_number
        , metrics.price_in_millicents_usd AS price_in_millicents_usd
        , CAST(JSON_PARSE(attributes['customer_context']) AS MAP(VARCHAR, VARCHAR)) AS message_tags
        , ingest_timestamp
        FROM all_events
        WHERE event_type LIKE '\_SMS.%' ESCAPE '\'

  # View over campaign events (event_type starting with "_campaign.").
  EventTableAllCampaignNamedQuery:
    Type: AWS::Athena::NamedQuery
    DeletionPolicy: Delete
    Properties:
      Database: !Ref EventAthenaDatabaseName
      Description: "Creates the Campaign events View"
      QueryString: |
        CREATE OR REPLACE VIEW campaign_all_events AS
        SELECT
          event_type
        , from_unixtime((event_timestamp / 1000)) AS event_timestamp
        , from_unixtime((arrival_timestamp / 1000)) AS arrival_timestamp
        , application.app_id AS application_id
        , client.client_id AS endpoint_id
        , attributes['campaign_id'] AS pinpoint_campaign_id
        , attributes['treatment_id'] AS pinpoint_treatment_id
        , awsaccountid AS aws_account_id
        , attributes['delivery_type'] AS delivery_type
        , attributes['campaign_send_status'] AS campaign_send_status
        , ingest_timestamp
        FROM all_events
        WHERE event_type LIKE '\_campaign.%' ESCAPE '\'

  # View over journey events (event_type starting with "_journey."). Endpoint
  # details are pulled out of the JSON held in client_context.custom['endpoint'].
  EventTableJourneyNamedQuery:
    Type: AWS::Athena::NamedQuery
    DeletionPolicy: Delete
    Properties:
      Database: !Ref EventAthenaDatabaseName
      Description: "Create all Journey View"
      QueryString: |
        CREATE OR REPLACE VIEW journey_all_events AS
        SELECT
          event_type
        , from_unixtime((event_timestamp / 1000)) AS event_timestamp
        , from_unixtime((arrival_timestamp / 1000)) AS arrival_timestamp
        , application.app_id AS application_id
        , client.client_id AS endpoint_id
        , attributes['journey_run_id'] AS journey_run_id
        , attributes['journey_send_status'] AS journey_send_status
        , attributes['journey_id'] AS journey_id
        , attributes['journey_activity_id'] AS journey_activity_id
        , awsaccountid AS aws_account_id
        , typeof(client_context.custom['endpoint']) AS custom
        , json_extract_scalar(client_context.custom['endpoint'], '$.EndpointStatus') AS end_point_status
        , json_extract_scalar(client_context.custom['endpoint'], '$.ChannelType') AS channel_type
        , json_extract_scalar(client_context.custom['endpoint'], '$.User.UserId') AS user_id
        , ingest_timestamp
        FROM all_events
        WHERE event_type LIKE '\_journey.%' ESCAPE '\'

  # View over everything that is not an email, SMS, campaign, journey or test
  # event, i.e. the application's custom events.
  EventTableAllCustomNamedQuery:
    Type: AWS::Athena::NamedQuery
    DeletionPolicy: Delete
    Properties:
      Database: !Ref EventAthenaDatabaseName
      Description: "Create the Custom events View"
      QueryString: |
        CREATE OR REPLACE VIEW custom_all_events AS
        SELECT
          event_type
        , from_unixtime((event_timestamp / 1000)) AS event_timestamp
        , from_unixtime((arrival_timestamp / 1000)) AS arrival_timestamp
        , application.app_id AS application_id
        , client.client_id AS endpoint_id
        , client.cognito_id AS cognito_id
        , session['session_id'] AS session_id
        , from_unixtime(CAST(session['start_timestamp'] AS BIGINT) / 1000) AS session_start_time
        , from_unixtime(CAST(session['stop_timestamp'] AS BIGINT) / 1000) AS session_stop_time
        , ingest_timestamp
        FROM all_events
        WHERE event_type NOT LIKE '\_email.%' ESCAPE '\'
          AND event_type NOT LIKE '\_SMS.%' ESCAPE '\'
          AND event_type NOT LIKE '\_campaign.%' ESCAPE '\'
          AND event_type NOT LIKE '\_test.event%' ESCAPE '\'
          AND event_type NOT LIKE '\_journey.%' ESCAPE '\'

  ## Function invocation to execute the queries
  SetupSampleFiles:
    Type: Custom::LoadLambda
    # The helper's Athena/S3/Glue permissions live in a separate AWS::IAM::Policy
    # resource. Without this dependency the function can be invoked before the
    # policy is attached to its role.
    DependsOn: CustomResourceHelperPolicy
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      CustomResourceAction: SetupSampleFiles
      # The helper only reads CustomResourceAction. The IDs are listed here so
      # that replacing any named query changes this resource's properties and
      # CloudFormation invokes the helper again to refresh the views.
      NamedQueryIds:
        - !Ref EventTableAllEmailNamedQuery
        - !Ref EventTableJourneyNamedQuery
        - !Ref EventTableAllSMSNamedQuery
        - !Ref EventTableAllCampaignNamedQuery
        - !Ref EventTableAllCustomNamedQuery
# Lambda function backing the SetupSampleFiles custom resource, plus its role.
  # The function reads the named-query IDs from its environment, runs each query
  # through Athena, and reports the outcome to CloudFormation via cfnresponse.
  CustomResourceHelper:
    Type: AWS::Lambda::Function
    Properties:
      Description: Helps set up the solution.
      Handler: index.lambda_handler
      Runtime: !FindInMap ["LambdaRuntime", "Language", "Python"]
      MemorySize: 256
      Timeout: 300
      Role: !GetAtt CustomResourceHelperRole.Arn
      Environment:
        Variables:
          LOG_LEVEL: "DEBUG"
          S3_DATA_BUCKET: !Ref S3DataLogBucket
          EMAIL_ALL_EVENTS: !Ref EventTableAllEmailNamedQuery
          JOURNEY_ALL_EVENTS: !Ref EventTableJourneyNamedQuery
          SMS_ALL_EVENTS: !Ref EventTableAllSMSNamedQuery
          CAMPAIGN_ALL_EVENTS: !Ref EventTableAllCampaignNamedQuery
          CUSTOM_ALL_EVENTS: !Ref EventTableAllCustomNamedQuery
      Code:
        # Inline code is size-limited by CloudFormation; keep comments short.
        ZipFile: |
          import logging
          import os
          import time
          import traceback

          import boto3
          import cfnresponse

          athena = boto3.client('athena')

          # States after which Athena no longer changes a query's status.
          TERMINAL_STATES = ('SUCCEEDED', 'FAILED', 'CANCELLED')


          def execute_named_queries(namedQueries):
              """Run each named query and wait until it reaches a terminal state.

              A query that does not succeed is logged as an error and the
              remaining queries still run. Errors raised by the Athena API
              calls are logged and re-raised.
              """
              try:
                  response = athena.batch_get_named_query(NamedQueryIds=namedQueries)
                  skipped = response.get('UnprocessedNamedQueryIds')
                  if skipped:
                      logging.error('Named queries not processed: %s', skipped)
                  output = 's3://%s/temp/' % os.environ.get('S3_DATA_BUCKET')
                  for q in response['NamedQueries']:
                      execution_id = athena.start_query_execution(
                          QueryString=q['QueryString'],
                          QueryExecutionContext={'Database': q['Database']},
                          ResultConfiguration={'OutputLocation': output},
                      )['QueryExecutionId']
                      while True:
                          time.sleep(4)
                          status = athena.get_query_execution(
                              QueryExecutionId=execution_id
                          )['QueryExecution']['Status']
                          if status['State'] in TERMINAL_STATES:
                              break
                      if status['State'] == 'SUCCEEDED':
                          logging.info('Query Result : %s %s', q['Name'], status['State'])
                      else:
                          logging.error(
                              'Query Result : %s %s (%s)', q['Name'],
                              status['State'], status.get('StateChangeReason'))
              except Exception as error:
                  logging.error('execute_named_queries error: %s' % (error))
                  logging.error('execute_named_queries trace: %s' % traceback.format_exc())
                  raise


          def lambda_handler(event, context):
              """Handle a CloudFormation custom resource request.

              Create and Update run the view queries. Delete is acknowledged
              without running them, so stack deletion and rollback are not
              blocked by a failing query.
              """
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
                  log_level = 'DEBUG'
              logging.getLogger().setLevel(log_level)
              logging.info('Event value ------ : %s', event)
              logging.info('S3_DATA_BUCKET ENV variable :---- %s',
                           os.environ.get('S3_DATA_BUCKET'))
              try:
                  action = event['ResourceProperties']['CustomResourceAction']
                  if action != 'SetupSampleFiles':
                      logging.error('Missing CustomResourceAction - no action to perform')
                      cfnresponse.send(
                          event, context, cfnresponse.FAILED,
                          {"success": False, "error": "Missing CustomResourceAction"},
                          "error")
                      return
                  if event.get('RequestType') != 'Delete':
                      # NOTE(review): the reason for this pause is not evident;
                      # presumably it lets IAM permissions propagate - confirm.
                      time.sleep(5)
                      execute_named_queries([
                          os.environ.get('EMAIL_ALL_EVENTS'),
                          os.environ.get('JOURNEY_ALL_EVENTS'),
                          os.environ.get('SMS_ALL_EVENTS'),
                          os.environ.get('CAMPAIGN_ALL_EVENTS'),
                          os.environ.get('CUSTOM_ALL_EVENTS'),
                      ])
                  cfnresponse.send(event, context, cfnresponse.SUCCESS,
                                   {"success": True}, 'SetupSampleFiles')
              except Exception as error:
                  logging.error('lambda_handler error: %s' % (error))
                  logging.error('lambda_handler trace: %s' % traceback.format_exc())
                  cfnresponse.send(event, context, cfnresponse.FAILED,
                                   {"success": False, "error": "See Lambda Logs"},
                                   "error")

  # Execution role assumed by the helper function. Only CloudWatch Logs access
  # is granted inline here.
  CustomResourceHelperRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
# Permissions attached to CustomResourceHelperRole: S3 access to the data
  # bucket, Athena access on workgroups, Glue catalog access for the events
  # database, and CloudWatch Logs.
  CustomResourceHelperPolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref CustomResourceHelperRole
      PolicyName: CustomResourceHelperPolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # S3: the bucket itself plus the objects inside it. The object ARN
          # ends in "/*" so it cannot match other buckets whose names merely
          # start with the same prefix.
          - Effect: Allow
            Action:
              - "s3:GetBucketLocation"
              - "s3:GetObject"
              - "s3:ListBucket"
              - "s3:ListBucketMultipartUploads"
              - "s3:ListMultipartUploadParts"
              - "s3:AbortMultipartUpload"
              - "s3:CreateBucket"
              - "s3:PutObject"
            Resource:
              - !Sub "arn:aws:s3:::${S3DataLogBucket}"
              - !Sub "arn:aws:s3:::${S3DataLogBucket}/*"
          # Athena: all actions, limited to workgroups in this account/region.
          - Effect: "Allow"
            Action:
              - "athena:*"
            Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/*"
          # Glue Data Catalog: read the events database and create views in it.
          # NOTE(review): glue:UpdateTable is included because replacing an
          # already existing view (CREATE OR REPLACE VIEW) is expected to update
          # the catalog entry rather than create it - confirm against the Athena
          # fine-grained access documentation.
          - Effect: "Allow"
            Action:
              - "glue:GetDatabase"
              - "glue:GetDatabases"
              - "glue:GetTable"
              - "glue:GetTables"
              - "glue:GetPartition"
              - "glue:GetPartitions"
              - "glue:CreateTable"
              - "glue:UpdateTable"
            Resource:
              - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${EventAthenaDatabaseName}/*"
              - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${EventAthenaDatabaseName}"
              - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"
          # CloudWatch Logs for the function's own log output.
          - Effect: "Allow"
            Action:
              - "logs:CreateLogGroup"
              - "logs:CreateLogStream"
              - "logs:PutLogEvents"
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"