# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

# CloudFormation template for a Glue streaming consumer of an MSK topic.
# It creates: an MSK cluster (SASL/SCRAM) with its configuration and security
# group, a Lambda-backed custom resource that returns the cluster's bootstrap
# brokers, a Glue Kafka connection, a Glue schema registry + Avro schema,
# a Glue database/table, an IAM role for Glue, a crawler and a streaming job.

# Quoted so YAML parsers do not turn the version into a date value.
AWSTemplateFormatVersion: "2010-09-09"

Description: >-
  This template creates the AWS Glue resources such as schema registry,
  database, table, connection, Glue job etc. for Stream consumer application.

Parameters:
  EnvironmentName:
    Description: An environment name that is prefixed to resource names
    Type: String
  VpcId:
    Description: The ID of the VPC for security group
    Type: AWS::EC2::VPC::Id
  PrivateSubnet1:
    Description: Subnet used for creating MSK cluster and Glue Connection.
    Type: AWS::EC2::Subnet::Id
  AvailabilityZoneOfPrivateSubnet1:
    Description: Enter the AvailabilityZone for the private subnet1 used for Glue connection
    Type: AWS::EC2::AvailabilityZone::Name
  PrivateSubnet2:
    Description: Second subnet for MSK cluster.
    Type: AWS::EC2::Subnet::Id
  SecretArn:
    Description: Secretmanager ARN for MSK SASL/SCRAM authentication
    Type: String
  SecurityGroupForGlueConnection:
    Description: Enter the Security Group created in the first step. This will be used by Glue connection and EC2 instance.
    Type: AWS::EC2::SecurityGroup::Id
  ScriptPath:
    Description: "Glue ETL script absolute S3 path. such as s3://s3-path/mskprocessing.py"
    Type: String
  SchemaRegistryName:
    Description: Name of the Glue schema registry
    Type: String
    Default: "test-schema-registry"
  MSKSchemaName:
    Description: Name of the Schema
    Type: String
    Default: "test_payload_schema"
  GlueDataBaseName:
    Description: Database name on Glue catalog
    Type: String
    Default: "test_glue_database"
  GlueTableName:
    Description: Table name on Glue catalog
    Type: String
    Default: "test_glue_table"
  GlueWorkerType:
    Description: Worker type for Glue job
    Type: String
    Default: Standard
    AllowedValues: [Standard, G.1X, G.2X, G.025X]
    ConstraintDescription: must be a valid Glue Worker type.
  NumberOfWorkers:
    Description: Number of workers in Glue job
    Type: Number
    Default: 2
    MinValue: 2
  S3BucketForOutput:
    Description: Bucket name for writing data from Glue Job
    Type: String
  TopicName:
    Description: MSK Topic name that need to be processed.
    Type: String
    Default: "test"

# Groups the parameters above into labelled sections on the console form.
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "General Parameters"
        Parameters:
          - EnvironmentName
          - VpcId
          - PrivateSubnet1
          - PrivateSubnet2
          - SecretArn
      - Label:
          default: "AWS Glue connection Parameters"
        Parameters:
          - SecurityGroupForGlueConnection
          - AvailabilityZoneOfPrivateSubnet1
      - Label:
          default: "AWS Glue Schema Registry Parameters"
        Parameters:
          - SchemaRegistryName
          - MSKSchemaName
      - Label:
          default: "AWS Glue Catalog Parameters"
        Parameters:
          - GlueDataBaseName
          - GlueTableName
      - Label:
          default: "AWS Glue Jobs Parameters"
        Parameters:
          - ScriptPath
          - GlueWorkerType
          - NumberOfWorkers
          - S3BucketForOutput
          - TopicName

