AWSTemplateFormatVersion: '2010-09-09'
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Network Configuration - Generic"
        Parameters:
          - WanttoDeployinNewVPC
          - VPCCidr
      - Label:
          default: "Network Configuration - For New VPC"
        Parameters:
          - PrivateSubnetMSKOneCidr
          - PrivateSubnetMSKTwoCidr
          - PrivateSubnetMSKThreeCidr
          - PublicOneCidr
      - Label:
          default: "Network Configuration - For Existing VPC"
        Parameters:
          - VPCID
          - SubnetID1
          - SubnetID2
          - SubnetID3
          - PublicSubnetID
      - Label:
          default: "Amazon MSK Cluster Configuration"
        Parameters:
          - MSKClusterName
          - kafkaVersion
          - numberOfBrokerNodes
          - brokerNodeInstanceType
      - Label:
          default: "Prometheus Configuration"
        Parameters:
          - KeyName
          - LatestAmiId
    ParameterLabels:
      WanttoDeployinNewVPC:
        default: "Stack to be deployed in NEW VPC? (true/false) - if false, you MUST provide VPCCidr and other details under Existing VPC section"
Parameters:
  VPCCidr:
    Description: VPC CIDR for deploying the stack
    Type: String
    Default: ''
  PublicOneCidr:
    Description: CIDR for the public subnet
    Default: ''
    Type: String
  PrivateSubnetMSKOneCidr:
    Description: CIDR for the private Subnet-1
    Type: String
    Default: ''
  PrivateSubnetMSKTwoCidr:
    Description: CIDR for the private Subnet-2
    Type: String
    Default: ''
  PrivateSubnetMSKThreeCidr:
    Description: CIDR for the private Subnet-3
    Type: String
    Default: ''
  SubnetID1:
    Description: Subnet ID-1 from the existing VPC
    Type: String
  SubnetID2:
    Description: Subnet ID-2 from the existing VPC
    Type: String
  SubnetID3:
    Description: Subnet ID-3 from the existing VPC
    Type: String
  PublicSubnetID:
    Description: Subnet ID-4 from the existing VPC
    Type: String
  VPCID:
    Description: VPC ID for the existing VPC
    Type: String
  MSKClusterName:
    Description: Name of the Amazon MSK Cluster
    Type: String
    Default: "DemoMSK"
  kafkaVersion:
    Description: Specify the desired Kafka software version
    Type: String
    Default: "2.8.1"
    AllowedValues:
      - 2.8.1
      - 2.8.0
      - 2.7.1
      - 2.7.0
      - 2.6.2
      - 2.6.1
      - 2.6.0
      - 2.5.1
      -
      - 2.3.1
      - 2.2.1
  numberOfBrokerNodes:
    Description: The desired total number of broker nodes in the kafka cluster. It must be a multiple of the number of specified client subnets
    Type: Number
    Default: 3
  brokerNodeInstanceType:
    Description: Specify the instance type to use for the kafka brokers. e.g. kafka.m5.large
    Type: String
    Default: "kafka.m5.large"
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    Type: AWS::EC2::KeyPair::KeyName
    Default: DemoMSKKeyPair
    ConstraintDescription: Can contain only ASCII characters.
  LatestAmiId:
    Type: 'AWS::SSM::Parameter::Value'
    Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2'
  WanttoDeployinNewVPC:
    Type: String
    Description: Specify whether to deploy in the existing VPC or a new VPC
    Default: "true"
    AllowedValues:
      - "true"
      - "false"
Conditions:
  ShouldCreateResource: !Equals [true, !Ref WanttoDeployinNewVPC]
