# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
 
# Quoted so YAML 1.1 tooling (cfn-lint, PyYAML-based loaders) keeps it a string
# instead of converting it to a date.
AWSTemplateFormatVersion: "2010-09-09"

Description: >-
  This template creates an Amazon MSK cluster (SASL/SCRAM) and the AWS Glue
  resources such as schema registry, schema, database, table, connection,
  crawler and Glue streaming job for the stream consumer application.

Parameters:
  EnvironmentName:
    Description: An environment name that is prefixed to resource names
    Type: String
  VpcId:
    Description: The ID of the VPC for security group
    Type: AWS::EC2::VPC::Id
  PrivateSubnet1:
    Description: Subnet used for creating MSK cluster and Glue Connection.
    Type: AWS::EC2::Subnet::Id
  AvailabilityZoneOfPrivateSubnet1:
    Description: Enter the AvailabilityZone for the private subnet1 used for Glue connection
    Type: AWS::EC2::AvailabilityZone::Name
  PrivateSubnet2:
    Description: Second subnet for MSK cluster.
    Type: AWS::EC2::Subnet::Id
  # MSK only associates SCRAM secrets whose name starts with "AmazonMSK_" and
  # that are encrypted with a customer managed KMS key.
  SecretArn:
    Description: Secrets Manager ARN for MSK SASL/SCRAM authentication
    Type: String
  SecurityGroupForGlueConnection:
    Description: Enter the Security Group created in the first step. This will be used by the Glue connection and EC2 instance.
    Type: AWS::EC2::SecurityGroup::Id
  ScriptPath:
    Description: "Glue ETL script absolute S3 path, such as s3://s3-path/mskprocessing.py"
    Type: String
    # Enforce the documented s3://bucket/key form at deploy time rather than
    # failing later when the Glue job starts.
    AllowedPattern: "^s3://[^/]+/.+$"
    ConstraintDescription: "must be an absolute S3 path such as s3://bucket/key."
  SchemaRegistryName:
    Description: Name of the Glue schema registry
    Type: String
    Default: "test-schema-registry"
  MSKSchemaName:
    Description: Name of the Schema
    Type: String
    Default: "test_payload_schema"
  GlueDataBaseName:
    Description: Database name on Glue catalog
    Type: String
    Default: "test_glue_database"
  GlueTableName:
    Description: Table name on Glue catalog
    Type: String
    Default: "test_glue_table"
  # NOTE(review): Standard is a legacy worker type; AWS documents the G.* types
  # for Glue 2.0+ jobs (GlueStreamingJob uses GlueVersion 4.0), so the default
  # is G.1X. Standard stays allowed so existing stacks that chose it validate.
  GlueWorkerType:
    Description: Worker type for Glue job
    Type: String
    Default: G.1X
    AllowedValues: [Standard, G.1X, G.2X, G.025X]
    ConstraintDescription: must be a valid Glue Worker type.
  NumberOfWorkers:
    Description: Number of workers in Glue job
    Type: Number
    Default: 2
    MinValue: 2
  S3BucketForOutput:
    Description: Bucket name for writing data from Glue Job
    Type: String
  TopicName:
    Description: MSK Topic name that need to be processed.
    Type: String
    Default: "test"
  
Metadata:
  # Console layout: groups the stack parameters into labelled sections on the
  # CloudFormation "Specify stack details" page.
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "General Parameters"
        Parameters:
          - EnvironmentName
          - VpcId
          - PrivateSubnet1
          - PrivateSubnet2
          - SecretArn
      - Label:
          default: "AWS Glue connection Parameters"
        Parameters:
          - SecurityGroupForGlueConnection
          - AvailabilityZoneOfPrivateSubnet1
      - Label:
          default: "AWS Glue Schema Registry Parameters"
        Parameters:
          - SchemaRegistryName
          - MSKSchemaName
      - Label:
          default: "AWS Glue Catalog Parameters"
        Parameters:
          - GlueDataBaseName
          - GlueTableName
      - Label:
          default: "AWS Glue Jobs Parameters"
        Parameters:
          - ScriptPath
          - GlueWorkerType
          - NumberOfWorkers
          - S3BucketForOutput
          - TopicName

