AWSTemplateFormatVersion: '2010-09-09'

# Clickstream pipeline lab stack.
#
# Resources declared here: a VPC with one public and three private subnets,
# security groups and an IAM role set for MSK clients, a Kafka client EC2
# instance, an NGINX reverse proxy in front of OpenSearch Dashboards, an Amazon
# OpenSearch Service domain, a Glue Schema Registry, an ECS/Fargate producer task
# definition, a Lambda consumer and a Kinesis Data Analytics (Flink) application.
# The MSK cluster itself is NOT created by this template; the Outputs expose the
# VPC, subnets and security group it is expected to use.

Parameters:
  OpenSearchMasterUserName:
    Description: Amazon OpenSearch Service - Username
    Default: "aosadmin"
    Type: String
  AssetsBucketName:
    Description: Assets bucket name
    Type: String
  LambdaKey:
    Description: Object key for Lambda Code
    Type: String
  KdaAppKey:
    Description: Object key for KDA Code
    Type: String

Mappings:
  # CIDR ranges for the VPC and each subnet.
  SubnetConfig:
    VPC:
      CIDR: '10.0.0.0/16'
    PublicOne:
      CIDR: '10.0.0.0/24'
    PrivateSubnetMSKOne:
      CIDR: '10.0.1.0/24'
    PrivateSubnetMSKTwo:
      CIDR: '10.0.2.0/24'
    PrivateSubnetMSKThree:
      CIDR: '10.0.3.0/24'
  AWSInstanceType2Arch:
    t2.small:
      Arch: HVM64
  # Region -> AMI lookup used by both EC2 instances. Only these regions are supported.
  AWSRegionArch2AMI:
    us-east-1:
      HVM64: ami-03ededff12e34e59e
    us-east-2:
      HVM64: ami-0c7478fd229861c57
    us-west-1:
      HVM64: ami-06542a822d33e2e40
    us-west-2:
      HVM64: ami-0b36cd6786bcfe120
    sa-east-1:
      HVM64: ami-00d10ca79f70a302a

