AWSTemplateFormatVersion: 2010-09-09
Description: DynamoDB Streaming

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Network & Security Configuration"
        Parameters:
          - Keyname
          - VpcCIDR
          - PublicSubnet1CIDR
          - PublicSubnet2CIDR
    ParameterLabels:
      Keyname:
        default: "Keypair name"
      VpcCIDR:
        default: "IP range for new VPC"
      PublicSubnet1CIDR:
        default: "Public Subnet 1 IP range"
      PublicSubnet2CIDR:
        default: "Public Subnet 2 IP range"

Parameters:
  EnvironmentName:
    Description: An environment name used to tag the network resources
    Type: String
    Default: hudi-blog
  VpcCIDR:
    Description: Enter the IP range (CIDR notation) for this VPC, or keep the default
    Type: String
    Default: 10.0.0.0/16
  PublicSubnet1CIDR:
    Description: Enter the IP range (CIDR notation) for the public subnet in the first Availability Zone, or keep the default
    Type: String
    Default: 10.0.1.0/24
  PublicSubnet2CIDR:
    Description: Enter the IP range (CIDR notation) for the public subnet in the second Availability Zone, or keep the default
    Type: String
    Default: 10.0.2.0/24
  Keyname:
    Description: Choose an existing EC2 key pair name
    Type: AWS::EC2::KeyPair::KeyName

Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref VpcCIDR
      InstanceTenancy: default
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  # Pick the first two Availability Zones of the deployment region so the
  # template stays region-agnostic.
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [0, !GetAZs '']
      CidrBlock: !Ref PublicSubnet1CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  PublicSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      AvailabilityZone: !Select [1, !GetAZs '']
      CidrBlock: !Ref PublicSubnet2CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName

  InternetGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      InternetGatewayId: !Ref InternetGateway
      VpcId: !Ref VPC

  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: !Sub ${EnvironmentName} Public Routes

  DefaultPublicRoute:
    Type: AWS::EC2::Route
    DependsOn: InternetGatewayAttachment
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  PublicSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet1

  PublicSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PublicRouteTable
      SubnetId: !Ref PublicSubnet2

  # Bucket for EMR logs.
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      VersioningConfiguration:
        Status: Enabled

  EmrInstanceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role'
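  # Note the split: the instance role above is attached to the cluster's EC2
  # instances through EmrInstanceProfile (defined further below), while the
  # service role that follows is what the EMR control plane itself assumes
  # when it provisions and manages those instances.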
  EMRClusterServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - elasticmapreduce.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole'
      Path: /

  MyEmrCluster:
    Type: AWS::EMR::Cluster
    Properties:
      Name: blog-demo
      JobFlowRole: !Ref EmrInstanceProfile
      LogUri: !Sub 's3://${S3Bucket}/EMRLOGS/'
      ReleaseLabel: emr-5.30.0
      Applications:
        - Name: Hadoop
        - Name: Spark
        - Name: Tez
        - Name: Hive
      Instances:
        MasterInstanceGroup:
          InstanceCount: 1
          InstanceType: m4.xlarge
          Market: ON_DEMAND
          Name: cfnMaster
        CoreInstanceGroup:
          InstanceCount: 2
          InstanceType: m4.large
          Market: ON_DEMAND
          Name: cfnCore
        Ec2SubnetId: !Ref PublicSubnet1
        Ec2KeyName: !Ref Keyname
      VisibleToAllUsers: true
      ServiceRole: !Ref EMRClusterServiceRole

  EmrInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Path: /
      Roles:
        - !Ref EmrInstanceRole

  # Source bucket holds the CSV seed data, the raw bucket receives Firehose
  # output, and the processed bucket is for downstream EMR output.
  SourceS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      VersioningConfiguration:
        Status: Enabled

  RawS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      VersioningConfiguration:
        Status: Enabled

  ProcessedS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      VersioningConfiguration:
        Status: Enabled

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub 'order_transaction_${AWS::StackName}'
      AttributeDefinitions:
        - AttributeName: order_id
          AttributeType: S
        - AttributeName: item_id
          AttributeType: S
      KeySchema:
        - AttributeName: order_id
          KeyType: HASH
        - AttributeName: item_id
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub 'order-data-stream-${AWS::StackName}'
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis

  # Firehose reads from the Kinesis stream, passes each record through the
  # transform Lambda, converts the resulting JSON to Parquet using the Glue
  # table schema, and lands the output in the raw bucket.
  Deliverystream:
    DependsOn:
      - DeliveryPolicy
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt InputKinesisStream.Arn
        RoleARN: !GetAtt DeliveryRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt RawS3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 64
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt DeliveryRole.Arn
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt ProcessLambdaFunction.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt DeliveryRole.Arn
            DatabaseName: !Ref GlueDatabase
            TableName: !Ref GlueTable
            Region: !Ref AWS::Region
            VersionId: LATEST
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}

  DeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
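  # A quick way to smoke-test the delivery stream once the stack is up: put a
  # record shaped like a DynamoDB Streams event (the shape the transform
  # Lambda expects) on the input stream. Names and values here are
  # illustrative, and AWS CLI v2 needs --cli-binary-format raw-in-base64-out
  # to accept raw JSON in --data:
  #   aws kinesis put-record \
  #     --stream-name order-data-stream-<stack-name> \
  #     --partition-key test \
  #     --cli-binary-format raw-in-base64-out \
  #     --data '{"dynamodb": {"NewImage": {"order_id": {"S": "1"}, "item_id": {"S": "1"}, "customer_id": {"S": "c1"}, "product": {"S": "Laptop"}, "amount": {"S": "100"}, "currency": {"S": "USD"}, "time_stamp": {"S": "2020-09-02T00:00:00"}, "transaction_date": {"S": "2020-09-02"}}}}'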
  DeliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose_delivery_policy
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Sub 'arn:aws:s3:::${RawS3Bucket}'
              - !Sub 'arn:aws:s3:::${RawS3Bucket}/*'
          - Effect: Allow
            Action:
              - 'lambda:InvokeFunction'
              - 'lambda:GetFunctionConfiguration'
            Resource: !GetAtt ProcessLambdaFunction.Arn
          - Effect: Allow
            Action:
              - 'logs:PutLogEvents'
            Resource: 'arn:aws:logs:*:*:*'
          - Effect: Allow
            Action:
              - 'kinesis:DescribeStream'
              - 'kinesis:GetShardIterator'
              - 'kinesis:GetRecords'
              - 'kinesis:ListShards'
            Resource: !GetAtt InputKinesisStream.Arn
          - Effect: Allow
            Action:
              - 'glue:GetTableVersions'
            Resource: '*'
      Roles:
        - !Ref DeliveryRole

  ProcessLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: allowLambdaLogs
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: 'arn:aws:logs:*:*:*'

  # Transform Lambda invoked by Firehose: unwraps each DynamoDB Streams
  # record, keeps inserts/updates (records with a NewImage), and re-emits
  # them as flat JSON matching the Glue table schema; deletes are dropped.
  ProcessLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          import base64
          import json

          def lambda_handler(event, context):
              output = []
              for record in event['records']:
                  payload = base64.b64decode(record['data'])
                  payload_json = json.loads(payload)['dynamodb']
                  # Check if this is a new/updated item
                  if 'NewImage' in payload_json:
                      dynamo_record = payload_json['NewImage']
                      parquet_schema_record = {
                          'order_id': dynamo_record['order_id']['S'],
                          'item_id': dynamo_record['item_id']['S'],
                          'customer_id': dynamo_record['customer_id']['S'],
                          'product': dynamo_record['product']['S'],
                          'amount': dynamo_record['amount']['S'],
                          'currency': dynamo_record['currency']['S'],
                          'time_stamp': dynamo_record['time_stamp']['S'],
                          'transaction_date': dynamo_record['transaction_date']['S']
                      }
                      output_record = {
                          'recordId': record['recordId'],
                          'result': 'Ok',
                          'data': base64.b64encode(json.dumps(parquet_schema_record).encode('utf-8')).decode('utf-8')
                      }
                  else:
                      output_record = {
                          'recordId': record['recordId'],
                          'result': 'Dropped',
                          'data': record['data']
                      }
                  output.append(output_record)
              return {'records': output}
      Handler: index.lambda_handler
      Role: !GetAtt ProcessLambdaExecutionRole.Arn
      Runtime: python3.8
      Timeout: 600
      MemorySize: 128

  CsvToDDBLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        - 'arn:aws:iam::aws:policy/AWSLambdaInvocation-DynamoDB'
        - 'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
      Policies:
        - PolicyName: DynamoDBWriteAccess
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt DynamoDBTable.Arn
                Action:
                  - 'dynamodb:PutItem'
                  - 'dynamodb:BatchWriteItem'
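  # The loader function below seeds the DynamoDB table from the CSV in the
  # source bucket. No trigger is defined for it, so invoke it once by hand
  # after uploading the CSV (look up the function's physical name in the
  # stack's resources; the output filename is arbitrary):
  #   aws lambda invoke --function-name <CsvToDDBLambdaFunction-physical-id> out.json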
  CsvToDDBLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt CsvToDDBLambdaRole.Arn
      Code:
        ZipFile: |
          import codecs
          import csv
          import json
          import os

          import boto3

          s3 = boto3.resource('s3')
          dynamodb = boto3.resource('dynamodb')

          bucket = os.environ['bucket']
          key = os.environ['key']
          tableName = os.environ['table']

          def lambda_handler(event, context):
              # get() streams the body, so the CSV is never held fully in memory
              try:
                  obj = s3.Object(bucket, key).get()['Body']
              except Exception:
                  print("S3 object could not be opened. Check the environment variables.")
                  raise

              batch_size = 100
              batch = []

              # DictReader is a generator, so rows are read lazily
              for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
                  if len(batch) >= batch_size:
                      write_to_dynamo(batch)
                      batch.clear()
                  batch.append(row)

              if batch:
                  write_to_dynamo(batch)

              return {
                  'statusCode': 200,
                  'body': json.dumps('Uploaded to DynamoDB Table')
              }

          def write_to_dynamo(rows):
              try:
                  table = dynamodb.Table(tableName)
                  with table.batch_writer() as batch:
                      for row in rows:
                          batch.put_item(Item=row)
              except Exception:
                  print("Error executing batch_writer. Check that the table was created correctly.")
                  raise
      Runtime: python3.8
      Timeout: 900
      MemorySize: 3008
      Environment:
        Variables:
          bucket: !Ref SourceS3Bucket
          key: 'order_data_09_02_2020.csv'
          table: !Ref DynamoDBTable

  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput: {}

  # Schema Firehose uses to convert the transformed JSON records to Parquet.
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabase
      TableInput:
        Retention: 0
        StorageDescriptor:
          Columns:
            - Name: order_id
              Type: string
            - Name: item_id
              Type: string
            - Name: customer_id
              Type: string
            - Name: product
              Type: string
            - Name: amount
              Type: string
            - Name: currency
              Type: string
            - Name: time_stamp
              Type: string
            - Name: transaction_date
              Type: string
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
            Parameters:
              serialization.format: '1'
          BucketColumns: []
          SortColumns: []
          StoredAsSubDirectories: false

Outputs:
  SourceS3Bucket:
    Value: !Ref SourceS3Bucket
  RawS3Bucket:
    Value: !Ref RawS3Bucket
  ProcessedS3Bucket:
    Value: !Ref ProcessedS3Bucket
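# To deploy the template (stack name and file name are illustrative;
# CAPABILITY_IAM is required because the template creates IAM roles):
#   aws cloudformation deploy \
#     --template-file template.yaml \
#     --stack-name hudi-blog \
#     --capabilities CAPABILITY_IAM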