# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
AWSTemplateFormatVersion: 2010-09-09
Description: This template creates the AWS Glue resources such as schema registry, database, table, connection, Glue job etc. for Stream consumer application.
Parameters:
EnvironmentName:
Description: An environment name that is prefixed to resource names
Type: String
VpcId:
Description: The ID of the VPC for security group
Type: AWS::EC2::VPC::Id
PrivateSubnet1:
Description: Subnet used for creating MSK cluser and Glue Connection.
Type: AWS::EC2::Subnet::Id
AvailabilityZoneOfPrivateSubnet1:
Description: Enter the AvailabilityZone for the private subnet1 used for Glue connection
Type: AWS::EC2::AvailabilityZone::Name
PrivateSubnet2:
Description: Second subnet for MSK cluster.
Type: AWS::EC2::Subnet::Id
SecretArn:
Description: Secretmanager ARN for MSK SASL/SCRAM authentication
Type: String
SecurityGroupForGlueConnection:
Description: Enter the Security Group created in the first step. This will used by Glue connection and EC2 instance.
Type: AWS::EC2::SecurityGroup::Id
ScriptPath:
Description: "Glue ETL script absolute S3 path. such as s3://s3-path/mskprocessing.py"
Type: String
SchemaRegistryName:
Description: Name of the Glue schema registry
Type: String
Default: "test-schema-registry"
MSKSchemaName:
Description: Name of the Schema
Type: String
Default: "test_payload_schema"
GlueDataBaseName:
Description: Database name on Glue catalog
Type: String
Default: "test_glue_database"
GlueTableName:
Description: Table name on Glue catalog
Type: String
Default: "test_glue_table"
GlueWorkerType:
Description: Worker type for Glue job
Type: String
Default: Standard
AllowedValues: [Standard, G.1X, G.2X, G.025X]
ConstraintDescription: must be a valid Glue Worker type.
NumberOfWorkers:
Description: Number of workers in Glue job
Type: Number
Default: 2
MinValue: 2
S3BucketForOutput:
Description: Bucket name for writing data from Glue Job
Type: String
TopicName:
Description: MSK Topic name that need to be processed.
Type: String
Default: "test"
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
-
Label:
default: "General Parameters"
Parameters:
- EnvironmentName
- VpcId
- PrivateSubnet1
- PrivateSubnet2
- SecretArn
-
Label:
default: "AWS Glue connection Parameters"
Parameters:
- SecurityGroupForGlueConnection
- AvailabilityZoneOfPrivateSubnet1
-
Label:
default: "AWS Glue Schema Registry Parameters"
Parameters:
- SchemaRegistryName
- MSKSchemaName
-
Label:
default: "AWS Glue Catalog Parameters"
Parameters:
- GlueDataBaseName
- GlueTableName
-
Label:
default: "AWS Glue Jobs Parameters"
Parameters:
- ScriptPath
- GlueWorkerType
- NumberOfWorkers
- S3BucketForOutput
- TopicName
Resources:
# Below section implement custom resource backed on Lambda to return MSK bootstrap brokers
MSKSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: "Security group attached to MSK broker instances"
GroupName: "msk-sg"
VpcId: !Ref VpcId
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 9096
ToPort: 9096
SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
Description: "communicate with brokers by using SASL/SCRAM"
- IpProtocol: tcp
FromPort: 2181
ToPort: 2181
SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
Description: "communicate with Apache ZooKeeper by using default port"
SecurityGroupEgress:
- Description: Allow all outbound traffic
IpProtocol: "-1"
CidrIp: 0.0.0.0/0
Tags:
-
Key: AppName
Value: !Ref AWS::StackName
MSKCluster:
Type: 'AWS::MSK::Cluster'
Properties:
ClusterName: !Ref EnvironmentName
KafkaVersion: 2.8.1
NumberOfBrokerNodes: 2
BrokerNodeGroupInfo:
InstanceType: kafka.t3.small
ClientSubnets:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
SecurityGroups:
- !Ref MSKSecurityGroup
ConfigurationInfo:
Arn: !Ref MSKConfiguration
Revision: 1
EnhancedMonitoring: PER_TOPIC_PER_BROKER
ClientAuthentication:
Sasl:
Scram:
Enabled: true
Tags:
AppName: !Sub '${EnvironmentName}-${AWS::StackName}'
MSKConfiguration:
Type: AWS::MSK::Configuration
Properties:
Description: "configuration properties for MSK cluster"
KafkaVersionsList:
- 2.8.1
Name: !Sub "msk-config-${EnvironmentName}"
ServerProperties: |
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
MSKScram:
Type: AWS::MSK::BatchScramSecret
Properties:
ClusterArn: !Ref MSKCluster
SecretArnList:
- !Ref SecretArn
BootstrapBrokersFunctionLogs:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/cfn-msk-bootstrap-brokers-${EnvironmentName}"
RetentionInDays: 30
BootstrapBrokersFunctionExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "glue-msk-getbroker-role-${EnvironmentName}"
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: '/service-role/'
Policies:
- PolicyName: !Sub "cfn-msk-msk-brokers-policy-${EnvironmentName}"
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kafka:GetBootstrapBrokers
Resource: !Ref MSKCluster
- Effect: Allow
Action:
- secretsmanager:GetSecretValue
Resource: !Ref SecretArn
Tags:
- Key: AppName
Value: !Sub '${EnvironmentName}-${AWS::StackName}'
BootstrapBrokersFunction:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |
import json
import logging
import cfnresponse
import boto3
session = boto3.session.Session()
client = session.client('kafka')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info(event)
responseStatus = cfnresponse.FAILED
responseData = {}
ClusterArn = event['ResourceProperties'].get('ClusterArn')
if ClusterArn:
try:
ClusterArn = event['ResourceProperties']['ClusterArn']
response = client.get_bootstrap_brokers(
ClusterArn=ClusterArn
)
logger.info(response)
if (response['ResponseMetadata']['HTTPStatusCode'] == 200):
responseStatus = cfnresponse.SUCCESS
responseData['BootstrapBrokerStringSaslScram'] = response['BootstrapBrokerStringSaslScram']
except Exception:
logger.exception('Signaling failure to CloudFormation.')
cfnresponse.send(event, context, responseStatus, responseData)
return
FunctionName: !Sub "cfn-msk-bootstrap-brokers-${EnvironmentName}"
Handler: index.lambda_handler
Role: !GetAtt BootstrapBrokersFunctionExecutionRole.Arn
Runtime: python3.10
Timeout: 30
BootstrapBrokers:
Type: Custom::Function
Properties:
ServiceToken: !GetAtt BootstrapBrokersFunction.Arn
ClusterArn: !Ref MSKCluster
GlueConnectionMSK:
Type: 'AWS::Glue::Connection'
Properties:
CatalogId: !Ref 'AWS::AccountId'
ConnectionInput:
Name: GlueMskConnection
ConnectionProperties:
KAFKA_BOOTSTRAP_SERVERS: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
SECRET_ID: !Ref SecretArn
KAFKA_SSL_ENABLED: true
KAFKA_SASL_MECHANISM: SCRAM-SHA-512
ConnectionType: KAFKA
PhysicalConnectionRequirements:
SecurityGroupIdList:
- !Ref SecurityGroupForGlueConnection
SubnetId: !Ref PrivateSubnet1
AvailabilityZone: !Ref AvailabilityZoneOfPrivateSubnet1
GlueSchemaRegistry:
Type: AWS::Glue::Registry
Properties:
Description: Schema registry for test
Name: !Ref SchemaRegistryName
Tags:
- Key: AppName
Value: !Sub '${EnvironmentName}-${AWS::StackName}'
MSKPayloadSchema:
Type: AWS::Glue::Schema
Properties:
Compatibility: "BACKWARD"
DataFormat: "AVRO"
Name: !Ref MSKSchemaName
Registry:
Arn: !Ref GlueSchemaRegistry
SchemaDefinition: >
{ "type": "record","namespace": "ABC_Organization","name": "Employee","fields": [{"name": "Name","type": "string"},{"name": "Age","type": "int"},{"name": "address","type": {"type": "record","name": "addressRecord","fields": [{"name": "street","type": "string"},{"name": "zipcode","type": "int" }]}}]}
GlueDataBase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref 'AWS::AccountId'
DatabaseInput:
Name: !Ref GlueDataBaseName
GlueTable:
Type: AWS::Glue::Table
Properties:
DatabaseName: !Ref GlueDataBase
CatalogId: !Ref 'AWS::AccountId'
TableInput:
Name : !Ref GlueTableName
Parameters:
classification: avro
connectionName: !Ref GlueConnectionMSK
StorageDescriptor:
SchemaReference:
SchemaVersionId: !GetAtt MSKPayloadSchema.InitialSchemaVersionId
Parameters:
connectionName: !Ref GlueConnectionMSK
typeOfData: kafka
SerdeInfo:
SerializationLibrary: 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
Parameters:
serialization.format: 1
InputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OutputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
GlueIamRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "Glue-ServiceRole-${EnvironmentName}"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action:
- sts:AssumeRole
Path: '/'
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
Policies:
- PolicyName: !Sub "glue-secretm-access-policy-${EnvironmentName}"
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- secretsmanager:GetSecretValue
Resource: !Ref SecretArn
Tags:
- Key: AppName
Value: !Sub '${EnvironmentName}-${AWS::StackName}'
GlueCrawler:
Type: AWS::Glue::Crawler
Properties:
Name: !Sub "GlueCrawler-${EnvironmentName}"
Role: !Ref GlueIamRole
DatabaseName: !Ref GlueDataBase
Targets:
S3Targets:
- Path: !Sub 's3://${S3BucketForOutput}/output/'
SchemaChangePolicy:
UpdateBehavior: "UPDATE_IN_DATABASE"
DeleteBehavior: "LOG"
Tags:
"AppName": !Sub '${EnvironmentName}-${AWS::StackName}'
GlueStreamingJob:
Type: AWS::Glue::Job
Properties:
Name: !Sub "GlueStreamingJob-${EnvironmentName}"
Connections:
Connections:
- !Ref GlueConnectionMSK
Command:
Name: gluestreaming
ScriptLocation: !Ref ScriptPath
PythonVersion: "3"
Description: "Glue straming job for processing MSK topic"
ExecutionProperty:
MaxConcurrentRuns: 1
GlueVersion: "4.0"
WorkerType: !Ref GlueWorkerType
NumberOfWorkers: !Ref NumberOfWorkers
DefaultArguments:
"--enable-continuous-cloudwatch-log": "true"
"--enable-job-insights": "true"
"--enable-metrics": "true"
"--enable-spark-ui": "true"
"--job-bookmark-option": "job-bookmark-disable"
"--spark-event-logs-path": !Sub "s3://${S3BucketForOutput}/eventlogs/"
"--database_name": !Ref GlueDataBase
"--table_name": !Ref GlueTable
"--topic_name": !Ref TopicName
"--dest_dir": !Sub "s3://${S3BucketForOutput}/"
Role: !Ref GlueIamRole
Tags:
AppName: !Sub '${EnvironmentName}-${AWS::StackName}'
Outputs:
MSKClusterArn:
Description: ARN of created MSK cluster
Value: !Ref MSKCluster
MSKBootstrapServers:
Description: MSK Bootstrap Servers
Value: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
GlueJobName:
Description: Glue job name
Value: !Ref GlueStreamingJob
GlueDatabase:
Description: Glue database name
Value: !Ref GlueDataBase
GlueTableName:
Description: Glue table name
Value: !Ref GlueTable
GlueSchemaRegistryName:
Description: Glue schema registry name
Value: !Ref SchemaRegistryName
SchemaName:
Description: "Schema registered on Schema registry. Schema definision is created per AWS Public documentation https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas"
Value: !Ref MSKSchemaName
OutputBucketName:
Value: !Ref S3BucketForOutput
Description: Name of the Amazon S3 bucket used to write Glue Streaming job output.
GlueJobIAMRole:
Value: !Ref GlueIamRole
Description: IAM Role ARN for Glue job.
GlueCrawlerName:
Value: !Ref GlueCrawler
Description: Name of the Crawler.