Resources:
  # Amazon MSK resources: broker security group, cluster, configuration and
  # SCRAM secret association, followed by a Lambda-backed custom resource that
  # returns the cluster's SASL/SCRAM bootstrap brokers.

  # Security group for the MSK brokers. Only the Glue connection security group
  # may reach the SASL/SCRAM listener (9096) and ZooKeeper (2181).
  # NOTE: GroupName is fixed and security group names must be unique per VPC,
  # so only one copy of this stack can be deployed into a given VPC.
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: "Security group attached to MSK broker instances"
      GroupName: "msk-sg"
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9096
          ToPort: 9096
          SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
          Description: "communicate with brokers by using SASL/SCRAM"
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !Ref SecurityGroupForGlueConnection
          Description: "communicate with Apache ZooKeeper by using default port"
      SecurityGroupEgress:
        - Description: Allow all outbound traffic
          IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
      # Same AppName tag value as every other resource in this stack.
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'
  # Two-broker MSK cluster across PrivateSubnet1/PrivateSubnet2 with SASL/SCRAM
  # client authentication and the broker settings from MSKConfiguration.
  MSKCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      ClusterName: !Ref EnvironmentName
      KafkaVersion: "2.8.1"
      NumberOfBrokerNodes: 2
      BrokerNodeGroupInfo:
        InstanceType: kafka.t3.small
        ClientSubnets:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
        SecurityGroups:
          - !Ref MSKSecurityGroup
      ConfigurationInfo:
        Arn: !Ref MSKConfiguration
        # Follow the configuration's latest revision instead of pinning
        # revision 1, so changes to MSKConfiguration are applied to the brokers.
        Revision: !GetAtt MSKConfiguration.LatestRevision.Revision
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      ClientAuthentication:
        Sasl:
          Scram:
            Enabled: true
      Tags:
        AppName: !Sub '${EnvironmentName}-${AWS::StackName}'

  # Broker-level Kafka settings referenced by MSKCluster.ConfigurationInfo.
  MSKConfiguration:
    Type: AWS::MSK::Configuration
    Properties:
      Description: "configuration properties for MSK cluster"
      KafkaVersionsList:
        - "2.8.1"
      Name: !Sub "msk-config-${EnvironmentName}"
      # The cluster has 2 brokers and default.replication.factor=2, so
      # min.insync.replicas is set to replication factor - 1. With 2, any
      # single broker restart (e.g. rolling maintenance) would reject writes
      # from producers using acks=all.
      ServerProperties: |
        auto.create.topics.enable=true
        default.replication.factor=2
        min.insync.replicas=1
        num.io.threads=8
        num.network.threads=5
        num.partitions=1
        num.replica.fetchers=2
        replica.lag.time.max.ms=30000
        socket.receive.buffer.bytes=102400
        socket.request.max.bytes=104857600
        socket.send.buffer.bytes=102400
        unclean.leader.election.enable=true
        zookeeper.session.timeout.ms=18000
  

  # Associates the SecretArn SCRAM credentials with MSKCluster so clients can
  # authenticate on the SASL/SCRAM listener.
  MSKScram:
    Type: AWS::MSK::BatchScramSecret
    Properties:
      SecretArnList:
        - !Ref SecretArn
      ClusterArn: !Ref MSKCluster
  # Log group for BootstrapBrokersFunction; the name follows the
  # /aws/lambda/<FunctionName> convention and logs are kept for 30 days.
  BootstrapBrokersFunctionLogs:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 30
      LogGroupName: !Sub "/aws/lambda/cfn-msk-bootstrap-brokers-${EnvironmentName}"
  
  # Execution role for BootstrapBrokersFunction: basic Lambda logging plus
  # kafka:GetBootstrapBrokers on this stack's MSK cluster only.
  BootstrapBrokersFunctionExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "glue-msk-getbroker-role-${EnvironmentName}"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/service-role/'
      Policies:
        - PolicyName: !Sub "cfn-msk-msk-brokers-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # The function only calls kafka GetBootstrapBrokers and never
              # reads the SCRAM secret, so no Secrets Manager access is granted.
              - Effect: Allow
                Action:
                  - kafka:GetBootstrapBrokers
                Resource: !Ref MSKCluster
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'
            
  # Lambda backing the BootstrapBrokers custom resource. On Create/Update it
  # calls kafka GetBootstrapBrokers for the ClusterArn property and returns the
  # BootstrapBrokerStringSaslScram attribute; Delete is a no-op.
  BootstrapBrokersFunction:
    Type: AWS::Lambda::Function
    # Create the log group before the function so Lambda never auto-creates it
    # (which would make BootstrapBrokersFunctionLogs fail as "already exists").
    DependsOn: BootstrapBrokersFunctionLogs
    Properties:
      Code:
        ZipFile: |
          import json
          import logging
          import cfnresponse
          import boto3
          session = boto3.session.Session()
          client = session.client('kafka')
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)


          def lambda_handler(event, context):
              """CloudFormation custom-resource handler returning MSK bootstrap brokers.

              Create/Update: look up ClusterArn and return
              BootstrapBrokerStringSaslScram in the response data.
              Delete: nothing was created, so always report SUCCESS.
              """
              logger.info(event)
              responseData = {}
              # Echo the existing physical ID on Update/Delete (absent on Create,
              # where cfnresponse falls back to the log stream name) so an Update
              # is not treated as a replacement.
              physicalResourceId = event.get('PhysicalResourceId')

              # Never fail Delete: if a Create failed, the rollback Delete would
              # otherwise fail too and leave the stack stuck in ROLLBACK_FAILED.
              if event.get('RequestType') == 'Delete':
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, physicalResourceId)
                  return

              responseStatus = cfnresponse.FAILED
              ClusterArn = event.get('ResourceProperties', {}).get('ClusterArn')
              if not ClusterArn:
                  logger.error('ClusterArn property is missing; signaling failure to CloudFormation.')
              else:
                  try:
                      response = client.get_bootstrap_brokers(
                          ClusterArn=ClusterArn
                      )
                      logger.info(response)
                      if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                          responseData['BootstrapBrokerStringSaslScram'] = response['BootstrapBrokerStringSaslScram']
                          responseStatus = cfnresponse.SUCCESS
                  except Exception:
                      logger.exception('Signaling failure to CloudFormation.')
              cfnresponse.send(event, context, responseStatus, responseData, physicalResourceId)
              return
      FunctionName: !Sub "cfn-msk-bootstrap-brokers-${EnvironmentName}"
      Handler: index.lambda_handler
      Role: !GetAtt BootstrapBrokersFunctionExecutionRole.Arn
      Runtime: python3.10
      Timeout: 30

  # Custom resource invoking BootstrapBrokersFunction for MSKCluster; exposes
  # the broker string as the BootstrapBrokerStringSaslScram attribute.
  BootstrapBrokers:
    Type: Custom::Function
    Properties:
      ClusterArn: !Ref MSKCluster
      ServiceToken: !GetAtt BootstrapBrokersFunction.Arn

  # Glue KAFKA connection to MSKCluster using SASL/SCRAM-SHA-512 over TLS with
  # the SecretArn credentials, attached to PrivateSubnet1 and the Glue
  # connection security group.
  # NOTE: the connection name is fixed, so it must be unique in this
  # account's Data Catalog.
  GlueConnectionMSK:
    Type: 'AWS::Glue::Connection'
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      ConnectionInput:
        Name: GlueMskConnection
        # ConnectionProperties is a string-to-string map, so the flag is quoted
        # rather than written as a YAML boolean.
        ConnectionProperties:
          KAFKA_BOOTSTRAP_SERVERS: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
          SECRET_ID: !Ref SecretArn
          KAFKA_SSL_ENABLED: "true"
          KAFKA_SASL_MECHANISM: SCRAM-SHA-512
        ConnectionType: KAFKA
        PhysicalConnectionRequirements:
          SecurityGroupIdList:
            - !Ref SecurityGroupForGlueConnection
          SubnetId: !Ref PrivateSubnet1
          AvailabilityZone: !Ref AvailabilityZoneOfPrivateSubnet1
  
  # Glue Schema Registry that holds MSKPayloadSchema.
  GlueSchemaRegistry:
    Type: AWS::Glue::Registry
    Properties:
      Name: !Ref SchemaRegistryName
      Description: Schema registry for test
      Tags:
        - Value: !Sub '${EnvironmentName}-${AWS::StackName}'
          Key: AppName
  
  # AVRO schema for the topic payload (record "Employee" with a nested
  # "addressRecord"), registered in GlueSchemaRegistry with BACKWARD
  # compatibility. The definition text is left exactly as registered.
  MSKPayloadSchema:
    Type: AWS::Glue::Schema
    Properties:
      Name: !Ref MSKSchemaName
      Registry:
        Arn: !Ref GlueSchemaRegistry
      DataFormat: AVRO
      Compatibility: BACKWARD
      SchemaDefinition: >
        { "type": "record","namespace": "ABC_Organization","name": "Employee","fields": [{"name": "Name","type": "string"},{"name": "Age","type": "int"},{"name": "address","type": {"type": "record","name": "addressRecord","fields": [{"name": "street","type": "string"},{"name": "zipcode","type": "int" }]}}]}
  # Data Catalog database that contains GlueTable and receives crawler output.
  GlueDataBase:
    Type: AWS::Glue::Database
    Properties:
      DatabaseInput:
        Name: !Ref GlueDataBaseName
      CatalogId: !Ref 'AWS::AccountId'
  # Catalog table describing the Kafka source: AVRO records whose columns come
  # from MSKPayloadSchema in the Schema Registry, read via GlueConnectionMSK.
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      DatabaseName: !Ref GlueDataBase
      CatalogId: !Ref 'AWS::AccountId'
      TableInput:
        Name: !Ref GlueTableName
        Parameters:
          classification: avro
          connectionName: !Ref GlueConnectionMSK
        StorageDescriptor:
          # The schema is taken from the registry version, so no Columns are listed.
          SchemaReference:
            SchemaVersionId: !GetAtt MSKPayloadSchema.InitialSchemaVersionId
          Parameters:
            connectionName: !Ref GlueConnectionMSK
            typeOfData: kafka
            # Kafka source tables name the topic they read from.
            topicName: !Ref TopicName
          SerdeInfo:
            SerializationLibrary: 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
            Parameters:
              # Table parameters are string-to-string; quote the value.
              serialization.format: "1"
          InputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
          OutputFormat: 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  # Service role assumed by GlueCrawler and GlueStreamingJob.
  GlueIamRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "Glue-ServiceRole-${EnvironmentName}"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/'
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
      Policies:
        # Lets the Kafka connection read the SCRAM credentials.
        - PolicyName: !Sub "glue-secretm-access-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource: !Ref SecretArn
        # AWSGlueServiceRole only grants object access on aws-glue-* style
        # paths, so grant the output bucket (job output, Spark event logs,
        # crawler target) and the script's bucket explicitly.
        - PolicyName: !Sub "glue-s3-access-policy-${EnvironmentName}"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:ListBucket
                  - s3:GetBucketLocation
                Resource: !Sub "arn:${AWS::Partition}:s3:::${S3BucketForOutput}"
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:DeleteObject
                Resource: !Sub "arn:${AWS::Partition}:s3:::${S3BucketForOutput}/*"
              # Bucket is element 2 of "s3://bucket/key" split on "/".
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub
                  - "arn:${AWS::Partition}:s3:::${ScriptBucket}/*"
                  - ScriptBucket: !Select [2, !Split ["/", !Ref ScriptPath]]
      Tags:
        - Key: AppName
          Value: !Sub '${EnvironmentName}-${AWS::StackName}'
  # Crawler over s3://<S3BucketForOutput>/output/ (presumably where the
  # streaming script writes) that catalogs into GlueDataBase, updating tables
  # in place and only logging deletions.
  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub "GlueCrawler-${EnvironmentName}"
      Role: !Ref GlueIamRole
      DatabaseName: !Ref GlueDataBase
      SchemaChangePolicy:
        DeleteBehavior: LOG
        UpdateBehavior: UPDATE_IN_DATABASE
      Targets:
        S3Targets:
          - Path: !Sub 's3://${S3BucketForOutput}/output/'
      Tags:
        AppName: !Sub '${EnvironmentName}-${AWS::StackName}'
  
  # Glue 4.0 streaming job running the script at ScriptPath with
  # GlueConnectionMSK attached. The --database_name, --table_name,
  # --topic_name and --dest_dir arguments are consumed by that script.
  GlueStreamingJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub "GlueStreamingJob-${EnvironmentName}"
      Connections:
        Connections:
          - !Ref GlueConnectionMSK
      Command:
        Name: gluestreaming
        ScriptLocation: !Ref ScriptPath
        PythonVersion: "3"
      Description: "Glue streaming job for processing MSK topic"
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: "4.0"
      WorkerType: !Ref GlueWorkerType
      NumberOfWorkers: !Ref NumberOfWorkers
      DefaultArguments:
        "--enable-continuous-cloudwatch-log": "true"
        "--enable-job-insights": "true"
        "--enable-metrics": "true"
        "--enable-spark-ui": "true"
        "--job-bookmark-option": "job-bookmark-disable"
        "--spark-event-logs-path": !Sub "s3://${S3BucketForOutput}/eventlogs/"
        "--database_name": !Ref GlueDataBase
        "--table_name": !Ref GlueTable
        "--topic_name": !Ref TopicName
        "--dest_dir": !Sub "s3://${S3BucketForOutput}/"
      Role: !Ref GlueIamRole
      Tags:
        AppName: !Sub '${EnvironmentName}-${AWS::StackName}'
  
Outputs:
  MSKClusterArn:
    Description: ARN of created MSK cluster
    Value: !Ref MSKCluster

  MSKBootstrapServers:
    Description: MSK Bootstrap Servers
    Value: !GetAtt BootstrapBrokers.BootstrapBrokerStringSaslScram
  GlueJobName:
    Description: Glue job name
    Value: !Ref GlueStreamingJob
  GlueDatabase:
    Description: Glue database name
    Value: !Ref GlueDataBase
  GlueTableName:
    Description: Glue table name
    Value: !Ref GlueTable
  GlueSchemaRegistryName:
    Description: Glue schema registry name
    Value: !Ref SchemaRegistryName
  SchemaName:
    Description: "Schema registered on Schema registry. Schema definition is created per AWS Public documentation https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas"
    Value: !Ref MSKSchemaName
  OutputBucketName:
    Value: !Ref S3BucketForOutput
    Description: Name of the Amazon S3 bucket used to write Glue Streaming job output.
  # Ref on an AWS::IAM::Role returns the role name, not its ARN.
  GlueJobIAMRole:
    Value: !Ref GlueIamRole
    Description: Name of the IAM role used by the Glue job.
  GlueCrawlerName:
    Value: !Ref GlueCrawler
    Description: Name of the Crawler.