# CloudFormation template: play-log generator EC2 instance, a DynamoDB
# "UserProfile" table with write auto scaling, IAM roles for EC2 / Lambda /
# Glue / Application Auto Scaling, a Lambda that forwards DynamoDB stream
# records to Kinesis Data Firehose, and a custom resource that seeds the table.
#
# The IAM roles and instance profile below use fixed names, so the stack must
# be created with the CAPABILITY_NAMED_IAM capability.
AWSTemplateFormatVersion: '2010-09-09'
Metadata:
  License: Apache-2.0
Description: 'CloudFormation template for data lake and machine learning for game log at Gaming on AWS 2018.'

Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    Type: AWS::EC2::KeyPair::KeyName
    ConstraintDescription: must be the name of an existing EC2 KeyPair.
  # Source range allowed to reach port 22. The default keeps SSH open to any
  # address; narrow it when launching outside a throw-away workshop account.
  SSHLocation:
    Description: The IP address range that can be used to SSH to the EC2 instance
    Type: String
    MinLength: 9
    MaxLength: 18
    Default: 0.0.0.0/0
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: must be a valid IP CIDR range of the form x.x.x.x/x.

Resources:
  # ---------------------------------------------------------------------------
  # EC2 instance and its networking / identity
  # ---------------------------------------------------------------------------
  EC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: m5.large
      # Security group is referenced by name (not ID), which is the form the
      # SecurityGroups property expects.
      SecurityGroups: [!Ref 'EC2SecurityGroup']
      IamInstanceProfile: !Ref 'EC2Profile'
      KeyName: !Ref 'KeyName'
      # NOTE(review): AMI IDs are region-specific; this hard-coded ID only
      # resolves in the region it was published in - confirm before launching.
      ImageId: ami-0c4d4c97d0d1eb1f5
      Tags:
        - Key: Name
          Value: PlayLogGenerator

  EC2SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: SSH-SecurityGroup
      GroupDescription: Enable SSH access via port 22
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref 'SSHLocation'
      Tags:
        - Key: Name
          Value: SSH-SecurityGroup

  EC2Profile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: "/"
      Roles: [!Ref 'EC2Role']
      InstanceProfileName: PlayLogGeneratorProfile

  # ---------------------------------------------------------------------------
  # DynamoDB table and write-capacity auto scaling
  # ---------------------------------------------------------------------------
  # Role assumed by Application Auto Scaling (it is passed as RoleARN of
  # WriteCapacityTarget) to resize the table and manage its CloudWatch alarms.
  DDBScalingRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "application-autoscaling.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Action:
                  - "dynamodb:DescribeTable"
                  - "dynamodb:UpdateTable"
                  - "cloudwatch:PutMetricAlarm"
                  - "cloudwatch:DescribeAlarms"
                  - "cloudwatch:GetMetricStatistics"
                  - "cloudwatch:SetAlarmState"
                  - "cloudwatch:DeleteAlarms"
                Effect: Allow
                Resource: "*"
      RoleName: DDBScalingRole

  WriteCapacityTarget:
    Type: AWS::ApplicationAutoScaling::ScalableTarget
    Properties:
      MaxCapacity: 1000
      MinCapacity: 50
      # Resolves to "table/<table name>".
      ResourceId: !Join
        - /
        - - table
          - !Ref DDBTable
      ScalableDimension: "dynamodb:table:WriteCapacityUnits"
      RoleARN: !GetAtt DDBScalingRole.Arn
      ServiceNamespace: dynamodb

  WriteScalingPolicy:
    Type: AWS::ApplicationAutoScaling::ScalingPolicy
    Properties:
      PolicyName: WriteAutoScalingPolicy
      PolicyType: TargetTrackingScaling
      ScalingTargetId: !Ref 'WriteCapacityTarget'
      TargetTrackingScalingPolicyConfiguration:
        TargetValue: 50.0
        ScaleInCooldown: 60
        ScaleOutCooldown: 60
        PredefinedMetricSpecification:
          PredefinedMetricType: DynamoDBWriteCapacityUtilization

  DDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "pidx"
          AttributeType: "N"
      KeySchema:
        - AttributeName: "pidx"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: 50
        WriteCapacityUnits: 50
      TableName: UserProfile

  # ---------------------------------------------------------------------------
  # IAM roles
  # ---------------------------------------------------------------------------
  # Execution role shared by both Lambda functions in this template.
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "LambdaBasicRole"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Action:
                  - "logs:*"
                  - "dynamodb:*"
                  - "firehose:*"
                  - "s3:*"
                Effect: Allow
                Resource: "*"
      RoleName: LambdaDDBRole

  # Role attached to the EC2 instance through EC2Profile.
  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "ec2.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "EC2ServiceRole"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Action:
                  - "firehose:*"
                  - "logs:*"
                  - "sns:*"
                  - "cloudwatch:*"
                  - "s3:*"
                  - "dynamodb:*"
                Effect: Allow
                Resource: "*"
              - Effect: Allow
                Action: "iam:CreateServiceLinkedRole"
                Resource: "arn:aws:iam::*:role/aws-service-role/*"
                Condition:
                  StringLike:
                    # NOTE(review): the service name is empty, so this
                    # condition presumably never matches and the statement
                    # grants nothing - confirm which service-linked role the
                    # instance is meant to be able to create and fill it in.
                    "iam:AWSServiceName": ""
      RoleName: EC2GeneratorRole

  # Role for AWS Glue; not referenced by any other resource in this template.
  GlueRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "glue.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "GlueServiceRole"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Action:
                  - "s3:*"
                  - "ec2:*"
                  - "iam:*"
                  - "glue:*"
                  - "logs:*"
                  - "dynamodb:*"
                  - "cloudwatch:*"
                Effect: Allow
                Resource: "*"
      RoleName: GlueETLRole

  # ---------------------------------------------------------------------------
  # Lambda functions
  # ---------------------------------------------------------------------------
  # Forwards INSERT/MODIFY records from a DynamoDB stream event to the
  # 'stream-userprofile' Firehose delivery stream as newline-delimited JSON.
  # This template does not create the stream, the event source mapping, or the
  # delivery stream; they have to exist separately.
  # (The logical ID spelling "UserLogLamba" is kept so existing stacks are not
  # forced to replace the resource.)
  UserLogLamba:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          import json

          import boto3

          client = boto3.client('firehose')


          def lambda_handler(event, context):
              for record in event['Records']:
                  # REMOVE events carry no NewImage, so only these two are forwarded.
                  if record['eventName'] in ('MODIFY', 'INSERT'):
                      # Read data from DynamoDB Streams
                      image = record['dynamodb']['NewImage']

                      # Dict to store data
                      raw_data = {
                          'pidx': int(image['pidx']['N']),
                          'uclass': image['uclass']['S'],
                          'ulevel': int(image['ulevel']['N']),
                          'utimestamp': image['utimestamp']['S'],
                      }

                      # Convert to JSON and Put to Kinesis Firehose
                      data = json.dumps(raw_data) + '\n'
                      client.put_record(
                          DeliveryStreamName='stream-userprofile',
                          Record={'Data': data.encode('utf-8')}
                      )
                      print(data + ' has been ingested to Firehose.')
      FunctionName: StreamUserLog
      Handler: index.lambda_handler
      Runtime: python3.12
      Timeout: 60
      Role: !GetAtt [LambdaRole, "Arn"]

  # Custom-resource handler: on Create/Update it loads user ids from
  # s3://gamingonaws2018/userList.csv and writes one item per id into the
  # UserProfile table; on Delete it only acknowledges the request.
  DDBInitLambda:
    Type: AWS::Lambda::Function
    DependsOn: DDBTable
    Properties:
      Code:
        ZipFile: |
          import json
          import random
          import traceback
          import urllib.request

          import boto3


          def lambda_handler(event, context):
              try:
                  # Acknowledge deletes before touching S3 or DynamoDB so that
                  # stack deletion cannot fail because the seed data is gone.
                  if event['RequestType'] == 'Delete':
                      print('Send response to CFN.')
                      send_response(event, context, "SUCCESS", {"Message": "CFN deleted!"})
                      return

                  # Set user pidx from S3
                  s3 = boto3.resource('s3')
                  bucket = s3.Bucket('gamingonaws2018')
                  obj = bucket.Object(key='userList.csv')
                  response = obj.get()
                  users = response['Body'].read().decode('utf-8').split()

                  # Set DynamoDB
                  dynamodb = boto3.resource('dynamodb')
                  table = dynamodb.Table('UserProfile')
                  uclass = ['warrior', 'mage', 'healer']

                  for user in users:
                      table.put_item(
                          Item={
                              'pidx': int(user),
                              'ulevel': 1,
                              'uclass': random.choice(uclass),
                              'utimestamp': '2000-01-01 00:00:00.000000'
                          }
                      )

                  print('Send response to CFN.')
                  send_response(event, context, "SUCCESS", {"Message": "CFN created!"})
                  print('End of Lambda function.')
              except Exception:
                  # Log the cause; the FAILED response only points at the log stream.
                  traceback.print_exc()
                  send_response(event, context, "FAILED", {"Message": "Lambda failed!"})


          def send_response(event, context, response_status, response_data):
              """PUT the custom-resource result to the pre-signed ResponseURL."""
              response_body = json.dumps({
                  "Status": response_status,
                  "Reason": "See the details in CloudWatch Log Stream: " + context.log_stream_name,
                  "PhysicalResourceId": context.log_stream_name,
                  "StackId": event['StackId'],
                  "RequestId": event['RequestId'],
                  "LogicalResourceId": event['LogicalResourceId'],
                  "Data": response_data
              }).encode('utf-8')

              # Content-Type is sent empty on purpose, exactly as before; setting
              # it explicitly also stops urllib from adding its form-encoded default.
              headers = {
                  "Content-Type": "",
                  "Content-Length": str(len(response_body))
              }

              request = urllib.request.Request(
                  event["ResponseURL"],
                  data=response_body,
                  headers=headers,
                  method="PUT"
              )
              with urllib.request.urlopen(request) as response:
                  print('CFN response status: ' + str(response.status))
      FunctionName: DDBInitialize
      Handler: index.lambda_handler
      Runtime: python3.12
      Timeout: 900
      Role: !GetAtt [LambdaRole, "Arn"]

  # Invokes DDBInitLambda during stack create/update/delete.
  DDBInitLambdaInvoke:
    Type: Custom::DDBInitLambdaInvoke
    Properties:
      ServiceToken: !GetAtt DDBInitLambda.Arn

Outputs:
  PublicDNS:
    Description: Public DNSName of the newly created EC2 instance
    Value: !GetAtt [EC2Instance, PublicDnsName]
  PublicIP:
    Description: Public IP address of the newly created EC2 instance
    Value: !GetAtt [EC2Instance, PublicIp]