Description: This template creates Amazon MSK Cluster with 3 brokers. #=========== # Parameters #=========== Parameters: KeyName: Description: Name of an existing EC2 KeyPair to enable SSH access to the instance Type: 'AWS::EC2::KeyPair::KeyName' Default: DemoMSKKeyPair VPCStackName: Description: Name of the CFn Stack for VPC Stack Type: String Default: "ResourceForMSKDemo" MSKClusterName: Description: Name of the Amazon MSK Cluster Type: String Default: "DemoMSK" #VpcId: # Description: The ID of the VPC for security group # Type: String # Default: "" LatestAmiId: Type: 'AWS::SSM::Parameter::Value' Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2' kafkaVersion: Description: Specify the desired Kafka software version Type: String Default: "2.8.1" AllowedValues: - 2.8.1 - 2.8.0 - 2.7.1 - 2.7.0 - 2.6.2 - 2.6.1 - 2.6.0 - 2.5.1 - 2.4.1.1 - 2.3.1 - 2.2.1 numberOfBrokerNodes: Description: The desired total number of broker nodes in the kafka cluster. It must be a multiple of the number of specified client subnets Type: Number Default: 3 #brokerNodeClientSubnets: # Description: A list of subnets to connect to in client VPC # Type: String # CommaDelimitedList # Default: "" brokerNodeInstanceType: Description: Specify the instance type to use for the kafka brokers. e.g. kafka.m5.large Type: String Default: "kafka.m5.large" #brokerNodeSecurityGroups: # Description: A list of the security groups to associate with the elastic network interfaces to control who can communicate with the cluster # Type: String # Default: "" #Conditions: #IsMSKClientSubnetEmpty: true # !Equals [!Ref brokerNodeClientSubnets, "" ] #IsMSKVPCEmpty: true # !Equals [!Ref VpcId, "" ] #IsBrokerSecGrpEmpty: true # !Equals [!Ref brokerNodeSecurityGroups, "" ] #=========== # Resources #=========== Resources: MSKLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: "DemoMSKCWLog" RetentionInDays: 7 MSKClusterCustomConfig: Type: AWS::MSK::Configuration Properties: Name: "TestConfigDemoMSK1" ServerProperties: | delete.topic.enable = true log.retention.hours = 8 zookeeper.session.timeout.ms = 18000 default.replication.factor = 3 min.insync.replicas = 2 num.io.threads =8 num.network.threads = 5 num.partitions = 1 num.replica.fetchers = 2 replica.lag.time.max.ms = 30000 socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 socket.send.buffer.bytes = 102400 unclean.leader.election.enable = true #=== MSKIAMManagedPolicy: Type: AWS::IAM::ManagedPolicy Properties: Description: Managed Policy to access MSK using IAM attached to KafkaClientRole ManagedPolicyName: MSKConnectAuthentication Path: / PolicyDocument: Version: '2012-10-17' Statement: # At Cluster level permission - Effect: Allow Action: - kafka:Get* - kafka:Describe* - kafka:List* - kafka:Tag* Resource: - Fn::Join: - '' - - 'arn:aws:kafka:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':cluster/' - '*' # At Specific Cluster level permission - Effect: Allow Action: - kafka-cluster:Connect - kafka-cluster:AlterCluster - kafka-cluster:DescribeCluster - kafka-cluster:DescribeClusterDynamicConfiguration Resource: - Fn::Join: - '' - - 'arn:aws:kafka:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':cluster/' - !Ref MSKClusterName # ARN of the Cluster - '/*' # At Topic level permission - Effect: Allow Action: - kafka-cluster:*Topic* - kafka-cluster:WriteData - kafka-cluster:ReadData - kafka-cluster:Connect Resource: - Fn::Join: - '' - - 'arn:aws:kafka:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':topic/' - !Ref MSKClusterName - '/*/*' # At group level permission - Effect: Allow Action: - kafka-cluster:AlterGroup - kafka-cluster:DescribeGroup - kafka-cluster:Connect - kafka-cluster:DescribeTopic - kafka-cluster:ReadData Resource: - Fn::Join: - '' - - 'arn:aws:kafka:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':group/' - !Ref MSKClusterName - '/*/*' Roles: - "KafkaClientRole" MSKCluster: Type: 'AWS::MSK::Cluster' Properties: ClusterName: !Ref MSKClusterName # !Sub # - 'DemoMSKCluster-${RandomGUID}' # - { RandomGUID: !Select [0, !Split ["-", !Select [2, !Split ["/", !Ref AWS::StackId ]]]] } KafkaVersion: !Ref kafkaVersion ConfigurationInfo: Arn: !Ref MSKClusterCustomConfig Revision: 1 NumberOfBrokerNodes: !Ref numberOfBrokerNodes BrokerNodeGroupInfo: InstanceType: !Ref brokerNodeInstanceType # if no subnets are provided, defautl subnets of MSK VPC will be used # Should be multiple of no. of brokers defined #ClientSubnets: !If [ IsMSKClientSubnetEmpty, [ !ImportValue ResourceForMSKDemo-PrivateSubnetMSKOne, !ImportValue ResourceForMSKDemo-PrivateSubnetMSKTwo, !ImportValue ResourceForMSKDemo-PrivateSubnetMSKThree ], !Ref brokerNodeClientSubnets ] ClientSubnets: - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKOne - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKTwo - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKThree #Fn::If: #- IsMSKClientSubnetEmpty #- - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKOne # - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKTwo # - Fn::ImportValue: !Sub ${VPCStackName}-PrivateSubnetMSKThree #- !Ref brokerNodeClientSubnets SecurityGroups: - Fn::ImportValue: !Sub ${VPCStackName}-KafkaClusterSecurityGroupId #- !Ref brokerNodeSecurityGroups #- !If [ IsBrokerSecGrpEmpty, !ImportValue ResourceForMSKDemo-KafkaClusterSecurityGroupId, !Ref brokerNodeSecurityGroups ] #Fn::If: # - IsBrokerSecGrpEmpty # - - Fn::ImportValue: !Sub ${VPCStackName}-KafkaClusterSecurityGroupId # - !Ref brokerNodeSecurityGroups StorageInfo: EBSStorageInfo: VolumeSize: 100 ClientAuthentication: Unauthenticated: Enabled: false Sasl: Iam: Enabled: True EncryptionInfo: EncryptionInTransit: ClientBroker: "TLS" ##"TLS_PLAINTEXT" InCluster: true EnhancedMonitoring: "PER_TOPIC_PER_PARTITION" OpenMonitoring: Prometheus: JmxExporter: EnabledInBroker: true NodeExporter: EnabledInBroker: true LoggingInfo: BrokerLogs: CloudWatchLogs: Enabled: true LogGroup: !Ref MSKLogGroup Outputs: MSKClusterArn: Description: ARN of created MSK cluster Value: !Ref MSKCluster