#=========
Resources:
#=========
  #======== VPC Related Resource Creation - Begin =========
  VPC:
    Type: AWS::EC2::VPC
    Condition: ShouldCreateResource
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !Ref VPCCidr
      Tags:
        - Key: 'Name'
          Value: 'DemoMSKVPC'
  PublicSubnetOne:
    Type: AWS::EC2::Subnet
    Condition: ShouldCreateResource
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'VPC'
      CidrBlock: !Ref PublicOneCidr
      MapPublicIpOnLaunch: true
      Tags:
        - Key: 'Name'
          Value: 'PublicSubnet'
  PrivateSubnetMSKOne:
    Type: AWS::EC2::Subnet
    Condition: ShouldCreateResource
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'VPC'
      CidrBlock: !Ref PrivateSubnetMSKOneCidr
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKOne'
  PrivateSubnetMSKTwo:
    Type: AWS::EC2::Subnet
    Condition: ShouldCreateResource
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 1
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'VPC'
      CidrBlock: !Ref PrivateSubnetMSKTwoCidr
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKTwo'
  PrivateSubnetMSKThree:
    Type: AWS::EC2::Subnet
    Condition: ShouldCreateResource
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 2
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'VPC'
      CidrBlock: !Ref PrivateSubnetMSKThreeCidr
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKThree'
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Condition: ShouldCreateResource
  GatewayAttachement:
    Type: AWS::EC2::VPCGatewayAttachment
    Condition: ShouldCreateResource
    Properties:
      VpcId: !Ref 'VPC'
      InternetGatewayId: !Ref 'InternetGateway'
  NATEIP:
    Type: AWS::EC2::EIP
    Condition: ShouldCreateResource
    DependsOn: GatewayAttachement
    Properties:
      Domain: vpc
  NATGateway:
    Type: AWS::EC2::NatGateway
    Condition: ShouldCreateResource
    Properties:
      AllocationId: !GetAtt NATEIP.AllocationId
      SubnetId: !Ref 'PublicSubnetOne'
      Tags:
        - Key: 'Name'
          Value: 'ConfluentKafkaNATGateway'
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Condition: ShouldCreateResource
    Properties:
      VpcId: !Ref 'VPC'
  PublicRoute:
    Type: AWS::EC2::Route
    Condition: ShouldCreateResource
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref 'PublicRouteTable'
      DestinationCidrBlock: ''
      GatewayId: !Ref 'InternetGateway'
  PublicSubnetOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Condition: ShouldCreateResource
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable
  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Condition: ShouldCreateResource
    Properties:
      VpcId: !Ref 'VPC'
  PrivateRoute:
    Type: AWS::EC2::Route
    Condition: ShouldCreateResource
    DependsOn: NATGateway
    Properties:
      RouteTableId: !Ref 'PrivateRouteTable'
      DestinationCidrBlock: ''
      NatGatewayId: !Ref 'NATGateway'
  PrivateSubnetMSKOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Condition: ShouldCreateResource
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKOne
  PrivateSubnetMSKTwoRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Condition: ShouldCreateResource
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKTwo
  PrivateSubnetMSKThreeRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Condition: ShouldCreateResource
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKThree
  #======== VPC Related Resource Creation - End =========
  KafkaClientInstanceSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Enable SSH access via port 22 from BastionHostSecurityGroup
      #VpcId: !Ref VPCID
      VpcId: !If [ShouldCreateResource, !Ref VPC, !Ref VPCID]
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !If [ShouldCreateResource, !Ref PublicOneCidr, !Ref VPCCidr] #!Ref PublicOneCidr #VPCCidr
        - IpProtocol: tcp
          FromPort: 3500
          ToPort: 3500
          CidrIp: !Ref PublicOneCidr
        - IpProtocol: tcp
          FromPort: 3600
          ToPort: 3600
          CidrIp: !Ref PublicOneCidr
        - IpProtocol: tcp
          FromPort: 3800
          ToPort: 3800
          CidrIp: !Ref PublicOneCidr
        - IpProtocol: tcp
          FromPort: 3900
          ToPort: 3900
          CidrIp: !Ref PublicOneCidr
  #KafkaClientIngressSSH:
  #  Type: AWS::EC2::SecurityGroupIngress
  #  DependsOn: KafkaClientInstanceSecurityGroup
  #  Properties:
  #    Description: SSH Access
  #    GroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  #    IpProtocol: tcp
  #    FromPort: 22
  #    ToPort: 22
  #    SourceSecurityGroupId: !GetAtt PrometheusServerSecurityGroup.GroupId
  PrometheusServerSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Enable SSH access via port 22 from BastionHostSecurityGroup
      VpcId: !If [ShouldCreateResource, !Ref VPC, !Ref 'VPCID'] #!Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !If [ShouldCreateResource, !Ref PublicOneCidr, !Ref VPCCidr] #!Ref PublicOneCidr
        - IpProtocol: tcp
          FromPort: 9090
          ToPort: 9090
          CidrIp:
  KafkaClusterSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    DependsOn: KafkaClientInstanceSecurityGroup
    Properties:
      GroupDescription: Access to the Kafka service on the MSK cluster
      VpcId: !If [ShouldCreateResource, !Ref VPC, !Ref VPCID] #!Ref VPC
  KafkaCluserSGIngressMonitoring:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClusterSecurityGroup
    Properties:
      Description: Prometheus-Monitoring
      GroupId: !GetAtt KafkaClusterSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 11001
      ToPort: 11002
      SourceSecurityGroupId: !GetAtt PrometheusServerSecurityGroup.GroupId
  # Amazon MSK brokers listen on 9092 for plaintext and 9094 for TLS traffic
  KafkaClusterSGIngressPlainText:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClusterSecurityGroup
    Properties:
      Description: Plaintext Kafka
      GroupId: !GetAtt KafkaClusterSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 9092
      ToPort: 9092
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClusterSGIngressEncrypted:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClusterSecurityGroup
    Properties:
      Description: Encrypted Kafka
      GroupId: !GetAtt KafkaClusterSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 9094
      ToPort: 9094
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClusterSGIngressIAMAuth:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClusterSecurityGroup
    Properties:
      Description: IAM Authentication Kafka
      GroupId: !GetAtt KafkaClusterSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 9098
      ToPort: 9098
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClusterSGIngressZookeeper:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClusterSecurityGroup
    Properties:
      Description: Zookeeper Access
      GroupId: !GetAtt KafkaClusterSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 2181
      ToPort: 2181
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClientInstanceSecurityGroup8081:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClientInstanceSecurityGroup
    Properties:
      Description: Enable access to Schema Registry inside the KafkaClientInstanceSecurityGroup
      GroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 8081
      ToPort: 8081
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClientInstanceSecurityGroup8083:
    Type: AWS::EC2::SecurityGroupIngress
    DependsOn: KafkaClientInstanceSecurityGroup
    Properties:
      Description: Enable access to Kafka Connect inside the KafkaClientInstanceSecurityGroup
      GroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: 8083
      ToPort: 8083
      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  Cloud9EC2Bastion:
    Type: AWS::Cloud9::EnvironmentEC2
    Properties:
      AutomaticStopTimeMinutes: 600
      Description: "Cloud9 EC2 environment"
      InstanceType: t3.medium
      Name: !Sub "${AWS::StackName}-Cloud9EC2Bastion"
      SubnetId: !If [ShouldCreateResource, !Ref PublicSubnetOne, !Ref PublicSubnetID] #!Ref PublicSubnetOne
      Tags:
        - Key: 'Purpose'
          Value: 'Cloud9EC2BastionHostInstance'
  KafkaClientEC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: t3.medium
      KeyName: !Ref 'KeyName'
      IamInstanceProfile: !Ref KafkaClientProfile
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      SubnetId: !If [ShouldCreateResource, !Ref PrivateSubnetMSKOne, !Ref SubnetID1] # !Ref PrivateSubnetMSKOne
      SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId]
      ImageId: !Ref LatestAmiId
      Tags:
        - Key: 'Name'
          Value: 'KafkaClientInstance'
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          yum install python3.7 -y
          yum install java-1.8.0-openjdk-devel -y
          yum install nmap-ncat -y
          yum install git -y
          yum erase awscli -y
          yum install jq -y
          amazon-linux-extras install docker -y
          service docker start
          usermod -a -G docker ec2-user

          cd /home/ec2-user
          wget
          su -c "python3.7 --user" -s /bin/sh ec2-user
          su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user
          su -c "/home/ec2-user/.local/bin/pip3 install awscli --user" -s /bin/sh ec2-user
          su -c "/home/ec2-user/.local/bin/pip3 install kafka-python --user" -s /bin/sh ec2-user

          # install AWS CLI 2 - access with aws2
          curl "" -o ""
          unzip
          ./aws/install -b /usr/local/bin/aws2
          su -c "ln -s /usr/local/bin/aws2/aws ~/.local/bin/aws2" -s /bin/sh ec2-user

          # Create dirs, get Apache Kafka 2.7.0 and unpack it
          su -c "mkdir -p kafka270 confluent" -s /bin/sh ec2-user
          cd /home/ec2-user
          ln -s /home/ec2-user/kafka270 /home/ec2-user/kafka
          cd kafka270
          su -c "wget" -s /bin/sh ec2-user
          su -c "tar -xzf kafka_2.12-2.7.0.tgz --strip 1" -s /bin/sh ec2-user

          # Get Confluent Community and unpack it
          cd /home/ec2-user
          cd confluent
          su -c "wget" -s /bin/sh ec2-user
          su -c "tar -xzf confluent-community-5.4.1-2.12.tar.gz --strip 1" -s /bin/sh ec2-user

          # Initialize the Kafka cert trust store
          su -c 'find /usr/lib/jvm/ -name "cacerts" -exec cp {} /tmp/kafka.client.truststore.jks \;' -s /bin/sh ec2-user

          cd /tmp
          su -c "mkdir -p kafka" -s /bin/sh ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/producer.properties_msk /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/jars/KafkaClickstreamClient-1.0-SNAPSHOT.jar /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/jars/KafkaClickstreamConsumer-1.0-SNAPSHOT.jar /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/jars/CustomMM2ReplicationPolicy-1.0-SNAPSHOT.jar /home/ec2-user/confluent/share/java/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/jars/MM2GroupOffsetSync-1.0-SNAPSHOT.jar /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/MSKLabs/schema-registry-ssl/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/AuthMSK-1.0-SNAPSHOT.jar /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/ /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/mm2-msc.json /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/mm2-hbc.json /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/mm2-cpc.json /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/mm2-cpc-cust-repl-policy.json /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/mm2-msc-cust-repl-policy.json /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/kafka-connect.yml /home/ec2-user/prometheus" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/kafka-producer-consumer.yml /home/ec2-user/prometheus" -l ec2-user

          # Setup unit in systemd for Schema Registry
          echo -n "
          [Unit]
          Description=Confluent Schema Registry
          [Service]
          Type=simple
          User=ec2-user
          ExecStart=/bin/sh -c '/home/ec2-user/confluent/bin/schema-registry-start /tmp/kafka/ > /tmp/kafka/schema-registry.log 2>&1'
          ExecStop=/home/ec2-user/confluent/bin/schema-registry-stop
          Restart=on-abnormal
          [Install]" > /etc/systemd/system/confluent-schema-registry.service

          # Setup unit in systemd for Kafka Connect
          echo -n "
          [Unit]
          Description=Kafka Connect
          [Service]
          Type=simple
          User=ec2-user
          ExecStart=/bin/sh -c '/home/ec2-user/kafka/bin/ /tmp/kafka/ > /tmp/kafka/kafka-connect.log 2>&1'
          Restart=on-abnormal
          [Install]" > /etc/systemd/system/kafka-connect.service

          #setup bash env
          su -c "echo 'export PS1=\"KafkaClientEC2Instance1 [\u@\h \W\\]$ \"' >> /home/ec2-user/.bash_profile" -s /bin/sh ec2-user
          su -c "echo '[ -f /tmp/kafka/setup_env ] && . /tmp/kafka/setup_env' >> /home/ec2-user/.bash_profile" -s /bin/sh ec2-user

          # Configure - Needed for IAM Authentication mode of Amazon MSK
          su -c "cp /usr/lib/jvm/jre-1.8.0-openjdk- /tmp/kafka.client.truststore.jks" -s /bin/sh ec2-user
          su -c "echo ssl.truststore.location=/tmp/kafka.client.truststore.jks >> /home/ec2-user/kafka/config/" -s /bin/sh ec2-user
          su -c "echo security.protocol=SASL_SSL >> /home/ec2-user/kafka/config/" -s /bin/sh ec2-user
          su -c "echo sasl.mechanism=AWS_MSK_IAM >> /home/ec2-user/kafka/config/" -s /bin/sh ec2-user
          su -c "echo required\; >> /home/ec2-user/kafka/config/" -s /bin/sh ec2-user
          su -c "echo >> /home/ec2-user/kafka/config/" -s /bin/sh ec2-user

          # Download jar file for MSK IAM Authentication
          cd /home/ec2-user/kafka/libs
          su -c "wget" -s /bin/sh ec2-user
  PrometheusEC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: t3.medium
      KeyName: !Ref 'KeyName'
      IamInstanceProfile: !Ref PrometheusProfile
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      SubnetId: !If [ShouldCreateResource, !Ref PrivateSubnetMSKOne, !Ref SubnetID1] #!Ref PrivateSubnetMSKOne
      SecurityGroupIds: [!GetAtt PrometheusServerSecurityGroup.GroupId]
      ImageId: !Ref LatestAmiId
      Tags:
        - Key: 'Name'
          Value: 'Prometheus_Server'
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          yum install python3.7 -y
          yum install java-1.8.0-openjdk-devel -y
          yum install nmap-ncat -y
          yum install git -y
          yum erase awscli -y
          yum install jq -y
          amazon-linux-extras install docker -y
          service docker start
          usermod -a -G docker ec2-user
  PrometheusRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service:
            Action: 'sts:AssumeRole'
      RoleName: !Sub "PrometheusAMPRole-${AWS::AccountId}-${AWS::Region}"
      Description: Role to access AMP Workspace
  PrometheusProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: !Join
        - '-'
        - - 'Ec2MSSKCFProfile'
          - !Ref 'AWS::AccountId'
          - !Ref 'AWS::Region'
      Roles:
        - !Ref PrometheusRole
  #====
  # Managed Policy for Prometheus Server
  #====
  PrometheusManagedPolicy:
    Type: AWS::IAM::ManagedPolicy
    DependsOn: PrometheusRole # Ensure Role is created first
    Properties:
      Description: Managed Policy to access prometheus Workspace
      ManagedPolicyName: !Sub "PrometheusAccessToAMP-${AWS::AccountId}-${AWS::Region}"
      Path: /
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - aps:GetLabels
              - aps:GetMetricMetadata
              - aps:GetSeries
              - aps:QueryMetrics
              - aps:RemoteWrite
            Resource: "*"
      Roles:
        - !Sub "PrometheusAMPRole-${AWS::AccountId}-${AWS::Region}"
  #====
  KafkaClientRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                -
            Action:
              - 'sts:AssumeRole'
      Description: Role to access MSK Using IAM
      RoleName: !Sub "KafkaClientRole-${AWS::AccountId}-${AWS::Region}"
  KafkaClientProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: !Join
        - '-'
        - - 'KafkaClientRole'
          - !Ref 'AWS::AccountId'
          - !Ref 'AWS::Region'
      Roles:
        - !Ref KafkaClientRole
  #=====
  MSKLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: "DemoMSKCWLog"
      RetentionInDays: 1
  MSKClusterCustomConfig:
    Type: AWS::MSK::Configuration
    Properties:
      Name: "TestConfigDemoMSK"
      ServerProperties: |
        log.retention.hours = 8
        = 18000
        default.replication.factor = 3
        min.insync.replicas = 2
        =8
        = 5
        num.partitions = 1
        num.replica.fetchers = 2
        = 30000
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        unclean.leader.election.enable = true
  #===
  MSKIAMManagedPolicy:
    Type: AWS::IAM::ManagedPolicy
    DependsOn: KafkaClientRole
    Properties:
      Description: Managed Policy to access MSK using IAM attached to KafkaClientRole
      ManagedPolicyName: !Sub "MSKConnectAuthentication-${AWS::AccountId}-${AWS::Region}"
      Path: /
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # At Cluster level permission
          - Effect: Allow
            Action:
              - kafka:Get*
              - kafka:Describe*
              - kafka:List*
              - kafka:Tag*
            Resource:
              - Fn::Join:
                  - ''
                  - - 'arn:aws:kafka:'
                    - !Ref 'AWS::Region'
                    - ':'
                    - !Ref 'AWS::AccountId'
                    - ':cluster/'
                    - '*'
          # At Specific Cluster level permission
          - Effect: Allow
            Action:
              - kafka-cluster:Connect
              - kafka-cluster:AlterCluster
              - kafka-cluster:DescribeCluster
              - kafka-cluster:DescribeClusterDynamicConfiguration
            Resource:
              - Fn::Join:
                  - ''
                  - - 'arn:aws:kafka:'
                    - !Ref 'AWS::Region'
                    - ':'
                    - !Ref 'AWS::AccountId'
                    - ':cluster/'
                    - !Ref MSKClusterName # ARN of the Cluster
                    - '/*'
          # At Topic level permission
          - Effect: Allow
            Action:
              - kafka-cluster:*Topic*
              - kafka-cluster:WriteData
              - kafka-cluster:ReadData
              - kafka-cluster:Connect
            Resource:
              - Fn::Join:
                  - ''
                  - - 'arn:aws:kafka:'
                    - !Ref 'AWS::Region'
                    - ':'
                    - !Ref 'AWS::AccountId'
                    - ':topic/'
                    - !Ref MSKClusterName
                    - '/*/*'
          # At group level permission
          - Effect: Allow
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:DescribeGroup
              - kafka-cluster:Connect
              - kafka-cluster:DescribeTopic
              - kafka-cluster:ReadData
            Resource:
              - Fn::Join:
                  - ''
                  - - 'arn:aws:kafka:'
                    - !Ref 'AWS::Region'
                    - ':'
                    - !Ref 'AWS::AccountId'
                    - ':group/'
                    - !Ref MSKClusterName
                    - '/*/*'
      Roles:
        - !Sub "KafkaClientRole-${AWS::AccountId}-${AWS::Region}"
  MSKCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      ClusterName: !Ref MSKClusterName
      KafkaVersion: !Ref kafkaVersion
      ConfigurationInfo:
        Arn: !Ref MSKClusterCustomConfig
        Revision: 1
      NumberOfBrokerNodes: !Ref numberOfBrokerNodes
      BrokerNodeGroupInfo:
        InstanceType: !Ref brokerNodeInstanceType
        #ClientSubnets: [!Ref 'PrivateSubnetMSKOne', !Ref 'PrivateSubnetMSKTwo', !Ref 'PrivateSubnetMSKThree']
        ClientSubnets:
          - !If [ShouldCreateResource, !Ref PrivateSubnetMSKOne, !Ref SubnetID1]
          - !If [ShouldCreateResource, !Ref PrivateSubnetMSKTwo, !Ref SubnetID2]
          - !If [ShouldCreateResource, !Ref PrivateSubnetMSKThree, !Ref SubnetID3]
        SecurityGroups: [!GetAtt KafkaClusterSecurityGroup.GroupId]
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 100
      ClientAuthentication:
        Unauthenticated:
          Enabled: false
        Sasl:
          Iam:
            Enabled: true
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: "TLS" ##"TLS_PLAINTEXT"
          InCluster: true
      EnhancedMonitoring: "PER_TOPIC_PER_PARTITION"
      OpenMonitoring:
        Prometheus:
          JmxExporter:
            EnabledInBroker: true
          NodeExporter:
            EnabledInBroker: true
      LoggingInfo:
        BrokerLogs:
          CloudWatchLogs:
            Enabled: true
            LogGroup: !Ref MSKLogGroup
