AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    Type: 'AWS::EC2::KeyPair::KeyName'
    ConstraintDescription: Can contain only ASCII characters.
  SSHLocation:
    Description: The IP address range that can be used to SSH to the EC2 instances
    Type: String
    MinLength: '9'
    MaxLength: '18'
    Default: 0.0.0.0/0
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: Must be a valid IP CIDR range of the form x.x.x.x/x
Mappings:
  SubnetConfig:
    VPC:
      CIDR: 40.0.0.0/16
    PublicOne:
      CIDR: 40.0.0.0/24
    PublicTwo:
      CIDR: 40.0.1.0/24
    PrivateOne:
      CIDR: 40.0.2.0/24
    PrivateTwo:
      CIDR: 40.0.3.0/24
    PrivateThree:
      CIDR: 40.0.4.0/24
  eks:
    Ipv4:
      CIDR: 10.0.0.0/24
  RegionAMI:
    us-east-1:
      HVM64: ami-05577ed0e20b23acc
    us-east-2:
      HVM64: ami-024a56c3b615774ff
    us-west-1:
      HVM64: ami-0b4659d78cb302fd7
    us-west-2:
      HVM64: ami-0401f0c3aba47c977
    eu-west-1:
      HVM64: ami-0dd3a3853fdd6d881
    eu-west-2:
      HVM64: ami-09185ed014c94f88f
    eu-west-3:
      HVM64: ami-09d2e5f499c96f3ec
    eu-north-1:
      HVM64: ami-01f6ea424a4df90ff
    eu-central-1:
      HVM64: ami-08d7c8a4e9c511008
    ap-south-1:
      HVM64: ami-085f6b955743bf0ec
    ap-southeast-2:
      HVM64: ami-064f016bd6570e60b
    ap-southeast-1:
      HVM64: ami-0e50e3f9e217865a4
    ap-northeast-1:
      HVM64: ami-022b0a1c51d8329d4
Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap
        - SubnetConfig
        - VPC
        - CIDR
      Tags:
        - Key: Name
          Value: eks-msk-aks-Vpc
  PublicSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PublicOne
        - CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: eks-msk-aks-PublicSubnetOne
  PublicSubnetTwo:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 1
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PublicTwo
        - CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: eks-msk-aks-PublicSubnetTwo
  PrivateSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateOne
        - CIDR
      Tags:
        - Key: Name
          Value: eks-msk-aks-PrivateSubnetOne
  PrivateSubnetTwo:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 1
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateTwo
        - CIDR
      Tags:
        - Key: Name
          Value: eks-msk-aks-PrivateSubnetTwo
  PrivateSubnetThree:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 2
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateThree
        - CIDR
      Tags:
        - Key: Name
          Value: eks-msk-aks-PrivateSubnetThree
  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'
  NatGatewayOneEIP:
    Type: AWS::EC2::EIP
    DependsOn: GatewayAttachement
    Properties:
      Domain: vpc
  NatGatewayOne:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGatewayOneEIP.AllocationId
      SubnetId: !Ref PublicSubnetOne
  NatGatewayTwoEIP:
    Type: AWS::EC2::EIP
    DependsOn: GatewayAttachement
    Properties:
      Domain: vpc
  NatGatewayTwo:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NatGatewayTwoEIP.AllocationId
      SubnetId: !Ref PublicSubnetTwo
  GatewayAttachement:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway
  PublicRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PublicRoute:
    Type: 'AWS::EC2::Route'
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  PublicSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable
  PublicSubnetTwoRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnetTwo
      RouteTableId: !Ref PublicRouteTable
  PrivateRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGatewayOne
  PrivateSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetOne
  PrivateSubnetTwoRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetTwo
  PrivateSubnetThreeRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetThree
  KafkaClientInstanceSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref SSHLocation
  EksSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable TCP access via ports 443, 10250, 53
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !Ref SSHLocation
        - IpProtocol: tcp
          FromPort: 10250
          ToPort: 10250
          CidrIp: !Ref SSHLocation
        - IpProtocol: tcp
          FromPort: 53
          ToPort: 53
          CidrIp: !Ref SSHLocation
        - IpProtocol: udp
          FromPort: 53
          ToPort: 53
          CidrIp: !Ref SSHLocation
  MSKSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable TCP access via ports 2181, 9094, 9092
      VpcId: !Ref VPC
      GroupName: eks-msk-aks-msksg
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !GetAtt
            - KafkaClientInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9098
          SourceSecurityGroupId: !GetAtt
            - KafkaClientInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9098
          SourceSecurityGroupId: !GetAtt
            - EksSecurityGroup
            - GroupId
  # Log group for the blog testing
  BlogLogGroup:
    Type: 'AWS::Logs::LogGroup'
    Properties:
      LogGroupName: eks-msk-aks
      RetentionInDays: 7
      Tags:
        - Key: Name
          Value: eks-msk-aks-Loggroup
  KafkaClientEC2Instance:
    DependsOn: BlogMSKCluster
    Type: 'AWS::EC2::Instance'
    Properties:
      InstanceType: t2.medium
      KeyName: !Ref KeyName
      IamInstanceProfile: !Ref EC2InstanceProfile
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      SubnetId: !Ref PublicSubnetOne
      SecurityGroupIds:
        - !GetAtt
          - KafkaClientInstanceSecurityGroup
          - GroupId
      ImageId: !FindInMap
        - RegionAMI
        - !Ref 'AWS::Region'
        - HVM64
      Tags:
        - Key: Name
          Value: eks-msk-aks-KafkaClientInstance
      # A literal block scalar (|) keeps one shell command per line.
      UserData: !Base64 |
        #!/bin/bash
        yum update -y && yum install -y java-11-amazon-corretto && yum install -y jq
        export WORKING_DIR=/home/ec2-user
        cd $WORKING_DIR
        echo "export PATH=.local/bin:$PATH" >> .bash_profile
        # Download and unpack the Kafka client tools
        mkdir kafka && cd kafka
        wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
        tar -xzf kafka_2.12-2.6.2.tgz
        cd $WORKING_DIR
        chown -R ec2-user ./kafka
        chgrp -R ec2-user ./kafka
        # Look up the Region, the account ID, and the MSK ZooKeeper connection string
        export REGION=$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)
        export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
        export ZK=$(aws kafka list-clusters --region $REGION --output text | grep arn | grep CLUSTERINFOLIST | grep aws-MSKCluster | awk '{print $9}')
        # Add the MSK IAM authentication library and the matching client configuration
        wget -P $WORKING_DIR/kafka/kafka_2.12-2.6.2/lib https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
        cat << EoF > kafka/kafka_2.12-2.6.2/bin/client.properties
        security.protocol=SASL_SSL
        sasl.mechanism=AWS_MSK_IAM
        sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
        sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
        EoF
        # Create the input topic
        kafka/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --command-config client.properties --zookeeper $ZK --replication-factor 3 --partitions 3 --topic twitter_input
        # Package the Cassandra sink connector and register it as an MSK Connect custom plugin
        curl -L https://downloads.datastax.com/kafka/kafka-connect-cassandra-sink.tar.gz -o kafka-connect-cassandra-sink.tar.gz
        tar zxf kafka-connect-cassandra-sink.tar.gz
        zip -r kafka-connect-cassandra-sink.zip kafka-connect-cassandra-sink-1.4.0
        aws s3 cp --region $REGION kafka-connect-cassandra-sink.zip s3://blog-eks-msk-aks-$AWS_ACCOUNT_ID
        cat << EoF > ./kafka-sink-plugin.json
        {
          "name": "kafka-keyspaces-sink-plugin",
          "contentType": "ZIP",
          "location": {
            "s3Location": {
              "bucketArn": "arn:aws:s3:::blog-eks-msk-aks-$AWS_ACCOUNT_ID",
              "fileKey": "kafka-connect-cassandra-sink.zip"
            }
          }
        }
        EoF
        aws kafkaconnect create-custom-plugin --cli-input-json file://kafka-sink-plugin.json --region $REGION
  EKSRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service:
                - "ec2.amazonaws.com"
                - "eks.amazonaws.com"
            Action: 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: msk-iam
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kafka-cluster:*"
                Resource: '*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonEKSClusterPolicy'
        - 'arn:aws:iam::aws:policy/AmazonEKSServicePolicy'
        - 'arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess'
        - 'arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy'
        - 'arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly'
        - 'arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy'
  EC2Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service:
                - "ec2.amazonaws.com"
                - "kafkaconnect.amazonaws.com"
                - "delivery.logs.amazonaws.com"
            Action: 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: mskconnect
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kafkaconnect:*"
                Resource: '*'
        - PolicyName: msk-iam
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kafka-cluster:*"
                Resource: '*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonMSKFullAccess'
        - 'arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess'
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
  MskRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service:
                - "kafkaconnect.amazonaws.com"
                - "delivery.logs.amazonaws.com"
            Action: 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: mskconnect
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kafkaconnect:*"
                Resource: '*'
        - PolicyName: msk-iam
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "kafka-cluster:*"
                Resource: '*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonMSKFullAccess'
        - 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
  EC2InstanceProfile:
    Type: 'AWS::IAM::InstanceProfile'
    Properties:
      InstanceProfileName: EC2MSKProfile
      Roles:
        - !Ref EC2Role
  BlogMSKCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
          - !Ref PrivateSubnetThree
        InstanceType: kafka.t3.small
        SecurityGroups:
          - !GetAtt
            - MSKSecurityGroup
            - GroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 50
      ClusterName: aws-MSKCluster
      ClientAuthentication:
        Sasl:
          Iam:
            Enabled: true
        Unauthenticated:
          Enabled: true
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS_PLAINTEXT
          InCluster: true
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      KafkaVersion: 2.6.2
      NumberOfBrokerNodes: 3
  AppEcrRepository:
    Type: 'AWS::ECR::Repository'
    Properties:
      RepositoryName: "amazon-keyspaces-with-apache-kafka"
  EKSCluster:
    Type: AWS::EKS::Cluster
    Properties:
      Name: eks-twitter-cluster
      Version: "1.22"
      RoleArn: !GetAtt EKSRole.Arn
      ResourcesVpcConfig:
        SecurityGroupIds:
          - !GetAtt
            - EksSecurityGroup
            - GroupId
        SubnetIds:
          - !Ref PublicSubnetOne
          - !Ref PublicSubnetTwo
        EndpointPublicAccess: true
        EndpointPrivateAccess: true
        PublicAccessCidrs: [ !Ref SSHLocation ]
      KubernetesNetworkConfig:
        IpFamily: ipv4
        ServiceIpv4Cidr: !FindInMap
          - eks
          - Ipv4
          - CIDR
      Logging:
        ClusterLogging:
          EnabledTypes:
            - Type: api
            - Type: audit
      Tags:
        - Key: "Name"
          Value: "eks-msk-aks-ekscluster"
  EKSNodegroup:
    Type: 'AWS::EKS::Nodegroup'
    Properties:
      ClusterName: eks-twitter-cluster
      NodeRole: !GetAtt EKSRole.Arn
      NodegroupName: eks-compute
      ScalingConfig:
        MinSize: 2
        DesiredSize: 2
        MaxSize: 4
      Labels:
        Name: eks-msk-aks-compute
      Subnets:
        - !Ref PublicSubnetOne
        - !Ref PublicSubnetTwo
    DependsOn: EKSCluster
  MSKConnectorSG:
    Type: 'AWS::EC2::SecurityGroupIngress'
    DependsOn: MSKSecurityGroup
    Properties:
      GroupId: !Ref MSKSecurityGroup
      IpProtocol: tcp
      FromPort: 2181
      ToPort: 9142
      SourceSecurityGroupId: !GetAtt
        - MSKSecurityGroup
        - GroupId
  EksClusterSG:
    Type: 'AWS::EC2::SecurityGroupIngress'
    DependsOn: EKSNodegroup
    Properties:
      GroupId: !Ref MSKSecurityGroup
      IpProtocol: tcp
      FromPort: 9092
      ToPort: 9098
      SourceSecurityGroupId: !GetAtt
        - EKSCluster
        - ClusterSecurityGroupId
  # KMS key for Amazon Keyspaces encryption
  KmsKey:
    Type: "AWS::KMS::Key"
    Properties:
      EnableKeyRotation: true
      MultiRegion: false
      KeySpec: SYMMETRIC_DEFAULT
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
          - Sid: "Enable IAM User Permissions"
            Effect: "Allow"
            Principal:
              AWS:
                Fn::Join:
                  - ""
                  - - "arn:aws:iam::"
                    - Ref: "AWS::AccountId"
                    - ":root"
            Action: "kms:*"
            Resource: "*"
  # Private VPC endpoint for Amazon Keyspaces access
  KeyspacesEndpoint:
    Type: AWS::EC2::VPCEndpoint
    DependsOn: MSKConnectorSG
    Properties:
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.cassandra'
      PrivateDnsEnabled: true
      SecurityGroupIds:
        - !Ref MSKSecurityGroup
      SubnetIds:
        - !Ref PrivateSubnetOne
        - !Ref PrivateSubnetTwo
        - !Ref PrivateSubnetThree
      VpcEndpointType: Interface
      VpcId: !Ref VPC
  # Amazon Keyspaces keyspace and tables
  Keyspace:
    Type: AWS::Cassandra::Keyspace
    Properties:
      KeyspaceName: aws_blog
      Tags:
        - Key: Name
          Value: eks-msk-aks
  TweetidTable:
    Type: AWS::Cassandra::Table
    DependsOn: Keyspace
    Properties:
      KeyspaceName: aws_blog
      TableName: tweet_by_tweet_id
      PartitionKeyColumns:
        - ColumnName: tweet_id
          ColumnType: bigint
      ClusteringKeyColumns:
        - Column:
            ColumnName: hashtag
            ColumnType: text
          OrderBy: DESC
      RegularColumns:
        - ColumnName: tweet_time
          ColumnType: timestamp
        - ColumnName: lang
          ColumnType: text
        - ColumnName: tweet
          ColumnType: TEXT
        - ColumnName: username
          ColumnType: TEXT
      DefaultTimeToLive: 720000
      EncryptionSpecification:
        EncryptionType: CUSTOMER_MANAGED_KMS_KEY
        KmsKeyIdentifier: !Ref KmsKey
      Tags:
        - Key: Name
          Value: eks-msk-aks
  UserTable:
    Type: AWS::Cassandra::Table
    DependsOn: Keyspace
    Properties:
      KeyspaceName: aws_blog
      TableName: tweet_by_user
      PartitionKeyColumns:
        - ColumnName: username
          ColumnType: text
      ClusteringKeyColumns:
        - Column:
            ColumnName: tweet_time
            ColumnType: timestamp
          OrderBy: DESC
      RegularColumns:
        - ColumnName: tweet_id
          ColumnType: bigint
        - ColumnName: lang
          ColumnType: text
        - ColumnName: tweet
          ColumnType: TEXT
        - ColumnName: hashtag
          ColumnType: TEXT
      DefaultTimeToLive: 720000
      EncryptionSpecification:
        EncryptionType: CUSTOMER_MANAGED_KMS_KEY
        KmsKeyIdentifier: !Ref KmsKey
      Tags:
        - Key: Name
          Value: eks-msk-aks