Resources:

  # ---------------------------------------------------------------------------
  # Networking
  # ---------------------------------------------------------------------------
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap [ 'SubnetConfig', 'VPC', 'CIDR' ]
      Tags:
        - Key: 'Name'
          Value: 'MMVPC'

  PublicSubnetOne:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: !Ref 'AWS::Region'
      VpcId: !Ref 'VPC'
      CidrBlock: !FindInMap [ 'SubnetConfig', 'PublicOne', 'CIDR' ]
      MapPublicIpOnLaunch: true
      Tags:
        - Key: 'Name'
          Value: 'MMPublicSubnet'

  PrivateSubnetMSKOne:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: !Ref 'AWS::Region'
      VpcId: !Ref 'VPC'
      CidrBlock: !FindInMap [ 'SubnetConfig', 'PrivateSubnetMSKOne', 'CIDR' ]
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKOne'

  PrivateSubnetMSKTwo:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 1
          - Fn::GetAZs: !Ref 'AWS::Region'
      VpcId: !Ref 'VPC'
      CidrBlock: !FindInMap [ 'SubnetConfig', 'PrivateSubnetMSKTwo', 'CIDR' ]
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKTwo'

  PrivateSubnetMSKThree:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 2
          - Fn::GetAZs: !Ref 'AWS::Region'
      VpcId: !Ref 'VPC'
      CidrBlock: !FindInMap [ 'SubnetConfig', 'PrivateSubnetMSKThree', 'CIDR' ]
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: 'PrivateSubnetMSKThree'

  InternetGateway:
    Type: AWS::EC2::InternetGateway

  GatewayAttachement:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  NatGateway:
    Type: AWS::EC2::NatGateway
    DependsOn: NatPublicIP
    Properties:
      AllocationId: !GetAtt NatPublicIP.AllocationId
      SubnetId: !Ref PublicSubnetOne

  NatPublicIP:
    Type: AWS::EC2::EIP
    # An EIP allocated for a VPC defined in the same template must wait for the
    # internet gateway attachment, not just for the VPC.
    DependsOn: GatewayAttachement
    Properties:
      Domain: vpc

  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC

  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref 'PublicRouteTable'
      DestinationCidrBlock: '0.0.0.0/0'
      GatewayId: !Ref 'InternetGateway'

  PublicSubnetOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable

  # All three private subnets share one route table whose default route goes
  # through the NAT gateway.
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NatGateway

  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC

  PrivateSubnetMSKOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKOne

  PrivateSubnetMSKTwoRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKTwo

  PrivateSubnetMSKThreeRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetMSKThree

  # ---------------------------------------------------------------------------
  # Security groups
  # ---------------------------------------------------------------------------
  KDASecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Enable KDA access
      VpcId: !Ref VPC

  # HTTPS into the OpenSearch domain from the KDA app, the NGINX proxy and the
  # Kafka client instance.
  OpenSearchSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Enable Amazon OpenSearch access
      VpcId: !Ref 'VPC'
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !GetAtt KDASecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !GetAtt NGINXPlusInstancesSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId

  KafkaClientInstanceSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: KafkaClient Security Group
      VpcId: !Ref 'VPC'

  ProducerECSTaskSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security Group for ECS task running producer application
      VpcId: !Ref 'VPC'

  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: MSK Security Group
      VpcId: !Ref 'VPC'
      SecurityGroupIngress:
        # Port 9098 is the port used for the bootstrap servers in this template.
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          SourceSecurityGroupId: !GetAtt ProducerECSTaskSecurityGroup.GroupId
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          SourceSecurityGroupId: !GetAtt KDASecurityGroup.GroupId
        # Catch-all rule for in-VPC clients that have no dedicated rule above
        # (e.g. LambdaSecurityGroup). Scoped to the VPC CIDR instead of
        # 0.0.0.0/0 so the group is not open to the whole internet.
        - IpProtocol: -1
          CidrIp: !FindInMap [ 'SubnetConfig', 'VPC', 'CIDR' ]

  # ---------------------------------------------------------------------------
  # Kafka client instance
  # ---------------------------------------------------------------------------
  KafkaClientEC2Instance:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: t3.medium
      IamInstanceProfile: !Ref EC2InstanceProfile
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: !Ref 'AWS::Region'
      SubnetId: !Ref 'PublicSubnetOne'
      SecurityGroupIds: [ !GetAtt KafkaClientInstanceSecurityGroup.GroupId ]
      ImageId: !FindInMap [ 'AWSRegionArch2AMI', !Ref 'AWS::Region', 'HVM64' ]
      Tags:
        - Key: 'Name'
          Value: 'KafkaClientInstance'
      BlockDeviceMappings:
        - DeviceName: /dev/xvda
          Ebs:
            VolumeSize: 20
            VolumeType: gp2
            DeleteOnTermination: true
      # Bootstrap script. Only ${...} expressions are substituted by !Sub; plain
      # shell variables such as $BS are passed through untouched.
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          yum install python3.7 -y
          yum install java-1.8.0-openjdk-devel -y
          yum install nmap-ncat -y
          yum install git -y
          yum erase awscli -y
          yum install jq -y
          yum install maven -y
          amazon-linux-extras install docker -y
          service docker start
          usermod -a -G docker ec2-user

          cd /home/ec2-user
          # The unversioned get-pip.py no longer supports Python 3.7; use the 3.7-specific script.
          wget https://bootstrap.pypa.io/pip/3.7/get-pip.py
          su -c "python3.7 get-pip.py --user" -s /bin/sh ec2-user
          su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user
          su -c "/home/ec2-user/.local/bin/pip3 install awscli --user" -s /bin/sh ec2-user

          # Install AWS CLI 2 - access with aws2
          curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
          unzip awscliv2.zip
          ./aws/install -b /usr/local/bin/aws2
          su -c "ln -s /usr/local/bin/aws2/aws ~/.local/bin/aws2" -s /bin/sh ec2-user

          su -c "mkdir -p /tmp/kafka" -s /bin/sh ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/schema-registry.properties /tmp/kafka" -l ec2-user
          su -c "aws s3 cp s3://aws-streaming-artifacts/msk-lab-resources/producer.properties_msk /tmp/kafka" -l ec2-user

          # Pull & Build Clickstream producer
          cd /home/ec2-user
          git clone https://github.com/aws-samples/sasl-scram-secrets-manager-client-for-msk.git
          cd sasl-scram-secrets-manager-client-for-msk
          mvn clean install -f pom.xml
          cd /home/ec2-user
          git clone https://github.com/aws-samples/clickstream-producer-for-apache-kafka.git
          cd clickstream-producer-for-apache-kafka
          mvn clean package -f pom.xml
          mv target/KafkaClickstreamClient-1.0-SNAPSHOT.jar /tmp/kafka

          #Install kafka library
          cd /home/ec2-user
          mkdir -p kafka
          cd kafka
          wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
          tar -xzf kafka_2.12-2.8.1.tgz --strip 1

          #Install IAM Auth libraries
          cd /home/ec2-user/kafka/libs
          wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar
          cd /home/ec2-user

          # Write the IAM-auth client configuration (heredoc: everything up to EOF goes into the file).
          cat <<EOF > /home/ec2-user/kafka/config/client.properties
          security.protocol = SASL_SSL
          sasl.mechanism = AWS_MSK_IAM
          sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
          sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
          EOF

          # Point at the directory the jar was actually downloaded to above.
          export CLASSPATH=/home/ec2-user/kafka/libs/aws-msk-iam-auth-1.1.0-all.jar

          # Helper script for creating the topics; \$BS is escaped so that the
          # generated script reads BS from its own environment at run time.
          cat <<EOF > /home/ec2-user/create-topics.sh
          /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server \$BS --command-config /home/ec2-user/kafka/config/client.properties --create --topic clickstream --replication-factor 3 --partitions 3
          /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server \$BS --command-config /home/ec2-user/kafka/config/client.properties --create --topic Departments_Agg --replication-factor 3 --partitions 3
          /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server \$BS --command-config /home/ec2-user/kafka/config/client.properties --create --topic ClickEvents_UserId_Agg_Result --replication-factor 3 --partitions 3
          /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server \$BS --command-config /home/ec2-user/kafka/config/client.properties --create --topic User_Sessions_Aggregates_With_Order_Checkout --replication-factor 3 --partitions 3
          /home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server \$BS --command-config /home/ec2-user/kafka/config/client.properties --list
          EOF
          chmod +x /home/ec2-user/create-topics.sh

          # wait until CF stack was created successfully
          # (absolute path: the yum awscli was erased above and CLI v2 lives under /usr/local/bin/aws2)
          /usr/local/bin/aws2/aws --region ${AWS::Region} cloudformation wait stack-create-complete --stack-name '${AWS::StackName}'

          # Map the KDA role to the all_access role of the OpenSearch security plugin
          curl -u '${OpenSearchMasterUserName}:${RetrieveOpenSearchPassword.OpenSearchMasterPassword}' -X PATCH https://${OpenSearchServiceDomain.DomainEndpoint}/_plugins/_security/api/rolesmapping/all_access -H "Content-Type: application/json" -d '[{"op": "add", "path": "/backend_roles", "value": ["${KDARole.Arn}"]}]'

          # Create Amazon OpenSearch indices
          curl -s https://raw.githubusercontent.com/aws-samples/msk-serverless-data-pipeline/main/amazon-opensearch/index_mapping/departments_count.json| curl -u '${OpenSearchMasterUserName}:${RetrieveOpenSearchPassword.OpenSearchMasterPassword}' -s -w "\n" -XPUT https://${OpenSearchServiceDomain.DomainEndpoint}/departments_count -H "Content-Type: application/json" -d @-
          curl -s https://raw.githubusercontent.com/aws-samples/msk-serverless-data-pipeline/main/amazon-opensearch/index_mapping/user_session_counts.json| curl -u '${OpenSearchMasterUserName}:${RetrieveOpenSearchPassword.OpenSearchMasterPassword}' -s -w "\n" -XPUT https://${OpenSearchServiceDomain.DomainEndpoint}/user_session_counts -H "Content-Type: application/json" -d @-
          curl -s https://raw.githubusercontent.com/aws-samples/msk-serverless-data-pipeline/main/amazon-opensearch/index_mapping/user_session_details.json| curl -u '${OpenSearchMasterUserName}:${RetrieveOpenSearchPassword.OpenSearchMasterPassword}' -s -w "\n" -XPUT https://${OpenSearchServiceDomain.DomainEndpoint}/user_session_details -H "Content-Type: application/json" -d @-

          # Create OpenSearch Dashboard objects
          cd /home/ec2-user
          curl -s https://raw.githubusercontent.com/aws-samples/msk-serverless-data-pipeline/main/amazon-opensearch/dashboard/clickstream_dashboard.ndjson -o clickstream_dashboard.ndjson
          curl -XPOST https://${OpenSearchServiceDomain.DomainEndpoint}/_dashboards/auth/login -H "osd-xsrf: true" -H "content-type:application/json" -d '{"username":"${OpenSearchMasterUserName}", "password" : "${RetrieveOpenSearchPassword.OpenSearchMasterPassword}"} ' -c auth.txt
          curl -XPOST https://${OpenSearchServiceDomain.DomainEndpoint}/_dashboards/api/saved_objects/_import -H "osd-xsrf:true" -b auth.txt --form file=@clickstream_dashboard.ndjson

  EC2Role:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: glue-schema-registry-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:RegisterSchemaVersion
                  - glue:GetSchemaVersion
                  - glue:CreateSchema
                  - glue:GetSchemaByDefinition
                Resource: '*'
        - PolicyName: msk-serverless-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:DescribeTopic
                  - kafka-cluster:CreateTopic
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
        - arn:aws:iam::aws:policy/AmazonSSMPatchAssociation

  EC2InstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      InstanceProfileName: !Sub 'EC2FlinkMSKCFProfile-${AWS::StackName}'
      Roles:
        - !Ref EC2Role

  # ---------------------------------------------------------------------------
  # NGINX reverse proxy for OpenSearch Dashboards
  # ---------------------------------------------------------------------------
  NGINXPlusInstancesSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Enables access to NGINX Plus instances.
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: '443'
          ToPort: '443'
          CidrIp: '0.0.0.0/0'

  NGINXProxy:
    Type: AWS::EC2::Instance
    Metadata:
      # Executed by the cfn-init call in UserData, in configSet order.
      AWS::CloudFormation::Init:
        configSets:
          bootstrap_install:
            - prepare_system
            - create_self_signed_cert
            - configure_nginx
        prepare_system:
          commands:
            install_nginx:
              command: amazon-linux-extras install -y nginx1
        create_self_signed_cert:
          commands:
            create_self_signed_cert:
              command: sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout /etc/nginx/cert.key -out /etc/nginx/cert.crt -subj "/C=DE"
        configure_nginx:
          files:
            /etc/nginx/conf.d/default.conf:
              # $host etc. are nginx variables and are left alone by !Sub;
              # only ${OpenSearchServiceDomain.DomainEndpoint} is substituted.
              content: !Sub |
                server {
                    listen 443 ssl;
                    server_name $host;
                    rewrite ^/$ https://$host/_dashboards redirect;

                    # openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout /etc/nginx/cert.key -out /etc/nginx/cert.crt -subj /C=US/ST=./L=./O=./CN=.\n
                    ssl_certificate /etc/nginx/cert.crt;
                    ssl_certificate_key /etc/nginx/cert.key;

                    ssl_session_cache builtin:1000 shared:SSL:10m;
                    # TLSv1 and TLSv1.1 are deprecated and intentionally not offered.
                    ssl_protocols TLSv1.2;
                    ssl_ciphers HIGH:!aNULL:!eNULL:!EXPORT:!CAMELLIA:!D:!MD5:!PSK:!RC4;
                    ssl_prefer_server_ciphers on;

                    location /_dashboards {
                        # Forward requests to OpenSearch Dashboards
                        proxy_pass https://${OpenSearchServiceDomain.DomainEndpoint}/_dashboards;

                        # Handle redirects to OpenSearch Dashboards
                        proxy_redirect https://${OpenSearchServiceDomain.DomainEndpoint} https://$host;

                        # Update cookie domain and path
                        proxy_cookie_domain ${OpenSearchServiceDomain.DomainEndpoint} $host;
                        proxy_cookie_path / /_dashboards/;

                        # Response buffer settings
                        proxy_buffer_size 128k;
                        proxy_buffers 4 256k;
                        proxy_busy_buffers_size 256k;
                    }

                    location ~ \/(log|sign|fav|forgot|change|saml|oauth2) {
                        # Forward requests to OpenSearch
                        proxy_pass https://${OpenSearchServiceDomain.DomainEndpoint};

                        # Handle redirects to Opensearch Dashboards
                        proxy_redirect https://${OpenSearchServiceDomain.DomainEndpoint} https://$host;

                        # Update cookie domain
                        proxy_cookie_domain ${OpenSearchServiceDomain.DomainEndpoint} $host;
                    }
                }
          services:
            sysvinit:
              nginx:
                enabled: 'true'
                ensureRunning: 'true'
                files:
                  - /etc/nginx/conf.d/default.conf
                  # NOTE(review): this template never creates healthcheck.conf.
                  - /etc/nginx/conf.d/healthcheck.conf
    Properties:
      Tags:
        - Key: 'Name'
          Value: 'NGinxProxy'
      SubnetId: !Ref PublicSubnetOne
      ImageId: !FindInMap [ 'AWSRegionArch2AMI', !Ref 'AWS::Region', 'HVM64' ]
      InstanceType: t3.small
      SecurityGroupIds:
        - !Ref NGINXPlusInstancesSecurityGroup
      # cfn-signal reports the exit code of cfn-init ($?).
      UserData:
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          /opt/aws/bin/cfn-init -v --stack ${AWS::StackName} --resource NGINXProxy --configsets bootstrap_install --region ${AWS::Region}
          /opt/aws/bin/cfn-signal -e $? --stack ${AWS::StackName} --resource NGINXProxy --region ${AWS::Region}

  # ---------------------------------------------------------------------------
  # Kinesis Data Analytics execution role
  # ---------------------------------------------------------------------------
  KDARole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: "/"
      Policies:
        - PolicyName: glue-schema-registry-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:RegisterSchemaVersion
                  - glue:GetSchemaVersion
                  - glue:CreateSchema
                  - glue:GetSchemaByDefinition
                Resource: '*'
        - PolicyName: vpc-read-only-permissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - ec2:DescribeVpcs
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                  - ec2:DescribeDhcpOptions
                Resource: '*'
        - PolicyName: read-code
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:GetObjectVersion
                Resource: !Sub 'arn:aws:s3:::${AssetsBucketName}/*'
        - PolicyName: msk-serverless-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:DescribeTopic
                  - kafka-cluster:CreateTopic
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonVPCFullAccess
        - arn:aws:iam::aws:policy/CloudWatchFullAccess

  # ---------------------------------------------------------------------------
  # OpenSearch master password: generated secret + custom resource to read it
  # ---------------------------------------------------------------------------
  OpenSearchMasterPasswordSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Description: This secret has a dynamically generated secret password.
      GenerateSecretString:
        SecretStringTemplate: !Sub '{"username": "${OpenSearchMasterUserName}"}'
        GenerateStringKey: "password"
        PasswordLength: 10
        ExcludeCharacters: "\" ' ( ) * + , - . / : ; < = > ! # ? @ [ \\ ] ^ _ ` { | } ~"

  RetrieveOpenSearchPasswordLambdaPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AllowCWLogsWrite
            Effect: Allow
            Action:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:PutLogEvents
            Resource: arn:aws:logs:*:*:*
          - Effect: Allow
            Action:
              - secretsmanager:GetSecretValue
            Resource: !Ref OpenSearchMasterPasswordSecret

  RetrieveOpenSearchPasswordLambdaExecutionRole:
    Type: AWS::IAM::Role
    DependsOn: RetrieveOpenSearchPasswordLambdaPolicy
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - !Ref RetrieveOpenSearchPasswordLambdaPolicy
      Path: /

  RetrieveOpenSearchPasswordLambdaFunction:
    Type: AWS::Lambda::Function
    DependsOn: OpenSearchMasterPasswordSecret
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt RetrieveOpenSearchPasswordLambdaExecutionRole.Arn
      Runtime: python3.9
      Timeout: 120
      Environment:
        Variables:
          SECRET_ARN: !Ref OpenSearchMasterPasswordSecret
          REGION: !Ref AWS::Region
      Code:
        ZipFile: |
          import base64
          import json
          import os

          import boto3
          import cfnresponse

          SECRET_ARN = os.getenv('SECRET_ARN')
          REGION = os.getenv('REGION')


          def lambda_handler(event, context):
              """Custom-resource handler returning the generated OpenSearch password.

              Always answers CloudFormation exactly once: SUCCESS with
              {"OpenSearchMasterPassword": ...} or FAILED with {"Error": ...}.
              """
              # Nothing was created by this resource, so a delete needs no work.
              if event.get('RequestType') == 'Delete':
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                  return

              try:
                  client = boto3.session.Session().client(
                      service_name='secretsmanager',
                      region_name=REGION
                  )
                  response = client.get_secret_value(SecretId=SECRET_ARN)
                  # Depending on whether the secret is a string or binary,
                  # one of these fields will be populated.
                  if 'SecretString' in response:
                      secret = response['SecretString']
                  else:
                      secret = base64.b64decode(response['SecretBinary'])
                  password = json.loads(secret)['password']
              except Exception as err:
                  # responseData must be a JSON-serialisable dict; send the text
                  # of the error rather than the exception object itself.
                  print(err)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': str(err)})
                  return

              # The password is deliberately not printed to the logs.
              cfnresponse.send(event, context, cfnresponse.SUCCESS,
                               {'OpenSearchMasterPassword': password})

  RetrieveOpenSearchPassword:
    Type: Custom::RetrieveOpenSearchPassword
    DependsOn: RetrieveOpenSearchPasswordLambdaFunction
    Properties:
      ServiceToken: !GetAtt RetrieveOpenSearchPasswordLambdaFunction.Arn

  # ---------------------------------------------------------------------------
  # Amazon OpenSearch Service domain
  # ---------------------------------------------------------------------------
  AWSServiceRoleForAmazonOpenSearchService:
    Type: 'AWS::IAM::ServiceLinkedRole'
    Properties:
      AWSServiceName: es.amazonaws.com

  OpenSearchServiceDomain:
    Type: 'AWS::OpenSearchService::Domain'
    DependsOn:
      - OpenSearchSecurityGroup
      - AWSServiceRoleForAmazonOpenSearchService
    UpdatePolicy:
      EnableVersionUpgrade: true
    Properties:
      DomainName: !Sub 'aos-clickstream-${AWS::Region}'
      EngineVersion: OpenSearch_1.3
      ClusterConfig:
        InstanceCount: 1
        InstanceType: r6g.large.search
      DomainEndpointOptions:
        EnforceHTTPS: true
      NodeToNodeEncryptionOptions:
        Enabled: true
      EncryptionAtRestOptions:
        Enabled: true
      EBSOptions:
        EBSEnabled: true
        Iops: 0
        VolumeSize: 100
        VolumeType: 'gp2'
      # Declared exactly once (a mapping must not repeat a key). Access is
      # open at the resource-policy level; fine-grained access control below
      # plus the VPC security group are what gate requests.
      AccessPolicies:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: '*'
            Action: 'es:*'
            Resource: !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/*'
      AdvancedOptions:
        rest.action.multi.allow_explicit_index: 'true'
      AdvancedSecurityOptions:
        Enabled: true
        InternalUserDatabaseEnabled: true
        MasterUserOptions:
          MasterUserName: !Ref OpenSearchMasterUserName
          # Dynamic reference: resolved by CloudFormation from the generated secret.
          MasterUserPassword: !Sub '{{resolve:secretsmanager:${OpenSearchMasterPasswordSecret}:SecretString:password}}'
      VPCOptions:
        SubnetIds:
          - !Ref PrivateSubnetMSKOne
        SecurityGroupIds:
          - !Ref OpenSearchSecurityGroup

  # ---------------------------------------------------------------------------
  # Glue Schema Registry and ECS producer
  # ---------------------------------------------------------------------------
  GlueSchemaRegistry:
    Type: AWS::Glue::Registry
    Properties:
      Description: Glue Schema Registry to store all origanisation schemas
      Name: serverless

  ContainerCluster:
    Type: AWS::ECS::Cluster
    Properties:
      ClusterName: !Sub "${AWS::StackName}-msk-producer-cluster"

  CwLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "${AWS::StackName}-producer-log-group"

  ExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AWS::StackName}-ecs-task-execution-role"
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: ecs-tasks.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy'
        - 'arn:aws:iam::aws:policy/AmazonElasticContainerRegistryPublicReadOnly'

  TaskRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${AWS::StackName}-ecs-task-role"
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: ecs-tasks.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AWSGlueSchemaRegistryFullAccess'
        - 'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy'
      Policies:
        - PolicyName: !Sub "${AWS::StackName}-msk-producer-policy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:AlterCluster
                  - kafka-cluster:DescribeCluster
                  - kafka-cluster:*Topic*
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: '*'

  ProducerTaskDefinition:
    Type: AWS::ECS::TaskDefinition
    DependsOn: CwLogGroup
    Properties:
      # Property names are case-sensitive: Cpu / Memory (task-level sizing for Fargate).
      Cpu: '1024'
      Memory: '2048'
      NetworkMode: awsvpc
      RequiresCompatibilities:
        - FARGATE
      TaskRoleArn: !Ref TaskRole
      ExecutionRoleArn: !Ref ExecutionRole
      ContainerDefinitions:
        - Name: clickstream-producer
          Image: public.ecr.aws/l2e5u6w9/sample-clickstream-producer:latest
          Environment:
            # Left empty here; has to be supplied once the MSK cluster exists.
            - Name: BOOTSTRAP_STRING
              Value: ''
            - Name: REGION
              Value: !Ref "AWS::Region"
            - Name: TOPIC_NAME
              Value: clickstream
          LogConfiguration:
            LogDriver: awslogs
            Options:
              awslogs-region: !Ref AWS::Region
              awslogs-group: !Ref CwLogGroup
              awslogs-stream-prefix: ecs

  # ---------------------------------------------------------------------------
  # Lambda consumer (MSK -> S3)
  # ---------------------------------------------------------------------------
  LambdaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: MskLambda-SG
      GroupName: MskLambda-SG
      SecurityGroupEgress:
        - CidrIp: 0.0.0.0/0
          Description: Allow all outbound traffic by default
          IpProtocol: "-1"
      VpcId: !Ref VPC

  ProcessMskFunction:
    Type: AWS::Lambda::Function
    DependsOn: LambdaMskPolicy
    Properties:
      FunctionName: MskLambda-Function
      Code:
        S3Bucket: !Ref AssetsBucketName
        S3Key: !Ref LambdaKey
      Handler: com.amazonaws.kafka.samples.HandlerMSK
      # java8 (Amazon Linux 1) is a deprecated Lambda runtime; java8.al2 is the
      # Java 8 runtime on Amazon Linux 2.
      Runtime: java8.al2
      Description: Java function to read MSK Data and store in S3
      MemorySize: 512
      Timeout: 300
      Environment:
        Variables:
          BUCKET_NAME: !Ref DestinationBucket
          RETRIES: "3"
      VpcConfig:
        SecurityGroupIds:
          - !Ref LambdaSecurityGroup
        SubnetIds:
          - !Ref PrivateSubnetMSKOne
          - !Ref PrivateSubnetMSKTwo
          - !Ref PrivateSubnetMSKThree
      Role: !GetAtt LambdaRole.Arn

  LambdaMskPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Description: ''
      ManagedPolicyName: MskLambda-ManagedPolicy
      Path: "/"
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:Connect
              - kafka-cluster:DescribeClusterDynamicConfiguration
              - kafka-cluster:DescribeGroup
              - kafka-cluster:DescribeTopic
              - kafka-cluster:ReadData
            # Exactly three ARNs: cluster, group and topic.
            Resource:
              - !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/*'
              - !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*/*/*'
              - !Sub 'arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*/*/*'
          - Effect: Allow
            Action:
              - glue:GetSchemaVersion
            Resource: '*'
          - Effect: Allow
            Action:
              - s3:PutObject
            Resource:
              - '*'

  DestinationBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete

  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
        - !Ref LambdaMskPolicy

  # ---------------------------------------------------------------------------
  # Kinesis Data Analytics (Flink) application
  # ---------------------------------------------------------------------------
  KDAClickstream:
    Type: AWS::KinesisAnalyticsV2::Application
    DependsOn: KDARole
    Properties:
      ApplicationName: !Sub 'KDAFlinkClickstream-${AWS::StackName}'
      ApplicationDescription: KDA Flink App to analyze Clicktream data from MSK Serverless
      RuntimeEnvironment: FLINK-1_13
      ServiceExecutionRole: !GetAtt KDARole.Arn
      ApplicationConfiguration:
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Sub 'arn:aws:s3:::${AssetsBucketName}'
              FileKey: !Ref KdaAppKey
          CodeContentType: ZIPFILE
        VpcConfigurations:
          - SecurityGroupIds:
              - !Ref KDASecurityGroup
            SubnetIds:
              - !Ref PrivateSubnetMSKOne
              - !Ref PrivateSubnetMSKTwo
              - !Ref PrivateSubnetMSKThree
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: false
        FlinkApplicationConfiguration:
          ParallelismConfiguration:
            Parallelism: 8
            ConfigurationType: CUSTOM
          MonitoringConfiguration:
            ConfigurationType: CUSTOM
            LogLevel: WARN
            MetricsLevel: OPERATOR
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: FlinkApplicationProperties
              PropertyMap:
                OpenSearchEndpoint: !Sub 'https://${OpenSearchServiceDomain.DomainEndpoint}'
                Region: !Ref 'AWS::Region'
                # Placeholder value; must be replaced with the real bootstrap servers.
                BootstrapServers: "xxxxxxx:9098"
                GroupId: "flink-clickstream-processor"
                Topic: "clickstream"
                DepartmentsAgg_Topic: "Departments_Agg"
                clickEventsUserIdAggResult_Topic: "ClickEvents_UserId_Agg_Result"
                userSessionsAggregatesWithOrderCheckout_Topic: "User_Sessions_Aggregates_With_Order_Checkout"

Outputs:
  MSKVPC:
    Description: VPC where MSK Serverless cluster needs to be created
    Value: !Ref VPC
  MSKPrivateSubnetOne:
    Description: Private Subnet to be used with MSK Serverless cluster
    Value: !Ref PrivateSubnetMSKOne
  MSKPrivateSubnetTwo:
    Description: Private Subnet to be used with MSK Serverless cluster
    Value: !Ref PrivateSubnetMSKTwo
  MSKPrivateSubnetThree:
    Description: Private Subnet to be used with MSK Serverless cluster
    Value: !Ref PrivateSubnetMSKThree
  MSKSecurityGroupName:
    Description: Security Group Name of MSK Serverless cluster
    Value: !Ref MSKSecurityGroup
  MSKClientEC2InstanceSsh:
    Description: The SSH for the EC2 instance
    Value: !Sub ssh -A ec2-user@${KafkaClientEC2Instance.PublicDnsName}
  OpenSearchMasterUserName:
    Description: Username used to login to Amazon OpenSearch Service
    Value: !Ref OpenSearchMasterUserName
  OpenSearchMasterPassword:
    Description: OpenSearch password retrieved from AWS Secret Manager
    Value: !GetAtt RetrieveOpenSearchPassword.OpenSearchMasterPassword
  OpenSearchDashboardEndpoint:
    Description: Url to open OpenSearch Dashboard
    Value: !Sub 'https://${NGINXProxy.PublicDnsName}/'