# Amazon Pinpoint phone number validation pipeline.
#
# Resources declared here:
#   * a Pinpoint segment selecting SMS/VOICE endpoints that do not yet carry
#     the attribute PNV_PhoneNumberValidated = "Yes";
#   * three Lambda functions (start export, poll export status, validate and
#     update endpoints);
#   * a Step Functions state machine chaining those functions;
#   * an SNS topic the state machine publishes success/failure messages to;
#   * the IAM roles used by each of the above.
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Pinpoint Phone Number Validate

Parameters:
  PinpointProjectId:
    Type: String
    Description: Amazon Pinpoint Project ID to perform validation against
  TempExportS3Bucket:
    Type: String
    Description: Name of the existing Amazon S3 Bucket where Pinpoint export files can be placed for validation.
  TempExportS3Prefix:
    Type: String
    Description: Prefix of the Amazon S3 Bucket where new export files will be placed.
  AssumeUS:
    Type: String
    Default: True
    AllowedValues:
      - True
      - False
    Description: Enter TRUE if you want to assume US (+1) phone number for any phone 10 digits long or FALSE if you want to import as-is. Default is TRUE.

Resources:
  # Segment of endpoints that still need validation: the NONE group excludes
  # endpoints already tagged PNV_PhoneNumberValidated = "Yes", the ANY group
  # keeps only the SMS and VOICE channels.
  PhoneNotValidatedSegment:
    Type: 'AWS::Pinpoint::Segment'
    Properties:
      ApplicationId: !Ref PinpointProjectId
      SegmentGroups:
        Include: ALL
        Groups:
          - Type: NONE
            Dimensions:
              - Attributes:
                  PNV_PhoneNumberValidated:
                    AttributeType: "INCLUSIVE"
                    Values:
                      - "Yes"
            SourceType: ALL
          - Type: ANY
            Dimensions:
              - Demographic:
                  Channel:
                    DimensionType: INCLUSIVE
                    Values:
                      - "SMS"
                      - "VOICE"
      Name: Needs Phone Number Validated

  # Step 1: starts a Pinpoint export job for the segment above and returns
  # the job id together with the S3 key prefix the export is written under.
  ExportPinpointEndpointsLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt ExportPinpointEndpointsLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Timeout: 60
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          PINPOINT_PROJECT_ID: !Ref PinpointProjectId
          ROLE_ARN: !GetAtt PinpointExportRole.Arn
          EXPORT_SEGMENT_ID: !GetAtt PhoneNotValidatedSegment.SegmentId
          S3_BUCKET: !Ref TempExportS3Bucket
          S3_PREFIX: !Ref TempExportS3Prefix
      Code:
        ZipFile: |
          import boto3
          import os
          import random
          import string
          import logging

          client = boto3.client('pinpoint')

          def lambda_handler(event, context):
            """Start the export job; return its id and the S3 key prefix used."""
            log_level = str(os.environ.get('LOG_LEVEL')).upper()
            if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
              log_level = 'ERROR'
            logging.getLogger().setLevel(log_level)

            logging.info(event)

            # A random sub-folder keeps each run's export files separate.
            random_prefix = ''.join(random.choice(string.ascii_lowercase) for i in range(10))

            # Strip slashes so an empty or '/'-terminated prefix does not
            # produce a leading or doubled '/' in the key prefix.
            base_prefix = (os.environ.get('S3_PREFIX') or '').strip('/')
            full_prefix = (base_prefix + '/' + random_prefix) if base_prefix else random_prefix
            s3_url = 's3://' + os.environ.get('S3_BUCKET') + '/' + full_prefix

            response = client.create_export_job(
              ApplicationId=os.environ.get('PINPOINT_PROJECT_ID'),
              ExportJobRequest={
                'RoleArn': os.environ.get('ROLE_ARN'),
                'S3UrlPrefix': s3_url,
                'SegmentId': os.environ.get('EXPORT_SEGMENT_ID')
              }
            )

            logging.info(response)

            return {
              'ExportJobId': response['ExportJobResponse']['Id'],
              'S3_FULL_PREFIX': full_prefix
            }

  # Step 2: looks up the export job named in the event and returns its
  # status, passing S3_FULL_PREFIX through unchanged for the next step.
  ExportStatusLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt ExportStatusLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Timeout: 60
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          PINPOINT_PROJECT_ID: !Ref PinpointProjectId
      Code:
        ZipFile: |
          import boto3
          import os
          import logging

          client = boto3.client('pinpoint')

          def lambda_handler(event, context):
            """Return the status of the export job identified by event['ExportJobId']."""
            log_level = str(os.environ.get('LOG_LEVEL')).upper()
            if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
              log_level = 'ERROR'
            logging.getLogger().setLevel(log_level)

            logging.info(event)

            response = client.get_export_job(
              ApplicationId=os.environ.get('PINPOINT_PROJECT_ID'),
              JobId=event['ExportJobId']
            )

            logging.info(response)

            job = response['ExportJobResponse']
            return {
              'ExportJobStatus': job['JobStatus'],
              'ExportJobId': job['Id'],
              'S3URL': job['Definition']['S3UrlPrefix'],
              'S3_FULL_PREFIX': event['S3_FULL_PREFIX']
            }

  # Step 3: for every exported .gz file, writes a decompressed "<key>.json"
  # copy, validates each endpoint's phone number and updates the endpoint
  # with the cleansed number plus PNV_* attributes.
  ValidatePhoneLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt ValidatePhoneLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Timeout: 300
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          PINPOINT_PROJECT_ID: !Ref PinpointProjectId
          ASSUME_US: !Ref AssumeUS
          S3_BUCKET: !Ref TempExportS3Bucket
      Code:
        ZipFile: |
          import boto3
          import os
          import json
          import logging
          import gzip
          import re
          from io import BytesIO

          s3r = boto3.resource('s3')
          s3c = boto3.client('s3')
          client = boto3.client('pinpoint')

          assume_us = os.environ["ASSUME_US"].lower() == "true"

          # Validate-response fields copied to 'PNV_<field>' attributes.
          PNV_FIELDS = ['Carrier', 'City', 'Country', 'CountryCodeIso2',
                        'PhoneType', 'Timezone', 'ZipCode']

          def lambda_handler(event, context):
            """Validate every exported endpoint; return how many were updated."""
            log_level = str(os.environ.get('LOG_LEVEL')).upper()
            if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
              log_level = 'ERROR'
            logging.getLogger().setLevel(log_level)

            logging.info(event)

            bucket = os.environ.get('S3_BUCKET')

            # List all pages before writing anything. 'Contents' is absent
            # when nothing matches, so default to an empty list.
            gz_keys = []
            pages = s3c.get_paginator('list_objects_v2').paginate(
              Bucket=bucket, Prefix=event['S3_FULL_PREFIX'])
            for page in pages:
              logging.info(page)
              gz_keys.extend(
                c['Key'] for c in page.get('Contents', []) if c['Key'].endswith('.gz'))

            validate_count = 0
            for s3_key in gz_keys:
              logging.info(s3_key)
              json_key = s3_key + ".json"

              # Upload new version extract from gz
              compressed = s3c.get_object(Bucket=bucket, Key=s3_key)['Body'].read()
              s3c.upload_fileobj(
                Fileobj=gzip.GzipFile(None, 'rb', fileobj=BytesIO(compressed)),
                Bucket=bucket,
                Key=json_key)

              # Read in json version (one endpoint document per line)
              for line in s3r.Object(bucket, json_key).get()['Body'].iter_lines():
                if not line.strip():
                  continue

                endpoint = json.loads(line)
                logging.info('Got Endpoint')
                logging.info(endpoint)

                address = endpoint.get('Address')
                if not address:
                  logging.warning('Endpoint %s has no Address; skipped', endpoint.get('Id'))
                  continue

                response = client.phone_number_validate(
                  NumberValidateRequest={'PhoneNumber': get_number(address)}
                )
                pnv_response = response['NumberValidateResponse']

                attributes = {
                  'PNV_PhoneNumberValidated': ['Yes'],
                  'PNV_OriginalPhoneNumber': [address]
                }
                for field in PNV_FIELDS:
                  attributes['PNV_' + field] = [pnv_response.get(field, 'Unknown')]

                update_response = client.update_endpoint(
                  ApplicationId=os.environ.get('PINPOINT_PROJECT_ID'),
                  EndpointId=endpoint['Id'],
                  EndpointRequest={
                    # Keep the original address if no cleansed number came back.
                    'Address': pnv_response.get('CleansedPhoneNumberE164', address),
                    'Attributes': attributes
                  }
                )
                logging.info(update_response)

                validate_count = validate_count + 1

            return {
              'TotalPhoneNumbersValidated': validate_count
            }

          def get_number(address):
            """Prefix '+1' when ASSUME_US is on and the address has exactly 10 digits."""
            if assume_us and get_numeric(address) == 10:
              return "+1" + address
            return address

          def get_numeric(value):
            """Return the number of digit characters in value."""
            return len(''.join(re.findall(r'\d+', value)))

  PinpointValidateNotificationTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: 'PinpointPhoneValidateNotifications'
      KmsMasterKeyId: alias/aws/sns

  # Flow: export -> wait 30s -> check status -> (loop | failed | validate)
  # -> publish the outcome to the SNS topic.
  PhoneValidateStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn: !GetAtt StateMachineRole.Arn
      DefinitionString:
        !Sub
          - |-
            {
              "StartAt": "ExportPinpointEndpoints",
              "States": {
                "ExportPinpointEndpoints": {
                  "Type": "Task",
                  "Resource": "${ExportPinpointEndpointsArn}",
                  "Next": "ExportWait"
                },
                "ExportWait": {
                  "Type": "Wait",
                  "Seconds": 30,
                  "Next": "ExportStatus"
                },
                "ExportStatus": {
                  "Type": "Task",
                  "Resource": "${ExportStatusArn}",
                  "Next": "IsExportFinished"
                },
                "IsExportFinished": {
                  "Type": "Choice",
                  "Default": "ExportWait",
                  "Choices": [
                    {
                      "Variable": "$.ExportJobStatus",
                      "StringEquals": "FAILED",
                      "Next": "PhoneValidateFailed"
                    },
                    {
                      "Variable": "$.ExportJobStatus",
                      "StringEquals": "COMPLETED",
                      "Next": "ValidatePhoneLambda"
                    }
                  ]
                },
                "ValidatePhoneLambda": {
                  "Type": "Task",
                  "Resource": "${ValidatePhoneLambdaArn}",
                  "Next": "PhoneValidateSuccess"
                },
                "PhoneValidateSuccess": {
                  "Type": "Pass",
                  "Parameters": {
                    "ValidateOutput": {
                      "TotalPhoneNumbersValidated.$": "$.TotalPhoneNumbersValidated"
                    },
                    "ValidateInput.$": "$$.Execution.Input"
                  },
                  "Next": "EmitSuccess"
                },
                "EmitSuccess": {
                  "Type": "Task",
                  "Resource": "arn:aws:states:::sns:publish",
                  "ResultPath": null,
                  "Parameters": {
                    "TopicArn": "${SNSTopicArn}",
                    "Message": {
                      "Message": "Phone Validate Successful",
                      "ValidatePhoneResult.$": "$"
                    },
                    "Subject": "Amazon Pinpoint Phone Validate Successful",
                    "MessageAttributes": {
                      "notification_type": {
                        "DataType": "String",
                        "StringValue": "success"
                      }
                    }
                  },
                  "End": true
                },
                "PhoneValidateFailed": {
                  "Type": "Task",
                  "Resource": "arn:aws:states:::sns:publish",
                  "Parameters": {
                    "TopicArn": "${SNSTopicArn}",
                    "Message": {
                      "Message": "Phone Validate Failed",
                      "ValidatePhoneResult.$": "$"
                    },
                    "Subject": "Amazon Pinpoint Phone Validate Failed",
                    "MessageAttributes": {
                      "notification_type": {
                        "DataType": "String",
                        "StringValue": "failure"
                      }
                    }
                  },
                  "End": true
                }
              }
            }
          - ExportPinpointEndpointsArn: !GetAtt ExportPinpointEndpointsLambda.Arn
            ExportStatusArn: !GetAtt ExportStatusLambda.Arn
            ValidatePhoneLambdaArn: !GetAtt ValidatePhoneLambda.Arn
            SNSTopicArn: !Ref PinpointValidateNotificationTopic

  # Execution role of ExportPinpointEndpointsLambda: logging, passing the
  # export role to Pinpoint, and creating export jobs in the project.
  ExportPinpointEndpointsLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
              - Effect: "Allow"
                Action: "iam:PassRole"
                Resource:
                  - !GetAtt PinpointExportRole.Arn
              - Effect: "Allow"
                Action:
                  - "mobiletargeting:CreateExportJob"
                Resource:
                  - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/jobs/export"
                  - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}"

  # Role assumed by the Pinpoint service to write export files to S3.
  PinpointExportRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pinpoint.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "s3:PutObject"
                  - "s3:GetObjectAcl"
                  - "s3:GetObject"
                  - "s3:DeleteObjectVersion"
                  - "s3:GetObjectTagging"
                  - "s3:DeleteObject"
                  - "s3:GetObjectVersion"
                # Scoped to the export bucket and its objects only. A trailing
                # "<bucket>*" pattern is deliberately not used: it would also
                # match other buckets whose names start with the same string.
                Resource:
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}"
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}/"
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}/*"
              - Effect: "Allow"
                Action:
                  - "s3:ListAllMyBuckets"
                  - "s3:GetBucketLocation"
                Resource:
                  - !Sub "arn:aws:s3:::*"

  # Execution role of ExportStatusLambda: logging and reading export jobs.
  ExportStatusLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
              - Effect: "Allow"
                Action:
                  - "mobiletargeting:GetExportJob"
                Resource:
                  - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/jobs/export/*"

  # Execution role of ValidatePhoneLambda: logging, S3 access to the export
  # bucket, phone number validation and endpoint updates.
  ValidatePhoneLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
              - Effect: "Allow"
                Action:
                  - "s3:PutObject"
                  - "s3:GetObjectAcl"
                  - "s3:GetObject"
                  - "s3:DeleteObjectVersion"
                  - "s3:GetObjectTagging"
                  - "s3:DeleteObject"
                  - "s3:GetObjectVersion"
                  - "s3:ListBucket"
                # Scoped to the export bucket and its objects only. A trailing
                # "<bucket>*" pattern is deliberately not used: it would also
                # match other buckets whose names start with the same string.
                Resource:
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}"
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}/"
                  - !Sub "arn:aws:s3:::${TempExportS3Bucket}/*"
              - Effect: "Allow"
                Action:
                  - "s3:ListAllMyBuckets"
                  - "s3:GetBucketLocation"
                Resource:
                  - !Sub "arn:aws:s3:::*"
              - Effect: "Allow"
                Action:
                  - "mobiletargeting:PhoneNumberValidate"
                Resource:
                  - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:phone/number/validate"
              - Effect: "Allow"
                Action:
                  - "mobiletargeting:UpdateEndpoint"
                Resource:
                  - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*"

  # Role assumed by Step Functions: invoke the three Lambdas and publish to
  # the notification topic.
  StateMachineRole:
    Type: AWS::IAM::Role
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W76
            reason: Complex Role that is used for StateMachine to invoke many other lambdas
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "states.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "lambda:InvokeFunction"
                Resource:
                  - !GetAtt ExportPinpointEndpointsLambda.Arn
                  - !GetAtt ExportStatusLambda.Arn
                  - !GetAtt ValidatePhoneLambda.Arn
              - Effect: "Allow"
                Action: sns:Publish
                Resource: !Ref PinpointValidateNotificationTopic

Outputs:
  PhoneValidateStateMachineArn:
    Description: The Phone Number Validate State Machine ARN
    Value: !Ref PhoneValidateStateMachine
  PhoneValidateNotificationTopicArn:
    Description: SNS Topic used to provide updates to status
    Value: !Ref PinpointValidateNotificationTopic