AWSTemplateFormatVersion: '2010-09-09'
Description: 'AWS CloudFormation Sample Template Glue_Streaming_Joining_Nested_Data:
  This template demonstrates how to join nested JSON data from two Kinesis data
  streams. **WARNING** This template creates multiple AWS resources. You will be
  billed for the AWS resources used if you create a stack from this template.'
Parameters:
  # Names for the resources created by this stack
  S3BucketName:
    Type: String
    Default: glue-streaming-join-blog-bucket
  GlueServiceRoleName:
    Type: String
    Default: GlueStreamingDemoServiceRole
  GlueJobName:
    Type: String
    Default: awsglue-streaming-join-nested-json
  GlueDatabaseName:
    Type: String
    Default: awsglue-streaming-join-nested-json-db
  RecommenderStreamName:
    Type: String
    Default: recommenderStream
  NestedPurchaseStreamName:
    Type: String
    Default: nestedPurchaseStream
Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: "LogDeliveryWrite"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      # Suffix the bucket name with the account ID to keep it globally unique
      BucketName: !Join
        - '-'
        - - !Ref S3BucketName
          - !Ref AWS::AccountId
  NestedPurchaseStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: !Ref NestedPurchaseStreamName
      RetentionPeriodHours: 24
      ShardCount: 2
  RecommenderStream:
    Type: 'AWS::Kinesis::Stream'
    Properties:
      Name: !Ref RecommenderStreamName
      RetentionPeriodHours: 24
      ShardCount: 2
  AwsGlueStreamingJoinNestedJsonJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Ref GlueJobName
      Command:
        Name: gluestreaming
        ScriptLocation: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/streaming_join_job.py'
      Role: !Ref GlueServiceRole
      MaxRetries: 1
      Timeout: 2880
      GlueVersion: '4.0'
      MaxCapacity: 2
      DefaultArguments:
        --TempDir: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/temporary/'
        --class: "GlueApp"
        --enable-auto-scaling: "true"
        --enable-continuous-cloudwatch-log: "true"
        --enable-glue-datacatalog: "true"
        --enable-job-insights: "true"
        --enable-metrics: "true"
        --enable-spark-ui: "true"
        --job-bookmark-option: "job-bookmark-disable"
        --job-language: "python"
        --output_path: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
        --spark-event-logs-path: !Join
          - '/'
          - - 's3:/'
            - !Ref S3Bucket
            - 'code/sparkHistoryLogs/'
  GlueCatalogDatabase:
    Type: 'AWS::Glue::Database'
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: !Ref GlueDatabaseName
        Description: Database created for defining schemas on the Kinesis data streams
  NestedPurchaseGlueTable:
    Type: 'AWS::Glue::Table'
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabaseName
      TableInput:
        Description: Table for purchase data
        Name: nested-purchase-stream-table
        TableType: EXTERNAL_TABLE
        Parameters: { "classification": "json" }
        StorageDescriptor:
          Location: !Ref NestedPurchaseStreamName
          Compressed: False
          InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
          OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
          SerdeInfo:
            SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe"
          Parameters:
            typeOfData: "kinesis"
            streamARN:
              Fn::GetAtt:
                - "NestedPurchaseStream"
                - "Arn"
          Columns:
            # Single top-level column holding the nested JSON payload.
            # NOTE: the inner field names (order, products, product_id) are
            # assumed; adjust them to match the producer's actual payload.
            - Name: "body"
              Type: "struct<order:struct<products:array<struct<product_id:string,quantity_purchased:bigint>>>,transaction_id:string,purchase_time:string,user_id:string>"
    DependsOn: [GlueCatalogDatabase, NestedPurchaseStream]
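  # For reference, a purchase-stream record matching the schema above would
  # look roughly like the following (a sketch only: the nested field names
  # "order", "products", and "product_id" are assumptions, as noted above):
  #   {
  #     "order": {
  #       "products": [
  #         {"product_id": "B0001", "quantity_purchased": 2}
  #       ]
  #     },
  #     "transaction_id": "tx-0001",
  #     "purchase_time": "2023-01-01 00:00:00",
  #     "user_id": "u-100"
  #   }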
InputFormat: "org.apache.hadoop.mapred.TextInputFormat" OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat" SerdeInfo: SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe" Parameters: typeOfData: "kinesis" streamARN: Fn::GetAtt: - "RecommenderStream" - "Arn" Columns: - Name: "purchase_id" Type: "string" - Name: "next_best_product_sku" Type: "string" - Name: "likelihood_score" Type: "bigint" - Name: "purchase_time" Type: "bigint" - Name: "recommendation_processed_time" Type: "bigint" DependsOn: [GlueCatalogDatabase, RecommenderStream] GlueServiceRole: Type: "AWS::IAM::Role" Properties: Path: "/" RoleName: !Ref GlueServiceRoleName AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"glue.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}" MaxSessionDuration: 3600 Policies: - PolicyName: GlueS3AccessPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: ['s3:*','s3-object-lambda:*'] Resource: - !Join - '' - - 'arn:aws:s3:::' - !Ref S3Bucket - /* ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" - "arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess" Description: "Allows Glue to call AWS services on your behalf." DependsOn: [S3Bucket] IAMPolicyDynamoDBTable: Type: "AWS::IAM::Policy" Properties: PolicyDocument: !Sub | { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:DescribeTable", "dynamodb:GetItem", "dynamodb:DescribeKinesisStreamingDestination", "dynamodb:BatchGetItem", "dynamodb:DisableKinesisStreamingDestination", "dynamodb:BatchWriteItem", "dynamodb:ConditionCheckItem", "dynamodb:PutItem", "dynamodb:PartiQLUpdate", "dynamodb:Scan", "dynamodb:Query", "dynamodb:UpdateItem", "dynamodb:DescribeTimeToLive", "dynamodb:GetShardIterator", "dynamodb:UpdateTable", "dynamodb:GetRecords", "dynamodb:DescribeReservedCapacityOfferings", "dynamodb:ListTables", "dynamodb:DescribeLimits" ], "Resource": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${DynamoDBTable}" } ] } Roles: - !Ref GlueServiceRole PolicyName: "accessdynamotblpolicy0v8glq" DynamoDBTable: Type: "AWS::DynamoDB::Table" Properties: AttributeDefinitions: - AttributeName: "ddb_partition" AttributeType: "S" BillingMode: "PAY_PER_REQUEST" TableName: "enriched_purchases" KeySchema: - AttributeName: "ddb_partition" KeyType: "HASH" Outputs: S3Bucket: Value: !Ref S3Bucket