AWSTemplateFormatVersion: 2010-09-09 Description: Sending SMS Messages with Amazon Pinpoint from a File Drop in Amazon S3 Parameters: PinpointProjectId: Type: String Description: Amazon Pinpoint Project ID AllowedPattern: ^[a-zA-Z0-9]*$ CreateS3BucketName: Type: String Description: Name of Amazon S3 Bucket to Create S3BucketFileDropPrefix: Type: String Description: "Prefix (folder) in Amazon S3 where files will be dropped. Ex: files/" Mappings: LambdaRuntime: Language: Python: python3.7 Resources: #### Amazon S3 Section ####################################### # Create the Amazon S3 bucket with Lambda Trigger S3FileDrop: Type: AWS::S3::Bucket DeletionPolicy: Retain Metadata: cfn_nag: rules_to_suppress: - id: W51 reason: Not public facing. Properties: BucketName: !Ref CreateS3BucketName PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled LoggingConfiguration: DestinationBucketName: !Ref LogBucket LogFilePrefix: pinpoint-event-processing/ NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:*" Filter: S3Key: Rules: - Name: "prefix" Value: !Ref S3BucketFileDropPrefix Function: !GetAtt S3TriggeredSMSLambda.Arn # Restrict Bucket Traffic to SSL Only S3FileDropPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref S3FileDrop PolicyDocument: Version: 2012-10-17 Statement: - Sid: AllowSSLRequestsOnly Effect: Deny Principal: "*" Action: "s3:*" Resource: - !Sub "arn:aws:s3:::${CreateS3BucketName}/*" - !Sub "arn:aws:s3:::${CreateS3BucketName}" Condition: Bool: "aws:SecureTransport": "false" # Provide Permissions for S3 to Invoke Lambda Function S3InvokeLambdaPermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !Ref S3TriggeredSMSLambda Principal: s3.amazonaws.com SourceArn: !Sub arn:aws:s3:::${CreateS3BucketName} SourceAccount: !Ref "AWS::AccountId" # Best Practice - S3 Bucket to log access to File Drop S3 Bucket LogBucket: Type: AWS::S3::Bucket DeletionPolicy: Retain Metadata: cfn_nag: rules_to_suppress: - id: W35 reason: This is the log bucket. Properties: AccessControl: LogDeliveryWrite PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled # S3 Policy for the Log Bucket LogBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref LogBucket PolicyDocument: Version: 2012-10-17 Statement: - Sid: AWSCloudTrailAclCheck Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: "s3:GetBucketAcl" Resource: !Sub arn:aws:s3:::${LogBucket} - Sid: AWSCloudTrailWrite Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: "s3:PutObject" Resource: !Sub arn:aws:s3:::${LogBucket}/AWSLogs/${AWS::AccountId}/* Condition: StringEquals: "s3:x-amz-acl": "bucket-owner-full-control" - Sid: LogBucketAllowSSLRequestsOnly Effect: Deny Principal: "*" Action: "s3:*" Resource: - !Sub "arn:aws:s3:::${LogBucket}/*" - !Sub "arn:aws:s3:::${LogBucket}" Condition: Bool: "aws:SecureTransport": "false" #### AWS Lambda Section ####################################### S3TriggeredSMSLambda: Type: AWS::Lambda::Function Properties: Role: !GetAtt S3TriggeredSMSLambdaRole.Arn Timeout: 600 MemorySize: 2048 Environment: Variables: LOG_LEVEL: "DEBUG" SQS_QUEUE_URL: !Ref FIFOSQSQueue Handler: index.lambda_handler Runtime: !FindInMap ["LambdaRuntime", "Language", "Python"] Code: ZipFile: | import json import boto3 import logging import traceback import os from io import StringIO import csv s3 = boto3.client('s3') sqs = boto3.client('sqs') queue_url = os.environ['SQS_QUEUE_URL'] def lambda_handler(event, context): log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'DEBUG' logging.getLogger().setLevel(log_level) logging.debug(event) try: for record in event['Records']: obj = s3.get_object( Bucket=record['s3']['bucket']['name'], Key=record['s3']['object']['key'] ) # Assume Header Row - disregard it obj['Body']._raw_stream.readline() # For each line of the CSV, put message into a Queue for line in obj['Body']._raw_stream: line = line.decode('utf-8') logging.debug('Reading Line: ' + line) # TODO - ensure these match with the columns - this needs productionalizing sms_number, message, external_campaign_id, short_code, message_type, id_1, id_2 = list(next(csv.reader(StringIO(line)))) msg = { 'sms_number': sms_number, 'message': message, 'external_campaign_id': external_campaign_id, 'short_code': short_code, 'message_type': message_type, 'id_1': id_1, 'id_2': id_2 } # Send Message to SQS FIFO, using MessageGroupId allows to control concurrency response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(msg), MessageDeduplicationId="%s-%s" % (sms_number, external_campaign_id), MessageGroupId=short_code ) logging.debug(response) result = { 'statusCode': '200', 'body': {'message': 'success'} } return json.dumps(result) except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) S3TriggeredSMSLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:GetObject" - "s3:PutObject" Resource: - !Sub "arn:aws:s3:::${CreateS3BucketName}/*" - Effect: Allow Action: sqs:SendMessage Resource: !GetAtt FIFOSQSQueue.Arn SQSTriggeredSMSLambda: Type: AWS::Lambda::Function Properties: Role: !GetAtt SQSTriggeredSMSLambdaRole.Arn Timeout: 60 MemorySize: 256 Environment: Variables: LOG_LEVEL: "DEBUG" PINPOINT_PROJECT_ID: !Ref PinpointProjectId Handler: index.lambda_handler Runtime: !FindInMap ["LambdaRuntime", "Language", "Python"] Code: ZipFile: | import json import boto3 import logging import traceback import os pinpoint = boto3.client('pinpoint') def lambda_handler(event, context): log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'DEBUG' logging.getLogger().setLevel(log_level) logging.debug(event) # Batch collection for our bulk send_messages call addresses = {} message_type = None short_code = None # loop over records and create our batch addresses for record in event['Records']: try: payload=record["body"] msg = json.loads(payload) short_code = msg['short_code'] if short_code is None else short_code message_type = msg['message_type'] if message_type is None else message_type addresses[msg['sms_number']] = { 'BodyOverride': msg['message'], 'ChannelType': 'SMS', 'Context': { 'external_campaign_id': msg['external_campaign_id'], 'id_1': msg['id_1'], 'id_2': msg['id_2'] } } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) try: # Send the bulk SMS request response = pinpoint.send_messages( ApplicationId=os.environ['PINPOINT_PROJECT_ID'], MessageRequest={ 'Addresses': addresses, 'MessageConfiguration': { 'SMSMessage': { 'MessageType': message_type, 'OriginationNumber': short_code, } } } ) # TODO -review the response object to see success failures of each message logging.debug(response) except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) result = { 'statusCode': '200', 'body': {'message': 'success'} } return json.dumps(result) SQSLambdaEventMapping: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 10 Enabled: True EventSourceArn: !GetAtt FIFOSQSQueue.Arn FunctionName: !Ref SQSTriggeredSMSLambda SQSTriggeredSMSLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: Allow Action: mobiletargeting:SendMessages Resource: !Sub arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/messages - Effect: Allow Action: - sqs:ReceiveMessage - sqs:DeleteMessage - sqs:GetQueueAttributes Resource: !GetAtt FIFOSQSQueue.Arn # FIFO Queue FIFOSQSQueue: Type: AWS::SQS::Queue Properties: VisibilityTimeout: 100 FifoQueue: true KmsMasterKeyId: alias/aws/sqs