---
# CloudFormation template for an Amazon MSK demo environment.
#
# Creates:
#   * a VPC with one public subnet and three private subnets,
#   * an internet gateway (public subnet) and a NAT gateway (private subnets),
#   * a three-broker MSK cluster placed in the private subnets,
#   * an EC2 "Kafka client" instance in the public subnet that installs the
#     Kafka CLI tools and creates the twitter_input / twitter_output topics,
#   * two ECR repositories.
#
# Quoted because a bare 2010-09-09 is read as a date by generic YAML loaders.
AWSTemplateFormatVersion: '2010-09-09'

Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    Type: 'AWS::EC2::KeyPair::KeyName'
    ConstraintDescription: Can contain only ASCII characters.
  SSHLocation:
    # NOTE(review): the default allows SSH from any address; pass a narrower
    # CIDR when deploying outside a throwaway demo account.
    Description: The IP address range that can be used to SSH to the EC2 instances
    Type: String
    MinLength: '9'
    MaxLength: '18'
    Default: '0.0.0.0/0'
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: Must be a valid IP CIDR range of the form x.x.x.x/x

Mappings:
  # Address plan: one /16 VPC carved into four /24 subnets.
  SubnetConfig:
    VPC:
      CIDR: '10.0.0.0/16'
    PublicOne:
      CIDR: '10.0.0.0/24'
    PrivateOne:
      CIDR: '10.0.1.0/24'
    PrivateTwo:
      CIDR: '10.0.2.0/24'
    PrivateThree:
      CIDR: '10.0.3.0/24'
  # AMI used for the Kafka client instance, keyed by region. Only the regions
  # listed here can host this stack.
  RegionAMI:
    us-east-1:
      HVM64: ami-0d5eff06f840b45e9
    us-west-2:
      HVM64: ami-0cf6f5c8a62fa5da6

