#!/usr/bin/env python3 # -*- encoding: utf-8 -*- # vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab import aws_cdk as cdk from aws_cdk import ( Stack, aws_iam, aws_s3 as s3, aws_kinesisfirehose ) from constructs import Construct class KinesisFirehoseStack(Stack): def __init__(self, scope: Construct, construct_id: str, trans_kinesis_stream, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) s3_bucket = s3.Bucket(self, "s3bucket", bucket_name="aws-analytics-immersion-day-{region}-{account}".format( region=cdk.Aws.REGION, account=cdk.Aws.ACCOUNT_ID)) firehose_role_policy_doc = aws_iam.PolicyDocument() firehose_role_policy_doc.add_statements(aws_iam.PolicyStatement(**{ "effect": aws_iam.Effect.ALLOW, "resources": [s3_bucket.bucket_arn, "{}/*".format(s3_bucket.bucket_arn)], "actions": ["s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject"] })) firehose_role_policy_doc.add_statements(aws_iam.PolicyStatement( effect=aws_iam.Effect.ALLOW, resources=["*"], actions=["glue:GetTable", "glue:GetTableVersion", "glue:GetTableVersions"] )) firehose_role_policy_doc.add_statements(aws_iam.PolicyStatement( effect=aws_iam.Effect.ALLOW, resources=[trans_kinesis_stream.stream_arn], # resources=[kinesis_stream_arn], actions=["kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords"] )) firehose_log_group_name = "/aws/kinesisfirehose/retail-trans" firehose_role_policy_doc.add_statements(aws_iam.PolicyStatement( effect=aws_iam.Effect.ALLOW, #XXX: The ARN will be formatted as follows: # arn:{partition}:{service}:{region}:{account}:{resource}{sep}}{resource-name} resources=[self.format_arn(service="logs", resource="log-group", resource_name="{}:log-stream:*".format(firehose_log_group_name), arn_format=cdk.ArnFormat.COLON_RESOURCE_NAME)], actions=["logs:PutLogEvents"] )) firehose_role = aws_iam.Role(self, "FirehoseDeliveryRole", role_name="FirehoseDeliveryRole", assumed_by=aws_iam.ServicePrincipal("firehose.amazonaws.com"), #XXX: use inline_policies to work around https://github.com/aws/aws-cdk/issues/5221 inline_policies={ "firehose_role_policy": firehose_role_policy_doc } ) trans_to_s3_delivery_stream = aws_kinesisfirehose.CfnDeliveryStream(self, "KinesisFirehoseToS3", delivery_stream_name="retail-trans", delivery_stream_type="KinesisStreamAsSource", kinesis_stream_source_configuration={ "kinesisStreamArn": trans_kinesis_stream.stream_arn, # "kinesisStreamArn": kinesis_stream_arn, "roleArn": firehose_role.role_arn }, extended_s3_destination_configuration={ "bucketArn": s3_bucket.bucket_arn, "bufferingHints": { "intervalInSeconds": 60, "sizeInMBs": 1 }, "cloudWatchLoggingOptions": { "enabled": True, "logGroupName": firehose_log_group_name, "logStreamName": "S3Delivery" }, "compressionFormat": "UNCOMPRESSED", # [GZIP | HADOOP_SNAPPY | Snappy | UNCOMPRESSED | ZIP] "prefix": "json-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/", "errorOutputPrefix": "error-json/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}", "roleArn": firehose_role.role_arn } ) self.s3_bucket_name = s3_bucket.bucket_name cdk.CfnOutput(self, '{}_S3DestBucket'.format(self.stack_name), value=s3_bucket.bucket_name, export_name='S3DestBucket') cdk.CfnOutput(self, 'FirehoseRoleArn', value=firehose_role.role_arn, export_name='FirehoseRoleArn')