Outputs:
  keyspacesVPCId:
    Description: The ID of the VPC endpoint created for Amazon Keyspaces
    Value: !Ref KeyspacesEndpoint
  PublicSubnetOne:
    Description: The ID of public subnet one
    Value: !Ref PublicSubnetOne
  PublicSubnetTwo:
    Description: The ID of public subnet two
    Value: !Ref PublicSubnetTwo
  PrivateSubnetOne:
    Description: The ID of private subnet one
    Value: !Ref PrivateSubnetOne
  PrivateSubnetTwo:
    Description: The ID of private subnet two
    Value: !Ref PrivateSubnetTwo
  PrivateSubnetThree:
    Description: The ID of private subnet three
    Value: !Ref PrivateSubnetThree
  MSKSecurityGroupID:
    Description: The ID of the security group created for the MSK cluster
    Value: !GetAtt MSKSecurityGroup.GroupId
  EKSClusterSecurityID:
    Description: The ID of the cluster security group that EKS created for the cluster and its node group
    Value: !GetAtt EKSCluster.ClusterSecurityGroupId
  EKSRoleID:
    Description: The ARN of the EKS role
    Value: !GetAtt EKSRole.Arn
  EKSRolename:
    Description: The name of the EKS role
    Value: !Ref EKSRole
  Ec2Rolename:
    Description: The name of the EC2 role
    Value: !Ref EC2Role
  Ec2RoleID:
    Description: The ARN of the EC2 role
    Value: !GetAtt EC2Role.Arn
  MSKconnectRolename:
    Description: The name of the MSK Connect role
    Value: !Ref MskRole
  MSKconnectRoleID:
    Description: The ARN of the MSK Connect role
    Value: !GetAtt MskRole.Arn
  EKSClusterID:
    Description: The name of the EKS cluster
    Value: !Ref EKSCluster
  BlogMSKClusterArn:
    Description: The ARN of the MSK cluster
    Value: !Ref BlogMSKCluster
  kafkaclinetinstance:
    Description: The instance ID of the Kafka client EC2 instance
    Value: !Ref KafkaClientEC2Instance
  ECRRepository:
    Description: The URI of the ECR repository
    Value: !GetAtt AppEcrRepository.RepositoryUri
  kmsid:
    Description: The ID of the KMS key created for Amazon Keyspaces encryption
    Value: !Ref KmsKey