Resources:
  # ---------------------------------------------------------------- Network
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap [SubnetConfig, VPC, CIDR]
      Tags:
        - Key: Name
          Value: MskDemoVpc

  # Public subnet: hosts the Kafka client instance and the NAT gateway.
  PublicSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap [SubnetConfig, PublicOne, CIDR]
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: MskPublicSubnet

  # Private subnets: one per availability zone (indexes 0, 1, 2), used as the
  # MSK broker subnets.
  PrivateSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap [SubnetConfig, PrivateOne, CIDR]
      Tags:
        - Key: Name
          Value: MskPrivateSubnetOne

  PrivateSubnetTwo:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 1
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap [SubnetConfig, PrivateTwo, CIDR]
      Tags:
        - Key: Name
          Value: MskPrivateSubnetTwo

  PrivateSubnetThree:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 2
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap [SubnetConfig, PrivateThree, CIDR]
      Tags:
        - Key: Name
          Value: MskPrivateSubnetThree

  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'

  # The Elastic IP is allocated only after the internet gateway is attached.
  NatGatewayEIP:
    Type: 'AWS::EC2::EIP'
    DependsOn: GatewayAttachement
    Properties:
      Domain: vpc

  # NAT gateway lives in the public subnet; the private route table sends
  # its default route here.
  NatGateway:
    Type: 'AWS::EC2::NatGateway'
    Properties:
      AllocationId: !GetAtt NatGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnetOne

  # NOTE: the logical ID spelling ("Attachement") is kept deliberately;
  # renaming a logical ID makes CloudFormation replace the resource on update.
  GatewayAttachement:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  PublicRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC

  # Default route of the public subnet goes straight to the internet gateway.
  PublicRoute:
    Type: 'AWS::EC2::Route'
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: '0.0.0.0/0'
      GatewayId: !Ref InternetGateway

  PublicSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable

  # A single route table is shared by all three private subnets.
  PrivateRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC

  DefaultPrivateRoute:
    Type: 'AWS::EC2::Route'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: '0.0.0.0/0'
      NatGatewayId: !Ref NatGateway

  PrivateSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetOne

  PrivateSubnetTwoRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetTwo

  PrivateSubnetThreeRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetThree

  # -------------------------------------------------------- Security groups
  KafkaClientInstanceSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref SSHLocation

  # Brokers accept traffic only from members of the client security group,
  # on the three ports named in the group description.
  MSKSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable TCP access via ports 2181, 9094, 9092
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 9094
          ToPort: 9094
          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId

  # ----------------------------------------------------------- Kafka client
  # Created after the MSK cluster because its boot script looks the cluster
  # up and creates topics on it.
  KafkaClientEC2Instance:
    DependsOn: MSKCluster
    Type: 'AWS::EC2::Instance'
    Properties:
      InstanceType: t2.medium
      KeyName: !Ref KeyName
      IamInstanceProfile: !Ref EC2InstanceProfile
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      SubnetId: !Ref PublicSubnetOne
      SecurityGroupIds:
        - !GetAtt KafkaClientInstanceSecurityGroup.GroupId
      ImageId: !FindInMap [RegionAMI, !Ref 'AWS::Region', HVM64]
      Tags:
        - Key: Name
          Value: KafkaClientInstance
      # Literal block scalar (|): every script line must stay on its own
      # line. A folded scalar (>) would join adjacent lines with spaces and
      # turn the whole script into a single shebang line.
      UserData: !Base64 |
        #!/bin/bash
        yum update -y
        yum install java-1.8.0-openjdk-devel -y
        yum install -y jq
        cd /home/ec2-user
        echo "export PATH=.local/bin:$PATH" >> .bash_profile
        mkdir kafka
        cd kafka
        wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz
        tar -xzf kafka_2.12-2.6.1.tgz
        cd /home/ec2-user
        chown -R ec2-user ./kafka
        chgrp -R ec2-user ./kafka
        export REGION=$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region)
        # Select the ZooKeeper connect string by field name instead of by
        # column position in the CLI's text output.
        export ZK=$(aws kafka list-clusters --region "$REGION" --cluster-name-filter MSKCluster --query 'ClusterInfoList[0].ZookeeperConnectString' --output text)
        kafka/kafka_2.12-2.6.1/bin/kafka-topics.sh --create --zookeeper "$ZK" --replication-factor 3 --partitions 3 --topic twitter_input
        kafka/kafka_2.12-2.6.1/bin/kafka-topics.sh --create --zookeeper "$ZK" --replication-factor 3 --partitions 3 --topic twitter_output

  # Role assumed by the client instance; the boot script's
  # `aws kafka list-clusters` call runs under it.
  EC2Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so the policy version stays a string, not a YAML date.
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonMSKFullAccess'
        - 'arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess'

  # NOTE(review): the fixed InstanceProfileName means a second copy of this
  # stack in the same account would collide on the name.
  EC2InstanceProfile:
    Type: 'AWS::IAM::InstanceProfile'
    Properties:
      InstanceProfileName: EC2MSKCFProfile
      Roles:
        - !Ref EC2Role

  # ------------------------------------------------------------ MSK cluster
  # ClusterName must stay "MSKCluster": the client boot script filters
  # clusters by that name.
  MSKCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
          - !Ref PrivateSubnetThree
        InstanceType: kafka.t3.small
        SecurityGroups:
          - !GetAtt MSKSecurityGroup.GroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 100
      ClusterName: MSKCluster
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS_PLAINTEXT
          InCluster: true
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      KafkaVersion: '2.6.1'
      NumberOfBrokerNodes: 3

  # ------------------------------------------------------- ECR repositories
  EcrRepository1:
    Type: 'AWS::ECR::Repository'
    Properties:
      RepositoryName: kafka-streams-msk

  EcrRepository2:
    Type: 'AWS::ECR::Repository'
    Properties:
      RepositoryName: twitter-stream-producer

Outputs:
  VPCId:
    Description: The ID of the VPC created
    Value: !Ref VPC
  PublicSubnetOne:
    Description: The ID of the public subnet created
    Value: !Ref PublicSubnetOne
  PrivateSubnetOne:
    Description: The ID of private subnet one created
    Value: !Ref PrivateSubnetOne
  PrivateSubnetTwo:
    Description: The ID of private subnet two created
    Value: !Ref PrivateSubnetTwo
  PrivateSubnetThree:
    Description: The ID of private subnet three created
    Value: !Ref PrivateSubnetThree
  MSKSecurityGroupID:
    Description: The ID of the security group created for the MSK clusters
    Value: !GetAtt MSKSecurityGroup.GroupId
  MSKClusterArn:
    Description: The Arn for the MSK cluster
    Value: !Ref MSKCluster