Resources:
  # ---------------------------------------------------------------------------
  # MSK cluster, its security group, broker configuration and SCRAM secret
  # ---------------------------------------------------------------------------

  # Admits traffic only from the Glue connection security group, on the
  # SASL/SCRAM broker port (9096) and the ZooKeeper port (2181).
  # NOTE(review): GroupName is a fixed literal, so a second stack in the same
  # VPC would collide on it. Left unchanged because renaming replaces the group.
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: "Security group attached to MSK broker instances"
      GroupName: "msk-sg"
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9096
          ToPort: 9096
          SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
          Description: "communicate with brokers by using SASL/SCRAM"
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
          Description: "communicate with Apache ZooKeeper by using default port"
      SecurityGroupEgress:
        - Description: Allow all outbound traffic
          IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: AppName
          Value: !Ref AWS::StackName

  # Two-broker cluster spread over the two private subnets, with SASL/SCRAM
  # client authentication enabled.
  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      ClusterName: !Ref EnvironmentName
      KafkaVersion: "2.8.1"
      NumberOfBrokerNodes: 2
      BrokerNodeGroupInfo:
        InstanceType: kafka.t3.small
        ClientSubnets:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
        SecurityGroups:
          - !Ref MSKSecurityGroup
      ConfigurationInfo:
        Arn: !Ref MSKConfiguration
        Revision: 1
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      ClientAuthentication:
        Sasl:
          Scram:
            Enabled: true
      Tags:
        AppName: !Sub '${EnvironmentName}-${AWS::StackName}'

  # Broker-level Kafka settings referenced by MSKCluster.ConfigurationInfo.
  # NOTE(review): with 2 brokers, replication factor 2 and
  # min.insync.replicas=2, acks=all producers cannot write while one broker is
  # unavailable. Confirm this is intended; values are left as they were.
  MSKConfiguration:
    Type: AWS::MSK::Configuration
    Properties:
      Description: "configuration properties for MSK cluster"
      KafkaVersionsList:
        - "2.8.1"
      Name: !Sub "msk-config-${EnvironmentName}"
      ServerProperties: |
        auto.create.topics.enable=true
        default.replication.factor=2
        min.insync.replicas=2
        num.io.threads=8
        num.network.threads=5
        num.partitions=1
        num.replica.fetchers=2
        replica.lag.time.max.ms=30000
        socket.receive.buffer.bytes=102400
        socket.request.max.bytes=104857600
        socket.send.buffer.bytes=102400
        unclean.leader.election.enable=true
        zookeeper.session.timeout.ms=18000

  # Associates the supplied Secrets Manager secret with the cluster for
  # SASL/SCRAM authentication.
  MSKScram:
    Type: AWS::MSK::BatchScramSecret
    Properties:
      ClusterArn: !Ref MSKCluster
      SecretArnList:
        - !Ref SecretArn

  # ---------------------------------------------------------------------------
  # Lambda-backed custom resource that returns the MSK bootstrap brokers
  # ---------------------------------------------------------------------------

  # Log group for the function below; the name matches the function name so
  # the function's logs land here with a 30-day retention.
  BootstrapBrokersFunctionLogs:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/cfn-msk-bootstrap-brokers-${EnvironmentName}"
      RetentionInDays: 30

  # Execution role for the bootstrap-brokers function.
  BootstrapBrokersFunctionExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "glue-msk-getbroker-role-${EnvironmentName}"
      ManagedPolicyArns:
        # AWS::Partition keeps the ARN valid outside the standard "aws" partition.
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/service-role/'
      Policies:
        - PolicyName: !Sub "cfn-msk-msk-brokers-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:GetBootstrapBrokers
                Resource: !Ref MSKCluster
              # NOTE(review): the inline function code only calls
              # get_bootstrap_brokers; this statement looks unused. Confirm
              # before removing.
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource: !Ref SecretArn
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'

  # Custom-resource handler. For Create/Update it looks up the cluster's
  # SASL/SCRAM bootstrap broker string and returns it as the attribute
  # BootstrapBrokerStringSaslScram. For Delete it reports success without
  # calling MSK, because there is nothing to clean up.
  BootstrapBrokersFunction:
    Type: AWS::Lambda::Function
    # Create the managed log group before the function can write to (and
    # thereby implicitly create) a log group with the same name.
    DependsOn: BootstrapBrokersFunctionLogs
    Properties:
      Code:
        ZipFile: |
          import json
          import logging
          import cfnresponse
          import boto3

          session = boto3.session.Session()
          client = session.client('kafka')

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)


          def lambda_handler(event, context):
              """Answer a CloudFormation custom-resource request.

              Reads ClusterArn from ResourceProperties and returns the
              cluster's BootstrapBrokerStringSaslScram. A response is always
              sent to CloudFormation, FAILED unless the lookup succeeded.
              """
              logger.info(event)
              responseStatus = cfnresponse.FAILED
              responseData = {}
              try:
                  if event.get('RequestType') == 'Delete':
                      # Nothing was created by this resource, so deletion
                      # must not depend on MSK still being reachable.
                      responseStatus = cfnresponse.SUCCESS
                  else:
                      ClusterArn = event['ResourceProperties'].get('ClusterArn')
                      if ClusterArn:
                          response = client.get_bootstrap_brokers(
                              ClusterArn=ClusterArn
                          )
                          logger.info(response)
                          if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                              # Read the value first: a missing key must end
                              # in FAILED, not SUCCESS with empty data.
                              responseData['BootstrapBrokerStringSaslScram'] = response['BootstrapBrokerStringSaslScram']
                              responseStatus = cfnresponse.SUCCESS
                      else:
                          logger.error('ClusterArn property is missing.')
              except Exception:
                  logger.exception('Signaling failure to CloudFormation.')
              finally:
                  cfnresponse.send(event, context, responseStatus, responseData)
      FunctionName: !Sub "cfn-msk-bootstrap-brokers-${EnvironmentName}"
      Handler: index.lambda_handler
      Role: !GetAtt BootstrapBrokersFunctionExecutionRole.Arn
      Runtime: python3.10
      Timeout: 30

  # Invokes the function above; exposes BootstrapBrokerStringSaslScram.
  BootstrapBrokers:
    Type: Custom::Function
    Properties:
      ServiceToken: !GetAtt BootstrapBrokersFunction.Arn
      ClusterArn: !Ref MSKCluster

  # ---------------------------------------------------------------------------
  # Glue connection, schema registry, catalog objects
  # ---------------------------------------------------------------------------

  # Kafka connection to the MSK brokers using SCRAM-SHA-512 over TLS, placed in
  # PrivateSubnet1 with the caller-supplied security group.
  GlueConnectionMSK:
    Type: AWS::Glue::Connection
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      ConnectionInput:
        Name: GlueMskConnection
        ConnectionProperties:
          KAFKA_BOOTSTRAP_SERVERS: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
          SECRET_ID: !Ref SecretArn
          # Connection properties are string-valued; quote to avoid a YAML boolean.
          KAFKA_SSL_ENABLED: "true"
          KAFKA_SASL_MECHANISM: SCRAM-SHA-512
        ConnectionType: KAFKA
        PhysicalConnectionRequirements:
          SecurityGroupIdList:
            - !Ref SecurityGroupForGlueConnection
          SubnetId: !Ref PrivateSubnet1
          AvailabilityZone: !Ref AvailabilityZoneOfPrivateSubnet1

  GlueSchemaRegistry:
    Type: AWS::Glue::Registry
    Properties:
      Description: Schema registry for test
      Name: !Ref SchemaRegistryName
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'

  # Avro schema (record "Employee" with a nested "addressRecord") registered
  # in the registry above with BACKWARD compatibility.
  MSKPayloadSchema:
    Type: AWS::Glue::Schema
    Properties:
      Compatibility: "BACKWARD"
      DataFormat: "AVRO"
      Name: !Ref MSKSchemaName
      Registry:
        Arn: !Ref GlueSchemaRegistry
      # Folded scalar: the lines below join with single spaces into one JSON
      # document; line breaks are placed only where the text had a space.
      SchemaDefinition: >
        { "type": "record","namespace": "ABC_Organization","name": "Employee","fields":
        [{"name": "Name","type": "string"},{"name": "Age","type": "int"},{"name":
        "address","type": {"type": "record","name": "addressRecord","fields":
        [{"name": "street","type": "string"},{"name": "zipcode","type": "int" }]}}]}

  GlueDataBase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseInput:
        Name: !Ref GlueDataBaseName

  # Catalog table bound to the Kafka connection; its columns come from the
  # initial version of MSKPayloadSchema via SchemaReference.
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      DatabaseName: !Ref GlueDataBase
      CatalogId: !Ref 'AWS::AccountId'
      TableInput:
        Name: !Ref GlueTableName
        Parameters:
          classification: avro
          connectionName: !Ref GlueConnectionMSK
        StorageDescriptor:
          SchemaReference:
            SchemaVersionId: !GetAtt MSKPayloadSchema.InitialSchemaVersionId
          Parameters:
            connectionName: !Ref GlueConnectionMSK
            typeOfData: kafka
          SerdeInfo:
            SerializationLibrary: 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
            Parameters:
              serialization.format: 1
          InputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
          OutputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

  # ---------------------------------------------------------------------------
  # Glue role, crawler and streaming job
  # ---------------------------------------------------------------------------

  # Role shared by the crawler and the streaming job.
  GlueIamRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "Glue-ServiceRole-${EnvironmentName}"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/'
      ManagedPolicyArns:
        # AWS::Partition keeps the ARN valid outside the standard "aws" partition.
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSGlueServiceRole"
      Policies:
        - PolicyName: !Sub "glue-secretm-access-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource: !Ref SecretArn
        # The job writes its output and Spark event logs to S3BucketForOutput
        # and the crawler reads from it, so grant access scoped to that bucket.
        # NOTE(review): ScriptPath may live in a different bucket; if so the
        # role also needs s3:GetObject on that location.
        - PolicyName: !Sub "glue-s3-output-access-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetBucketLocation
                  - s3:ListBucket
                Resource: !Sub "arn:${AWS::Partition}:s3:::${S3BucketForOutput}"
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:DeleteObject
                Resource: !Sub "arn:${AWS::Partition}:s3:::${S3BucketForOutput}/*"
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'

  # Crawls the "output/" prefix of the output bucket into the Glue database.
  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub "GlueCrawler-${EnvironmentName}"
      Role: !Ref GlueIamRole
      DatabaseName: !Ref GlueDataBase
      Targets:
        S3Targets:
          - Path: !Sub 's3://${S3BucketForOutput}/output/'
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "LOG"
      Tags:
        "AppName": !Sub '${EnvironmentName}-${AWS::StackName}'

  # Streaming job that runs the script at ScriptPath through the MSK
  # connection. The database, table, topic and destination are passed to the
  # script as job arguments.
  GlueStreamingJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub "GlueStreamingJob-${EnvironmentName}"
      Connections:
        Connections:
          - !Ref GlueConnectionMSK
      Command:
        Name: gluestreaming
        ScriptLocation: !Ref ScriptPath
        PythonVersion: "3"
      Description: "Glue straming job for processing MSK topic"
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: "4.0"
      WorkerType: !Ref GlueWorkerType
      NumberOfWorkers: !Ref NumberOfWorkers
      DefaultArguments:
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-job-insights": "true"
        "--enable-metrics": "true"
        "--enable-spark-ui": "true"
        "--job-bookmark-option": "job-bookmark-disable"
        "--spark-event-logs-path": !Sub "s3://${S3BucketForOutput}/eventlogs/"
        "--database_name": !Ref GlueDataBase
        "--table_name": !Ref GlueTable
        "--topic_name": !Ref TopicName
        "--dest_dir": !Sub "s3://${S3BucketForOutput}/"
      Role: !Ref GlueIamRole
      Tags:
        AppName: !Sub '${EnvironmentName}-${AWS::StackName}'

Outputs:
  MSKClusterArn:
    Description: ARN of created MSK cluster
    Value: !Ref MSKCluster
  MSKBootstrapServers:
    Description: MSK Bootstrap Servers
    Value: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
  GlueJobName:
    Description: Glue job name
    Value: !Ref GlueStreamingJob
  GlueDatabase:
    Description: Glue database name
    Value: !Ref GlueDataBase
  GlueTableName:
    Description: Glue table name
    Value: !Ref GlueTable
  GlueSchemaRegistryName:
    Description: Glue schema registry name
    Value: !Ref SchemaRegistryName
  SchemaName:
    Description: "Schema registered on Schema registry. Schema definition is created per AWS Public documentation https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas"
    Value: !Ref MSKSchemaName
  OutputBucketName:
    Value: !Ref S3BucketForOutput
    Description: Name of the Amazon S3 bucket used to write Glue Streaming job output.
  GlueJobIAMRole:
    # Ref on an IAM role returns the role name, not its ARN; the value is kept
    # as-is and the description corrected to match.
    Value: !Ref GlueIamRole
    Description: Name of the IAM role used by the Glue job.
  GlueCrawlerName:
    Value: !Ref GlueCrawler
    Description: Name of the Crawler.