# SAM template header: shared Lambda defaults (Globals) and the deploy-time
# parameters consumed by the resources in this stack.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
# Plain string: the previous folded scalar wrapped a quoted string, so the
# deployed description contained the quote characters and a literal "\n".
Description: 'Twitter Rekognition Demo - Lambdas'

Globals:
  # Defaults applied to every AWS::Serverless::Function in this template;
  # individual functions override MemorySize/Timeout where they need to.
  Function:
    AutoPublishAlias: live
    Handler: index.handler
    MemorySize: 256
    # NOTE(review): python3.8 is past end of support on AWS Lambda; when
    # upgrading, change the layers' CompatibleRuntimes/BuildMethod as well.
    Runtime: python3.8
    Timeout: 120
    Tracing: Active
    Layers:
      - !Ref CoreLayer

Parameters:
  GlueDatabaseName:
    Type: String
    Description: The Glue Database name for the app.
    Default: twitter-db

  SearchText:
    Type: String
    Description: >-
      Non-URL-encoded search text poller should use when querying Twitter
      Search API.
    Default: selfie

  SSMParameterPrefix:
    Type: String
    Default: 'twitter-event-source'
    Description: >
      This app assumes API keys needed to use the Twitter API are stored as
      SecureStrings in SSM Parameter Store under the prefix defined by this
      parameter. See the app README for details.

  PollingFrequencyInMinutes:
    Type: Number
    MinValue: 1
    Default: 10
    Description: Frequency in minutes to poll for more tweets.

  BatchSize:
    Type: Number
    MinValue: 1
    Default: 15
    Description: >-
      Max number of tweets to send to the TweetProcessor lambda function on
      each invocation.

Conditions:
  # True when the polling frequency is exactly 1; used to pick the singular
  # unit ("minute") in the Poller's rate() schedule expression.
  IsPollingFrequencyInMinutesSingular:
    !Equals [!Ref PollingFrequencyInMinutes, 1]

