AWSTemplateFormatVersion: '2010-09-09' Transform: 'AWS::Serverless-2016-10-31' Description: Template to create BikeNow analytics demo Parameters: GlueDatabaseName: Description: Name of AWS Glue database Type: String StationStatusStreamArn: Description: ARN of DynamoDB stream for bike station status Type: String StationDetailStreamArn: Description: ARN of DynamoDB stream for bike station detail Type: String StationReviewStreamArn: Description: ARN of DynamoDB stream for user reviews Type: String RedshiftVPC: Description: VPC to launch the Redshift cluster Type: AWS::EC2::VPC::Id RedshiftSubnets: Description: Subnets for Redshift subnet groups Type: List RedshiftAZ: Description: AZ for Redshift Glue connection Type: String RedshiftDbName: Description: Name of Redshift database Type: String ArtifactsBucket: Description: Name of S3 bucket containing artifacts Type: String Mappings: QuicksightMap: us-east-1: CidrIp: us-east-2: CidrIp: us-west-2: CidrIp: eu-central-1: CidrIp: eu-west-1: CidrIp: eu-west-2: CidrIp: Resources: # -------------------------------- KINESIS ROLES RoleFirehoseDataLake: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole Condition: StringEquals: sts:ExternalId: Ref: AWS::AccountId Policies: - PolicyName: RoleFirehoseDataLakePolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Resource: - !GetAtt S3BucketDataLake.Arn - Fn::Join: - "/" - - !GetAtt S3BucketDataLake.Arn - "*" # -------------------------------- KINESIS STREAMS FirehoseStationStatus: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: DirectPut S3DestinationConfiguration: BucketARN: !GetAtt S3BucketDataLake.Arn RoleARN: !GetAtt RoleFirehoseDataLake.Arn Prefix: raw/station_status_history/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/ ErrorOutputPrefix: error/station_status_history/ CompressionFormat: GZIP BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 FirehoseStationDetail: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: DirectPut S3DestinationConfiguration: BucketARN: !GetAtt S3BucketDataLake.Arn RoleARN: !GetAtt RoleFirehoseDataLake.Arn Prefix: raw/station_detail_history/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/ ErrorOutputPrefix: error/station_detail_history/ CompressionFormat: GZIP BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 FirehoseReviewDetail: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: DirectPut S3DestinationConfiguration: BucketARN: !GetAtt S3BucketDataLake.Arn RoleARN: !GetAtt RoleFirehoseDataLake.Arn Prefix: raw/review_history/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/ ErrorOutputPrefix: error/review_history/ CompressionFormat: GZIP BufferingHints: IntervalInSeconds: 60 SizeInMBs: 1 # -------------------------------- LAMBDA ROLES RoleStatusDataLakeLambda: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole Policies: - PolicyName: StationDataLakeLambdaPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: - arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: - !Ref StationStatusStreamArn - !Ref StationDetailStreamArn - !Ref StationReviewStreamArn - Effect: Allow Action: - firehose:DeleteDeliveryStream - firehose:PutRecord - firehose:PutRecordBatch - firehose:UpdateDestination Resource: - !GetAtt FirehoseStationStatus.Arn - !GetAtt FirehoseStationDetail.Arn - !GetAtt FirehoseReviewDetail.Arn - Effect: Allow Action: - comprehend:DetectEntities - comprehend:DetectKeyPhrases - comprehend:DetectDominantLanguage - comprehend:DetectSentiment - comprehend:DetectSyntax Resource: - '*' RoleSetupRedshiftDdlLambda: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole Policies: - PolicyName: RoleSetupRedshiftDdlLambdaPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: - arn:aws:logs:*:*:* - Effect: Allow Action: - secretsmanager:DescribeSecret - secretsmanager:GetSecretValue Resource: !Ref SecretRedshiftMaster - Effect: Allow Action: - redshift:DescribeClusters Resource: - '*' # -------------------------------- LAMBDA FUNCTIONS LambdaStatusDataLake: Type: AWS::Serverless::Function Properties: Handler: index.lambda_handler Runtime: python3.8 CodeUri: ../lambdas/stream_station_status_to_s3 Role: !GetAtt RoleStatusDataLakeLambda.Arn Description: Publish latest bike station status to Firehose stream MemorySize: 256 Timeout: 900 Environment: Variables: FIREHOSE_STREAM_NAME: !Ref FirehoseStationStatus Events: DynamoDbEvent: Type: DynamoDB Properties: StartingPosition: LATEST Stream: !Ref StationStatusStreamArn LambdaDetailDataLake: Type: AWS::Serverless::Function Properties: Handler: index.lambda_handler Runtime: python3.8 CodeUri: ../lambdas/stream_station_status_to_s3 Role: !GetAtt RoleStatusDataLakeLambda.Arn Description: Publish latest bike station detail to Firehose stream MemorySize: 256 Timeout: 900 Environment: Variables: FIREHOSE_STREAM_NAME: !Ref FirehoseStationDetail Events: DynamoDbEvent: Type: DynamoDB Properties: StartingPosition: LATEST Stream: !Ref StationDetailStreamArn LambdaReviewDataLake: Type: AWS::Serverless::Function Properties: Handler: index.lambda_handler Runtime: python3.8 CodeUri: ../lambdas/stream_station_review_to_s3 Role: !GetAtt RoleStatusDataLakeLambda.Arn Description: Publish latest user reviews to Firehose stream MemorySize: 256 Timeout: 900 Environment: Variables: FIREHOSE_STREAM_NAME: !Ref FirehoseReviewDetail Events: DynamoDbEvent: Type: DynamoDB Properties: BatchSize: 10 StartingPosition: LATEST Stream: !Ref StationReviewStreamArn LambdaSetupRedshiftDdl: Type: AWS::Serverless::Function Properties: Handler: index.lambda_handler Runtime: python3.7 CodeUri: ../lambdas/setup_datawarehouse_ddl Role: !GetAtt RoleSetupRedshiftDdlLambda.Arn Description: Deploy DDL scripts to Redshift MemorySize: 128 Timeout: 900 VpcConfig: SecurityGroupIds: - !Ref SGroupRedshift SubnetIds: !Ref RedshiftSubnets Environment: Variables: DB_CREDS: !Ref SecretRedshiftMaster GLUE_DB: !Ref GlueDbDataLake IAM_ROLE_ARN: !GetAtt RoleRedshift.Arn REDSHIFT_NAME: !Ref RedshiftBikeDemo # -------------------------------- DATA LAKE STORAGE S3BucketDataLake: Type: AWS::S3::Bucket Properties: AccessControl: Private LifecycleConfiguration: Rules: - Id: Expire all after 180 days. Status: Enabled ExpirationInDays: 180 - Id: Expire station detail after 3 days. Status: Enabled ExpirationInDays: 3 Prefix: raw/station_detail_history/ DeletionPolicy: Retain # -------------------------------- AWS GLUE ROLES RoleGlueDbCrawler: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: RoleGlueDbCrawlerPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: - arn:aws:logs:*:*:* - Effect: Allow Action: - s3:GetObject - s3:ListBucket Resource: - !GetAtt S3BucketDataLake.Arn - Fn::Join: - "/" - - !GetAtt S3BucketDataLake.Arn - "*" # -------------------------------- AWS GLUE DATABASE GlueDbDataLake: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: !Ref GlueDatabaseName Description: AWS Glue database for bike station and trip data GlueTableStationHistory: Type: AWS::Glue::Table Properties: DatabaseName: !Ref GlueDbDataLake CatalogId: !Ref AWS::AccountId TableInput: Name: station_status_history Description: Historical data of bike station status TableType: EXTERNAL_TABLE Parameters: classification: json typeOfData: file compressionType: gzip EXTERNAL: true PartitionKeys: - Type: string Name: year - Type: string Name: month - Type: string Name: day - Type: string Name: hour StorageDescriptor: InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: Location: !Sub 's3://${S3BucketDataLake}/raw/station_status_history/' StoredAsSubDirectories: true Compressed: true SerdeInfo: SerializationLibrary: Parameters: paths: is_installed,is_renting,is_returning,last_reported,num_bikes_available,station_id Columns: - Type: boolean Name: is_installed - Type: boolean Name: is_returning - Type: boolean Name: is_renting - Type: int Name: num_bikes_available - Type: int Name: last_reported - Type: int Name: station_id GlueTableDetailHistory: Type: AWS::Glue::Table Properties: DatabaseName: !Ref GlueDbDataLake CatalogId: !Ref AWS::AccountId TableInput: Name: station_detail_history Description: Historical data of bike station detail TableType: EXTERNAL_TABLE Parameters: classification: json typeOfData: file compressionType: gzip EXTERNAL: true PartitionKeys: - Type: string Name: year - Type: string Name: month - Type: string Name: day - Type: string Name: hour StorageDescriptor: InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: Location: !Sub 's3://${S3BucketDataLake}/raw/station_detail_history/' StoredAsSubDirectories: true Compressed: true SerdeInfo: SerializationLibrary: Parameters: paths: capacity,last_updated,lat,lon,name,station_id Columns: - Type: int Name: last_updated - Type: int Name: station_id - Type: string Name: name - Type: string Name: lon - Type: string Name: lat - Type: int Name: capacity GlueTableReview: Type: AWS::Glue::Table Properties: DatabaseName: !Ref GlueDbDataLake CatalogId: !Ref AWS::AccountId TableInput: Name: station_review_sentiment Description: User reviews of bike stations and sentiment score TableType: EXTERNAL_TABLE Parameters: classification: json typeOfData: file compressionType: gzip EXTERNAL: true PartitionKeys: - Type: string Name: year - Type: string Name: month - Type: string Name: day - Type: string Name: hour StorageDescriptor: InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: Location: !Sub 's3://${S3BucketDataLake}/raw/review_history/' StoredAsSubDirectories: true Compressed: true SerdeInfo: SerializationLibrary: Parameters: paths: station_name,user_id,review,station_id,create_date,sentiment,sentiment_mixed,sentiment_neutral,sentiment_positive,sentiment_negative Columns: - Type: string Name: station_name - Type: string Name: user_id - Type: string Name: review - Type: int Name: station_id - Type: int Name: create_date - Type: string Name: sentiment - Type: double Name: sentiment_mixed - Type: double Name: sentiment_neutral - Type: double Name: sentiment_positive - Type: double Name: sentiment_negative CrawlerStationHistory: Type: AWS::Glue::Crawler Properties: DatabaseName: !Ref GlueDbDataLake Description: Crawler to update partitions and statistics in Bike Demo database Role: !GetAtt RoleGlueDbCrawler.Arn SchemaChangePolicy: DeleteBehavior: LOG UpdateBehavior: LOG Targets: CatalogTargets: - DatabaseName: !Ref GlueDbDataLake Tables: - !Ref GlueTableStationHistory - !Ref GlueTableDetailHistory - !Ref GlueTableReview # -------------------------------- REDSHIFT SECURITY/SUBNET GROUP SGroupRedshift: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: Redshift security group SecurityGroupEgress: - CidrIp: IpProtocol: '-1' SecurityGroupIngress: - CidrIp: !FindInMap [QuicksightMap, !Ref AWS::Region, CidrIp] IpProtocol: tcp FromPort: 5439 ToPort: 5439 VpcId: !Ref RedshiftVPC SGroupIngressRedshift: Type: AWS::EC2::SecurityGroupIngress Properties: FromPort: 0 GroupId: !GetAtt SGroupRedshift.GroupId IpProtocol: '-1' SourceSecurityGroupId: !GetAtt SGroupRedshift.GroupId ToPort: 65535 SubnetGroupRedshift: Type: AWS::Redshift::ClusterSubnetGroup Properties: Description: Subnet group for Bike demo Redshift cluster SubnetIds: !Ref RedshiftSubnets # -------------------------------- REDSHIFT ROLES RoleRedshift: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: Policies: - PolicyName: RoleRedshiftPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:Get* - s3:List* - s3:Put* Resource: - !Sub 'arn:aws:s3:::${S3BucketDataLake}' - !Sub 'arn:aws:s3:::${S3BucketDataLake}/*' - Effect: Allow Action: - glue:GetDatabase - glue:GetDatabases - glue:GetTable - glue:GetTables - glue:GetPartitions Resource: - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog' - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDbDataLake}' - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDbDataLake}/*' # -------------------------------- REDSHIFT SECRETS SecretRedshiftMaster: Type: AWS::SecretsManager::Secret Properties: Description: Manage secret for Redshift cluster master user GenerateSecretString: SecretStringTemplate: '{"username": "master"}' GenerateStringKey: "password" PasswordLength: 16 ExcludeCharacters: "\"@'/\\" SecretAttachRedshiftMaster: Type: AWS::SecretsManager::SecretTargetAttachment Properties: TargetType: AWS::Redshift::Cluster SecretId: !Ref SecretRedshiftMaster TargetId: !Ref RedshiftBikeDemo # -------------------------------- REDSHIFT CLUSTER RedshiftBikeDemo: Type: AWS::Redshift::Cluster Properties: DBName: !Ref RedshiftDbName MasterUsername: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMaster, ':SecretString:username}}']] MasterUserPassword: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMaster, ':SecretString:password}}']] PubliclyAccessible: false ClusterType: single-node NodeType: dc2.large ClusterSubnetGroupName: !Ref SubnetGroupRedshift IamRoles: - !GetAtt RoleRedshift.Arn VpcSecurityGroupIds: - !Ref SGroupRedshift # -------------------------------- GLUE CONNECTION GlueConnRedshift: Type: AWS::Glue::Connection DependsOn: RedshiftBikeDemo Properties: CatalogId: !Ref AWS::AccountId ConnectionInput: Name: !Sub '${AWS::StackName}-GlueConnRedshift' # Required for in-place updates Description: AWS Glue connection to Bike Demo Redshift cluster? ConnectionType: JDBC ConnectionProperties: JDBC_CONNECTION_URL: !Sub - 'jdbc:redshift://${EndpointAddress}:${EndpointPort}/${RedshiftDbName}' - EndpointAddress: !GetAtt RedshiftBikeDemo.Endpoint.Address EndpointPort: !GetAtt RedshiftBikeDemo.Endpoint.Port USERNAME: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMaster, ':SecretString:username}}']] PASSWORD: !Join ['', ['{{resolve:secretsmanager:', !Ref SecretRedshiftMaster, ':SecretString:password}}']] PhysicalConnectionRequirements: SecurityGroupIdList: - !Ref SGroupRedshift SubnetId: !Select [0, !Ref RedshiftSubnets] AvailabilityZone: !Ref RedshiftAZ # -------------------------------- GLUE EXECUTION ROLE RoleGlueDbCrawler: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: RoleGlueDbCrawlerPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: - arn:aws:logs:*:*:* - Effect: Allow Action: - s3:GetObject - s3:ListBucket Resource: - !GetAtt S3BucketDataLake.Arn - Fn::Join: - "/" - - !GetAtt S3BucketDataLake.Arn - "*" RoleLoadStationHistoryJob: Type: AWS::IAM::Role Properties: Path: / AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: RoleLoadStationHistoryJobPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: - arn:aws:logs:*:*:* - Effect: Allow Action: - s3:GetObject - s3:ListBucket Resource: - !Sub 'arn:aws:s3:::${ArtifactsBucket}' - !Sub 'arn:aws:s3:::${ArtifactsBucket}/*' - Effect: Allow Action: - secretsmanager:DescribeSecret - secretsmanager:GetSecretValue Resource: !Ref SecretRedshiftMaster # -------------------------------- GLUE JOB JobLoadStationDetail: Type: AWS::Glue::Job Properties: Description: Glue Job to load bike station detail data to Redshift Role: !Ref RoleLoadStationHistoryJob GlueVersion: 1.0 Connections: Connections: - !Ref GlueConnRedshift Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub 's3://${ArtifactsBucket}/artifacts/' DefaultArguments: '--db_creds': !Ref SecretRedshiftMaster '--glue_db': bikedb_spectrum MaxCapacity: 0.0625 Timeout: 30 JobLoadStationHistory: Type: AWS::Glue::Job Properties: Description: Glue Job to load bike station history data to Redshift Role: !Ref RoleLoadStationHistoryJob GlueVersion: 1.0 Connections: Connections: - !Ref GlueConnRedshift Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub 's3://${ArtifactsBucket}/artifacts/' DefaultArguments: '--db_creds': !Ref SecretRedshiftMaster '--glue_db': bikedb_spectrum MaxCapacity: 0.0625 Timeout: 30 JobLoadStationReview: Type: AWS::Glue::Job Properties: Description: Glue Job to load user review sentiments to Redshift Role: !Ref RoleLoadStationHistoryJob GlueVersion: 1.0 Connections: Connections: - !Ref GlueConnRedshift Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub 's3://${ArtifactsBucket}/artifacts/' DefaultArguments: '--db_creds': !Ref SecretRedshiftMaster '--glue_db': bikedb_spectrum MaxCapacity: 0.0625 Timeout: 30 # -------------------------------- GLUE WORKFLOW TriggerLoadStationHistory: Type: AWS::Glue::Trigger Properties: Description: Trigger to load station history to Redshift after Crawler Type: CONDITIONAL StartOnCreation: true Actions: - JobName: !Ref JobLoadStationHistory Predicate: Conditions: - LogicalOperator: EQUALS CrawlerName: !Ref CrawlerStationHistory CrawlState: SUCCEEDED WorkflowName: !Ref WorkflowLoadStationHistory TriggerLoadStationDetail: Type: AWS::Glue::Trigger Properties: Description: Trigger to load station detail to Redshift after Crawler Type: CONDITIONAL StartOnCreation: true Actions: - JobName: !Ref JobLoadStationDetail Predicate: Conditions: - LogicalOperator: EQUALS CrawlerName: !Ref CrawlerStationHistory CrawlState: SUCCEEDED WorkflowName: !Ref WorkflowLoadStationHistory TriggerLoadStationReview: Type: AWS::Glue::Trigger Properties: Description: Trigger to load user reviews to Redshift after Crawler Type: CONDITIONAL StartOnCreation: true Actions: - JobName: !Ref JobLoadStationReview Predicate: Conditions: - LogicalOperator: EQUALS CrawlerName: !Ref CrawlerStationHistory CrawlState: SUCCEEDED WorkflowName: !Ref WorkflowLoadStationHistory TriggerStationHistoryCrawler: Type: AWS::Glue::Trigger Properties: Description: Trigger to schedule Crawler to update bike station partitions Type: SCHEDULED StartOnCreation: true Schedule: cron(15 * * * ? *) Actions: - CrawlerName: !Ref CrawlerStationHistory WorkflowName: !Ref WorkflowLoadStationHistory WorkflowLoadStationHistory: Type: AWS::Glue::Workflow Properties: Description: Workflow for updating bike station history partition and load data to Redshift # -------------------------------- GLUE ENDPOINT GlueEndpoint: Type: AWS::Glue::DevEndpoint Properties: GlueVersion: 1.0 RoleArn: !GetAtt RoleLoadStationHistoryJob.Arn Arguments: GLUE_PYTHON_VERSION: 3 # -------------------------------- SETUP SetupRedshiftDdl: Type: Custom::SetupFunction DependsOn: - RedshiftBikeDemo - RoleSetupRedshiftDdlLambda - LambdaSetupRedshiftDdl Properties: SchemaVersion: 1.1 # Change this to force update ServiceToken: !GetAtt LambdaSetupRedshiftDdl.Arn Outputs: DataLakeS3Bucket: Value: !Ref S3BucketDataLake Description: S3 bucket name of data lake GlueEndpointName: Value: !Ref GlueEndpoint Description: Name of Glue development endpoint RedshiftSecurityGroup: Value: !Ref SGroupRedshift Description: Security Group for Redshift cluster