#=====
#========
Outputs:
#========
  VPCId:
    Description: The ID of the VPC (created or existing)
    Value: !If [ShouldCreateResource, !Ref VPC, !Ref VPCID] #!Ref 'VPC'
  PublicSubnetOne:
    Description: The ID of the public subnet (created or existing)
    Value: !If [ShouldCreateResource, !Ref PublicSubnetOne, !Ref PublicSubnetID] #!Ref 'PublicSubnetOne'
  PrivateSubnetMSKOne:
    Description: The ID of private subnet one (created or existing)
    Value: !If [ShouldCreateResource, !Ref PrivateSubnetMSKOne, !Ref SubnetID1] #!Ref 'PrivateSubnetMSKOne'
  PrivateSubnetMSKTwo:
    Description: The ID of private subnet two (created or existing)
    Value: !If [ShouldCreateResource, !Ref PrivateSubnetMSKTwo, !Ref SubnetID2] #!Ref 'PrivateSubnetMSKTwo'
  PrivateSubnetMSKThree:
    Description: The ID of private subnet three (created or existing)
    Value: !If [ShouldCreateResource, !Ref PrivateSubnetMSKThree, !Ref SubnetID3] #!Ref 'PrivateSubnetMSKThree'
  VPCStackName:
    Description: The name of the VPC Stack
    Value: !Ref 'AWS::StackName'
  KafkaClientEC2InstanceSecurityGroupId:
    Description: The security group id for the EC2 instance
    Value: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
  KafkaClusterSecurityGroupId:
    Description: The security group id for the MSK Cluster
    Value: !GetAtt KafkaClusterSecurityGroup.GroupId
  MSKClusterArn:
    Description: ARN of created MSK cluster
    Value: !Ref MSKCluster
  KafkaClientPrivateIP:
    Description: The Private IP of the Kafka Client
    Value: !GetAtt KafkaClientEC2Instance.PrivateIp
  PrometheusInstancePrivateIP:
    Description: The Private IP of the Prometheus server
    Value: !GetAtt PrometheusEC2Instance.PrivateIp
  SSHPrometheusInstance:
    Description: SSH command for Prometheus instance
    Value: !Join ["" , [ "ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@", !GetAtt PrometheusEC2Instance.PrivateIp ]]
  SSHKafkaClientEC2Instance:
    Description: SSH command for the Kafka client EC2 instance
    Value: !Join ["" , [ "ssh -i /home/ec2-user/environment/EC2KeyMSKDemo ec2-user@", !GetAtt KafkaClientEC2Instance.PrivateIp ]]