Resources:
  # ---------------------------------------------------------------------
  # Storage, Glue bootstrap and content delivery
  # ---------------------------------------------------------------------

  # Custom resource backed by the GlueDatabaseInit function; it receives the
  # bucket and database names as properties.
  GlueDatabase:
    Type: Custom::DbInit
    Properties:
      ServiceToken: !GetAtt GlueDatabaseInit.Arn
      BucketName: !Ref Bucket
      DatabaseName: !Ref GlueDatabaseName

  GlueDatabaseInit:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/glueDatabaseInit/
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
        - S3CrudPolicy:
            BucketName: !Ref Bucket

  # Single bucket for the app; objects under the "data" and
  # "twitter-ath-results" prefixes expire after two days.
  Bucket:
    Type: AWS::S3::Bucket
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: DeleteRecordsAfter2Days
            ExpirationInDays: 2
            Status: Enabled
            Prefix: data
          - Id: DeleteAthenaReportsAfter2Days
            ExpirationInDays: 2
            Status: Enabled
            Prefix: twitter-ath-results

  OriginAccessIdentity:
    Type: AWS::CloudFront::CloudFrontOriginAccessIdentity
    Properties:
      CloudFrontOriginAccessIdentityConfig:
        Comment: !Sub 'for bucket ${Bucket}'

  # Distribution serving the bucket's /shared prefix through the OAI.
  CloudFront:
    Type: AWS::CloudFront::Distribution
    Properties:
      DistributionConfig:
        Origins:
          # Regional bucket endpoint taken from the bucket itself instead of
          # being assembled by hand from the bucket name and region.
          - DomainName: !GetAtt Bucket.RegionalDomainName
            OriginPath: /shared
            Id: !Ref Bucket
            S3OriginConfig:
              OriginAccessIdentity: !Sub
                'origin-access-identity/cloudfront/${OriginAccessIdentity}'
        Enabled: true
        DefaultRootObject: index.html
        DefaultCacheBehavior:
          AllowedMethods:
            - GET
            - HEAD
            - OPTIONS
          CachedMethods:
            - GET
            - HEAD
            - OPTIONS
          TargetOriginId: !Ref Bucket
          ForwardedValues:
            QueryString: false
            Cookies:
              Forward: none
          # Plain-HTTP viewers are redirected rather than served in clear.
          ViewerProtocolPolicy: redirect-to-https

  # Lets the CloudFront origin access identity read objects in the bucket.
  BucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref Bucket
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:GetObject
            Resource:
              - !Sub '${Bucket.Arn}/*'
            Principal:
              CanonicalUser: !GetAtt OriginAccessIdentity.S3CanonicalUserId

  # On-demand table keyed by image URL; items carry an "expire_at" TTL
  # attribute.
  DdbImageTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: img_url
          AttributeType: S
      KeySchema:
        - AttributeName: img_url
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST
      TimeToLiveSpecification:
        AttributeName: expire_at
        Enabled: true

  # ---------------------------------------------------------------------
  # HTTP API and shared layers
  # ---------------------------------------------------------------------

  HttpApi:
    Type: AWS::Serverless::HttpApi
    Properties:
      DefaultRouteSettings:
        ThrottlingBurstLimit: 200
      FailOnWarnings: true
      CorsConfiguration:
        AllowHeaders:
          - '*'
        AllowMethods:
          - GET
          - POST
        AllowOrigins:
          - '*'

  # Layer attached to every function through Globals.
  CoreLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      Description: requests awsxray emf
      ContentUri: ./layers/core
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Delete
    Metadata:
      BuildMethod: python3.8

  # Layer attached explicitly by GetStat.
  PandasLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      Description: pandas numpy
      ContentUri: ./layers/pandas
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Delete
    Metadata:
      BuildMethod: python3.8

  # ---------------------------------------------------------------------
  # Functions
  # ---------------------------------------------------------------------

  # Runs on a schedule; is allowed to invoke Parser and to read/write the
  # SearchCheckpoint table.
  Poller:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/poller/
      Tracing: Active
      MemorySize: 128
      Timeout: 60
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref Parser
        - DynamoDBCrudPolicy:
            TableName: !Ref SearchCheckpoint
        - Statement:
            Effect: Allow
            Action:
              - ssm:GetParameters
            Resource: !Sub
              'arn:${AWS::Partition}:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${SSMParameterPrefix}/*'
      Environment:
        Variables:
          SSM_PARAMETER_PREFIX: !Ref SSMParameterPrefix
          SEARCH_TEXT: !Ref SearchText
          SEARCH_CHECKPOINT_TABLE_NAME: !Ref SearchCheckpoint
          TWEET_PROCESSOR_FUNCTION_NAME: !Ref Parser
          BATCH_SIZE: !Ref BatchSize
      Events:
        Timer:
          Type: Schedule
          Properties:
            # rate() needs the singular unit when the value is 1.
            Schedule: !If
              - IsPollingFrequencyInMinutesSingular
              - !Sub 'rate(${PollingFrequencyInMinutes} minute)'
              - !Sub 'rate(${PollingFrequencyInMinutes} minutes)'

  SearchCheckpoint:
    Type: AWS::Serverless::SimpleTable

  # Is allowed to start executions of StateMachine and to use DdbImageTable
  # and the bucket.
  Parser:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/parser/
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - S3CrudPolicy:
            BucketName: !Ref Bucket
        - StepFunctionsExecutionPolicy:
            StateMachineName: !GetAtt StateMachine.Name
        - DynamoDBCrudPolicy:
            TableName: !Ref DdbImageTable
        # NOTE(review): SSMParameterReadPolicy grants access to the parameter
        # named exactly <ParameterName>. If functions read parameters *under*
        # the prefix (as Poller's explicit statement allows), this and the
        # other SSMParameterReadPolicy entries in this template probably need
        # "${SSMParameterPrefix}/*" - confirm against the handlers.
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
      Environment:
        Variables:
          STATE_MACHINE_ARN: !Ref StateMachine
          DDB_IMAGE_TABLE: !Ref DdbImageTable

  GetStat:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/getStat/
      Layers:
        - !Ref PandasLayer
      Events:
        ExplicitApi:  # warning: creates a public endpoint
          Type: HttpApi
          Properties:
            ApiId: !Ref HttpApi
            Method: GET
            Path: /stat
            TimeoutInMillis: 29000
            PayloadFormatVersion: '2.0'
            RouteSettings:
              ThrottlingBurstLimit: 100
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - arn:aws:iam::aws:policy/CloudWatchReadOnlyAccess
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
      Environment:
        Variables:
          ParserLambdaName: !Ref Parser
          AthQueryLambdaName: !Ref AthenaQuery
          ProcessFacesLambdaName: !Ref ProcessFaces
          RekognitionLambdaName: !Ref Rekognition

  AthenaQuery:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/athenaQuery/
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - S3CrudPolicy:
            BucketName: !Ref Bucket
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
      Environment:
        Variables:
          BUCKET_NAME: !Ref Bucket
          DATABASE_NAME: !Ref GlueDatabaseName

  GetImage:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/getImage/
      Events:
        ExplicitApi:  # warning: creates a public endpoint
          Type: HttpApi
          Properties:
            ApiId: !Ref HttpApi
            Method: GET
            Path: /image
            TimeoutInMillis: 29000
            PayloadFormatVersion: '2.0'
            RouteSettings:
              ThrottlingBurstLimit: 100
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
        - LambdaInvokePolicy:
            FunctionName: !Ref AthenaQuery
        - S3CrudPolicy:
            BucketName: !Ref Bucket
        - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      Environment:
        Variables:
          BUCKET_NAME: !Ref Bucket
          AthQueryLambdaName: !Ref AthenaQuery

  DelImage:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/delImage/
      Events:
        ExplicitApi:  # warning: creates a public endpoint
          Type: HttpApi
          Properties:
            ApiId: !Ref HttpApi
            Method: POST
            Path: /delimage
            TimeoutInMillis: 29000
            PayloadFormatVersion: '2.0'
            RouteSettings:
              ThrottlingBurstLimit: 100
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - S3CrudPolicy:
            BucketName: !Ref Bucket
      Environment:
        Variables:
          BUCKET_NAME: !Ref Bucket
          # NOTE(review): the stream name is passed in, but no Firehose policy
          # is attached to this function - confirm whether the handler
          # actually writes to the stream.
          FIREHOSE: !Ref TwitterDeliveryStream

  # First task of the state machine.
  Rekognition:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/rekognition/
      Environment:
        Variables:
          BUCKET_NAME: !Ref Bucket
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref Bucket
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
        - arn:aws:iam::aws:policy/AmazonRekognitionFullAccess
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess

  # Second task of the state machine; is handed the delivery stream name.
  ProcessFaces:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./lambdas/processFaces/
      Environment:
        Variables:
          BUCKET_NAME: !Ref Bucket
          TwitterDeliveryStream: !Ref TwitterDeliveryStream
      Policies:
        - arn:aws:iam::aws:policy/AWSXrayWriteOnlyAccess
        - SSMParameterReadPolicy:
            ParameterName: !Ref SSMParameterPrefix
        # Scoped to the one stream this function is given, instead of "*".
        - FirehoseCrudPolicy:
            DeliveryStreamName: !Ref TwitterDeliveryStream
        - ComprehendBasicAccessPolicy: {}
        - S3CrudPolicy:
            BucketName: !Ref Bucket

  # ---------------------------------------------------------------------
  # Step Functions workflow
  # ---------------------------------------------------------------------

  # Rekognition -> (choice on $.result) -> ProcessFaces -> (choice on
  # $.result) -> Succeed/Fail. "Moderated" after Rekognition ends the
  # execution successfully without running ProcessFaces.
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      DefinitionString: !Sub
        - |-
          {
            "Comment": "Twitter selfie state machine",
            "StartAt": "Rekognition",
            "States": {
              "Rekognition": {
                "Type": "Task",
                "Resource": "${RunRekognitionArn}",
                "Next": "RekErrorHandler"
              },
              "RekErrorHandler": {
                "Type" : "Choice",
                "Choices": [
                  {
                    "Variable": "$.result",
                    "StringEquals": "Succeed",
                    "Next": "ProcessFaces"
                  },
                  {
                    "Variable": "$.result",
                    "StringEquals": "Moderated",
                    "Next": "ModerateState"
                  },
                  {
                    "Variable": "$.result",
                    "StringEquals": "Fail",
                    "Next": "FailState"
                  }
                ],
                "Default": "FailState"
              },
              "ProcessFaces": {
                "Type": "Task",
                "Resource": "${ProcessFacesArn}",
                "Next": "FaceErrorHandler"
              },
              "FaceErrorHandler": {
                "Type" : "Choice",
                "Choices": [
                  {
                    "Variable": "$.result",
                    "StringEquals": "Succeed",
                    "Next": "SucceedState"
                  },
                  {
                    "Variable": "$.result",
                    "StringEquals": "Moderated",
                    "Next": "SucceedState"
                  },
                  {
                    "Variable": "$.result",
                    "StringEquals": "Fail",
                    "Next": "FailState"
                  }
                ],
                "Default": "FailState"
              },
              "FailState": {
                "Type": "Fail",
                "Error": "DefaultStateError",
                "Cause": "No Matches!"
              },
              "ModerateState": {
                "Type": "Succeed"
              },
              "SucceedState": {
                "Type": "Succeed"
              }
            }
          }
        - ProcessFacesArn: !GetAtt ProcessFaces.Arn
          RunRekognitionArn: !GetAtt Rekognition.Arn
      RoleArn: !GetAtt StatesExecutionRole.Arn

  # Execution role for StateMachine.
  StatesExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - !Sub 'states.${AWS::Region}.amazonaws.com'
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                # Only the two task functions referenced by the definition,
                # instead of every function in the account.
                Resource:
                  - !GetAtt Rekognition.Arn
                  - !GetAtt ProcessFaces.Arn

  # ---------------------------------------------------------------------
  # Firehose delivery (JSON in, Parquet out)
  # ---------------------------------------------------------------------

  # Created only after the delivery policy is attached and the GlueDatabase
  # custom resource has completed.
  TwitterDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn:
      - DeliveryStreamPolicy
      - GlueDatabase
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: 'data/parquet-!{timestamp:yyyy}/'
        ErrorOutputPrefix: >-
          FirehoseFailures/!{firehose:error-output-type}/!{firehose:random-string}/
        BucketARN: !GetAtt Bucket.Arn
        RoleARN: !GetAtt DeliveryStreamRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            Region: !Ref AWS::Region
            RoleARN: !GetAtt DeliveryStreamRole.Arn
            DatabaseName: !Ref GlueDatabaseName
            TableName: json_records

  DeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so YAML loaders keep it a string rather than a date.
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole

  DeliveryStreamPolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref DeliveryStreamRole
      PolicyName: firehose_delivery_policy
      PolicyDocument:
        # Quoted so YAML loaders keep it a string rather than a date.
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !GetAtt Bucket.Arn
              - !Sub '${Bucket.Arn}/*'
          - Effect: Allow
            Action:
              - glue:Start*
              - glue:Stop*
              - glue:Create*
              - glue:Get*
              - glue:List*
              - glue:Search*
            Resource: '*'

Outputs:
  S3Bucket:
    Value: !Ref Bucket

  DdbImageTable:
    Value: !Ref DdbImageTable

  HttpApiUrl:
    Description: URL of your API endpoint
    Value: !Sub
      'https://${HttpApi}.execute-api.${AWS::Region}.${AWS::URLSuffix}/'

  CloudFrontUrl:
    Description: CloudFront Url
    Value: !GetAtt CloudFront.DomainName

  HttpApiId:
    Description: Api id of HttpApi
    Value: !Ref HttpApi