AWSTemplateFormatVersion: '2010-09-09' Description: Real time Fraud Prevention blog #Parameters Parameters: BucketName: Type: String Description: The bucket name where the source code zip is present S3SourceCodePath: Type: String Description: S3 key for the zip file Default: lambda-functions.zip S3connectorPath: Type: String Description: S3 key for the connector zip file Default: confluentinc-kafka-connect-elasticsearch-11.1.6.zip YourEmail: Type: String Description: The email that will receive notifications. E.g awsblog@example.com KafkaInputTopic: Type: String Description: Input topic name Default: transactions KafkaOutputTopic: Type: String Description: Output topic name Default: processed_transactions FraudDetectorName: Type: String Description: Fraud Detector name to use Default: transaction_fraud_detector FraudDetectorEventName: Type: String Description: Fraud Detector event name to use. Eg transactions Default: transaction_event FraudDetectorEntityType: Type: String Description: Fraud Detector entity type name to use. Eg customer Default: customer OpenSearchMasterUsername: Type: String Description: Opensearch Master Username Default: admin OpenSearchMasterPassword: Type: String Description: Opensearch Master Password NoEcho: TRUE # Resources Resources: # VPC and Subnets BlogVPC: Type: AWS::EC2::VPC Properties: CidrBlock: 172.31.0.0/16 EnableDnsHostnames: true EnableDnsSupport: true Tags: - Key: Name Value: !Ref AWS::StackName publicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref BlogVPC AvailabilityZone: Fn::Select: - 0 - Fn::GetAZs: "" CidrBlock: 172.31.0.0/24 MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Join [ "-", [ !Ref "AWS::StackName","public-subnet" ] ] privateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref BlogVPC AvailabilityZone: Fn::Select: - 0 - Fn::GetAZs: "" CidrBlock: 172.31.3.0/24 MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Join [ "-", [ !Ref "AWS::StackName","private-subnet-a" ] ] privateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref BlogVPC AvailabilityZone: Fn::Select: - 1 - Fn::GetAZs: "" CidrBlock: 172.31.2.0/24 MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Join [ "-", [ !Ref "AWS::StackName","private-subnet-b" ] ] internetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: !Join [ "-", [ !Ref "AWS::StackName","gateway" ] ] gatewayToInternet: Type: AWS::EC2::VPCGatewayAttachment Properties: VpcId: !Ref BlogVPC InternetGatewayId: !Ref internetGateway publicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref BlogVPC publicRoute: Type: AWS::EC2::Route DependsOn: gatewayToInternet Properties: RouteTableId: !Ref publicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref internetGateway publicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: SubnetId: !Ref publicSubnet1 RouteTableId: !Ref publicRouteTable natGateway: Type: AWS::EC2::NatGateway DependsOn: natPublicIP Properties: AllocationId: !GetAtt natPublicIP.AllocationId SubnetId: !Ref publicSubnet1 natPublicIP: Type: AWS::EC2::EIP DependsOn: BlogVPC Properties: Domain: vpc privateRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref BlogVPC privateRoute: Type: AWS::EC2::Route Properties: RouteTableId: !Ref privateRouteTable DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref natGateway privateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: SubnetId: !Ref privateSubnet1 RouteTableId: !Ref privateRouteTable privateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: SubnetId: !Ref privateSubnet2 RouteTableId: !Ref privateRouteTable InboundRule: Type: AWS::EC2::SecurityGroupIngress Properties: CidrIp: 172.31.0.0/24 Description: Cloud9 to access OpenSearch FromPort: 443 GroupId: !GetAtt BlogVPC.DefaultSecurityGroup IpProtocol: tcp ToPort: 443 # Lambda Role LambdaServiceExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - lambda.amazonaws.com Version: '2012-10-17' Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - SNS:Publish Resource: - Ref: fdSNSTopic - Effect: Allow Action: - ec2:CreateNetworkInterface - ec2:DescribeNetworkInterfaces - ec2:DescribeVpcs - ec2:DeleteNetworkInterface - ec2:DescribeSubnets - ec2:DescribeSecurityGroups - kafka:DescribeCluster - kafka:GetBootstrapBrokers - kafka:CreateConfiguration - kafka:DeleteConfiguration Resource: "*" # Config Lambda Role ConfigLambdaServiceExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - lambda.amazonaws.com Version: '2012-10-17' Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - kafkaconnect:CreateConnector - kafkaconnect:CreateCustomPlugin - kafkaconnect:CreateWorkerConfiguration - kafkaconnect:DeleteConnector - kafkaconnect:DescribeConnector - kafkaconnect:DescribeCustomPlugin - kafkaconnect:DescribeWorkerConfiguration - kafkaconnect:ListConnectors - kafkaconnect:ListCustomPlugins - kafkaconnect:ListWorkerConfigurations - kafkaconnect:UpdateConnector - ec2:CreateNetworkInterface - ec2:DescribeNetworkInterfaces - ec2:DescribeVpcs - ec2:DeleteNetworkInterface - ec2:DescribeSubnets - ec2:DescribeSecurityGroups - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents - logs:CreateLogDelivery - logs:GetLogDelivery - logs:ListLogDeliveries - logs:PutResourcePolicy - logs:DescribeResourcePolicies - logs:DescribeLogGroups - logs:DeleteLogDelivery - logs:PutResourcePolicy - logs:DescribeResourcePolicies - logs:DescribeLogGroups - kafka:ListConfigurations - kafka:GetBootstrapBrokers Resource: "*" - Effect: Allow Action: - iam:AttachRolePolicy - iam:PutRolePolicy Resource: "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*" - Effect: Allow Action: - iam:PassRole Resource: !GetAtt MSKConnectServiceRole.Arn - Effect: Allow Action: - kafka:CreateConfiguration - kafka:DeleteConfiguration Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:*" - Effect: Allow Action: - kafka:UpdateClusterConfiguration - kafka:DescribeCluster - kafka:GetBootstrapBrokers Resource: !Ref MSKCluster - Effect: Allow Action: - kinesisanalytics:UpdateApplication - kinesisanalytics:StartApplication - kinesisanalytics:DescribeApplication - kinesisanalytics:AddApplicationVpcConfiguration Resource: !Sub "arn:aws:kinesisanalytics:${AWS::Region}:${AWS::AccountId}:application/${AWS::StackName}-kda-app" - Effect: Allow Action: - s3:GetObject Resource: !Sub "arn:aws:s3:::${BucketName}/*" - Effect: Allow Action: - iam:CreateServiceLinkedRole Resource: "arn:aws:iam::*:role/aws-service-role/kafkaconnect.amazonaws.com/AWSServiceRoleForKafkaConnect*" Condition: StringEquals: 'iam:AWSServiceName': kafkaconnect.amazonaws.com # MSK Connect Role MSKConnectServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - kafkaconnect.amazonaws.com Version: '2012-10-17' Path: "/" Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - ec2:CreateNetworkInterface - ec2:DescribeSubnets - ec2:DescribeVpcs - ec2:DescribeSecurityGroups - logs:CreateLogDelivery - logs:GetLogDelivery - logs:DeleteLogDelivery - logs:ListLogDeliveries - logs:PutResourcePolicy - logs:DescribeResourcePolicies - logs:DescribeLogGroups - logs:PutLogEvents - logs:DescribeLogStreams Resource: "*" - Effect: Allow Action: - s3:PutObject - s3:GetObject - s3:DeleteObject - s3:ListBucket - s3:GetObjectVersion Resource: - !Sub "arn:aws:s3:::${BucketName}" - !Sub "arn:aws:s3:::${BucketName}/*" - Effect: Allow Action: - es:ESHttp* Resource: - !GetAtt OpenSearchDomain.Arn # Event scheduledEvent: Type: AWS::Events::Rule Properties: Description: CloudWatch event to trigger lambda SG function ScheduleExpression: rate(1 minute) State: DISABLED Targets: - Arn: Fn::GetAtt: - fdLambdaStreamProducer - Arn Id: LambdaTarget LambdaInvokePermission: Type: AWS::Lambda::Permission Properties: FunctionName: Fn::GetAtt: - fdLambdaStreamProducer - Arn Action: lambda:InvokeFunction Principal: events.amazonaws.com SourceArn: Fn::GetAtt: - scheduledEvent - Arn #Producer Lambda fdLambdaStreamProducer: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${AWS::StackName}-LambdaProducer" Code: S3Bucket: Ref: BucketName S3Key: Ref: S3SourceCodePath Handler: fdLambdaStreamProducer.lambda_handler MemorySize: 128 Role: Fn::GetAtt: - LambdaServiceExecutionRole - Arn Environment: Variables: InputKafkaTopic: Ref: KafkaInputTopic mskClusterArn: !Ref MSKCluster Runtime: python3.6 Timeout: 300 VpcConfig: SecurityGroupIds: - !GetAtt BlogVPC.DefaultSecurityGroup SubnetIds: - !Ref privateSubnet1 - !Ref privateSubnet2 #Consumer Lambda fdLambdaConsumer: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${AWS::StackName}-LambdaConsumer" Code: S3Bucket: Ref: BucketName S3Key: Ref: S3SourceCodePath Handler: fdLambdaConsumer.lambda_handler MemorySize: 128 Role: Fn::GetAtt: - LambdaServiceExecutionRole - Arn Environment: Variables: SNSTopicArn: !Ref fdSNSTopic Runtime: python3.6 Timeout: 300 #SNS Topic fdSNSTopic: Type: AWS::SNS::Topic SNSSubscription: Type: AWS::SNS::Subscription Properties: Endpoint: Ref: YourEmail Protocol: email TopicArn: !Ref fdSNSTopic # Event Source Mapping EventSourceMappingSNS: Type: AWS::Lambda::EventSourceMapping Properties: Enabled: false EventSourceArn: !Ref MSKCluster FunctionName: Fn::GetAtt: - fdLambdaConsumer - Arn StartingPosition: TRIM_HORIZON Topics: - Ref: KafkaOutputTopic # MSK Cluster MSKLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub "${AWS::StackName}-msk" MSKCluster: Type: AWS::MSK::Cluster Properties: ClusterName: !Sub "${AWS::StackName}-msk" KafkaVersion: 2.2.1 NumberOfBrokerNodes: 2 LoggingInfo: BrokerLogs: CloudWatchLogs: Enabled: true LogGroup: !Ref MSKLogGroup EncryptionInfo: EncryptionInTransit: ClientBroker: "TLS" BrokerNodeGroupInfo: InstanceType: kafka.m5.large ClientSubnets: - !Ref privateSubnet1 - !Ref privateSubnet2 # KDA Resources KDAFlinkLogGroup: Type: AWS::Logs::LogGroup KDAFlinkLogStream: Type: AWS::Logs::LogStream Properties: LogGroupName: !Ref KDAFlinkLogGroup # KDA Role KinesisAnalyticsRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - kinesisanalytics.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: KinesisAnalyticsRolePolicy PolicyDocument: Statement: - Effect: Allow Action: - s3:GetObject - s3:GetObjectVersion Resource: - !Sub "arn:aws:s3:::${BucketName}/RealTimeFraudPrevention.zip" - Effect: Allow Action: - frauddetector:DescribeDetector - frauddetector:GetEventPrediction - frauddetector:GetKMSEncryptionKey - frauddetector:DescribeModelVersions Resource: "*" - Effect: Allow Action: - s3:PutObject - s3:GetObject - s3:DeleteObject - s3:ListBucket Resource: - !Sub "arn:aws:s3:::${BucketName}" - !Sub "arn:aws:s3:::${BucketName}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*" - Effect: Allow Action: - logs:DescribeLogStreams Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${KDAFlinkLogGroup}:log-stream:*" - Effect: Allow Action: - logs:PutLogEvents Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${KDAFlinkLogGroup}:log-stream:${KDAFlinkLogStream}" - Effect: Allow Action: - "ec2:DescribeVpcs" - "ec2:DescribeSubnets" - "ec2:DescribeSecurityGroups" - "ec2:DescribeDhcpOptions" - "ec2:CreateNetworkInterface" - "ec2:CreateNetworkInterfacePermission" - "ec2:DescribeNetworkInterfaces" - "ec2:DeleteNetworkInterface" Resource: "*" #KDA Flink Application KDAFlinkApplication: Type: AWS::KinesisAnalyticsV2::Application Properties: RuntimeEnvironment: FLINK-1_11 ServiceExecutionRole: !GetAtt KinesisAnalyticsRole.Arn ApplicationName: !Sub "${AWS::StackName}-kda-app" ApplicationConfiguration: ApplicationCodeConfiguration: CodeContent: S3ContentLocation: BucketARN: !Sub "arn:aws:s3:::${BucketName}" FileKey: "RealTimeFraudPrevention.zip" CodeContentType: ZIPFILE ApplicationSnapshotConfiguration: SnapshotsEnabled: true FlinkApplicationConfiguration: MonitoringConfiguration: ConfigurationType: CUSTOM LogLevel: INFO MetricsLevel: TASK ParallelismConfiguration: AutoScalingEnabled: true ConfigurationType: CUSTOM Parallelism: 12 ParallelismPerKPU: 1 EnvironmentProperties: PropertyGroups: - PropertyGroupId: kinesis.analytics.flink.run.options PropertyMap: python: "RealTimeFraudPrevention/main.py" jarfile: "RealTimeFraudPrevention/lib/flink-sql-connector-kafka_2.11-1.11.2.jar" - PropertyGroupId: producer.config.0 PropertyMap: input.topic.name: Ref: KafkaInputTopic aws.region: !Sub "${AWS::Region}" bootstrap.servers: "CHANGE THIS MSK ENDPOINT" frauddetector.name: Ref: FraudDetectorName frauddetector.event.name: !Ref FraudDetectorEventName frauddetector.entity.type: !Ref FraudDetectorEntityType - PropertyGroupId: consumer.config.0 PropertyMap: output.topic.name: Ref: KafkaOutputTopic KDAFlinkAppCloudwatch: Type: AWS::KinesisAnalyticsV2::ApplicationCloudWatchLoggingOption Properties: ApplicationName: !Ref KDAFlinkApplication CloudWatchLoggingOption: LogStreamARN: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${KDAFlinkLogGroup}:log-stream:${KDAFlinkLogStream}" # Opensearch Service and KMS Key KMSKey: Type: AWS::KMS::Key Properties: Enabled: true EnableKeyRotation: true PendingWindowInDays: 7 KeyPolicy: Version: 2012-10-17 Id: key-default-1 Statement: - Sid: Allow administration of the key Effect: Allow Principal: AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root" Action: 'kms:*' Resource: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/*" - Sid: Allow use of the key Effect: Allow Principal: Service: "es.amazonaws.com" Action: - 'kms:DescribeKey' - 'kms:Encrypt' - 'kms:Decrypt' - 'kms:ReEncrypt*' - 'kms:GenerateDataKey*' - 'kms:CreateGrant' - 'kms:Delete*' Resource: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/*" Condition: StringEquals: 'kms:CallerAccount': !Sub '${AWS::AccountId}' 'kms:ViaService': es.eu-west-1.amazonaws.com - Sid: Allow OpenSearch service principals to describe the key directly Effect: Allow Principal: Service: "es.amazonaws.com" Action: - 'kms:Describe*' - 'kms:Get*' - 'kms:List*' - 'kms:EnableKeyRotation' Resource: !Sub "arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/*" OpenSearchServiceLinkedRole: Type: AWS::IAM::ServiceLinkedRole Properties: AWSServiceName: opensearchservice.amazonaws.com OpenSearchDomain: Type: AWS::OpenSearchService::Domain Properties: DomainName: !Sub "${AWS::StackName}-os" EngineVersion: 'OpenSearch_1.0' ClusterConfig: DedicatedMasterCount: 3 DedicatedMasterEnabled: true DedicatedMasterType: m5.large.search InstanceCount: 2 InstanceType: m5.large.search ZoneAwarenessEnabled: true ZoneAwarenessConfig: AvailabilityZoneCount: 2 WarmEnabled: false AccessPolicies: Version: '2012-10-17' Statement: - Effect: 'Allow' Principal: AWS: '*' Action: 'es:*' Resource: !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${AWS::StackName}-os/*' NodeToNodeEncryptionOptions: Enabled: true EBSOptions: EBSEnabled: true VolumeType: gp2 VolumeSize: 10 CognitoOptions: Enabled: false EncryptionAtRestOptions: Enabled: true KmsKeyId: !Ref KMSKey DomainEndpointOptions: CustomEndpointEnabled: false EnforceHTTPS: true TLSSecurityPolicy: 'Policy-Min-TLS-1-0-2019-07' AdvancedOptions: override_main_response_version: true AdvancedSecurityOptions: Enabled: true InternalUserDatabaseEnabled: true MasterUserOptions: MasterUserName: Ref: OpenSearchMasterUsername MasterUserPassword: Ref: OpenSearchMasterPassword VPCOptions: SecurityGroupIds: - !GetAtt BlogVPC.DefaultSecurityGroup SubnetIds: - !Ref privateSubnet1 - !Ref privateSubnet2 LogPublishingOptions: ES_APPLICATION_LOGS: CloudWatchLogsLogGroupArn: !GetAtt OpenSearchApplicationLogGroup.Arn Enabled: true SEARCH_SLOW_LOGS: CloudWatchLogsLogGroupArn: !GetAtt OpenSearchSearchSlowLogGroup.Arn Enabled: true INDEX_SLOW_LOGS: CloudWatchLogsLogGroupArn: !GetAtt OpenSearchIndexSlowLogGroup.Arn Enabled: true OpenSearchApplicationLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub "${AWS::StackName}-opensearch-application-logs" OpenSearchSearchSlowLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub "${AWS::StackName}-opensearch-searchslow-logs" OpenSearchIndexSlowLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub "${AWS::StackName}-opensearch-indexslow-logs" CloudWatchApplicationResourcePolicy: Type: AWS::Logs::ResourcePolicy Properties: PolicyName: !Sub "${AWS::StackName}ApplicationResourcePolicy" PolicyDocument: !Sub '{ "Version": "2012-10-17", "Statement": [ { "Sid": "OpenSearchLogsToCloudWatchLogs", "Effect": "Allow", "Principal": { "Service": [ "es.amazonaws.com" ] }, "Action":["logs:PutLogEvents", "logs:CreateLogStream"], "Resource": "*"} ] }' MSKConnectLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub "${AWS::StackName}-mskconnect" ### Post Deployment Configuration #Config Lambda LambdaConfig: Type: AWS::Lambda::Function Properties: FunctionName: !Sub "${AWS::StackName}-LambdaConfig" Code: S3Bucket: Ref: BucketName S3Key: Ref: S3SourceCodePath Handler: LambdaConfig.lambda_handler MemorySize: 128 Role: Fn::GetAtt: - ConfigLambdaServiceExecutionRole - Arn Environment: Variables: kdaName: !Sub "${AWS::StackName}-kda-app" mskClusterArn: !Ref MSKCluster vpcID: !Ref BlogVPC subnet1: !Ref privateSubnet1 subnet2: !Ref privateSubnet2 sgID: !GetAtt BlogVPC.DefaultSecurityGroup stackname: !Sub "${AWS::StackName}" opensearchUsername: Ref: OpenSearchMasterUsername opensearchPassword: Ref: OpenSearchMasterPassword topic: Ref: KafkaOutputTopic opensearchDomainEndpoint: !GetAtt OpenSearchDomain.DomainEndpoint awsAccount: !Sub "${AWS::AccountId}" awsRegion: !Sub "${AWS::Region}" s3bucketName: Ref: BucketName s3connectorkey: Ref: S3connectorPath mskconnectRole: !GetAtt MSKConnectServiceRole.Arn logGroupName: !Sub "${AWS::StackName}-mskconnect" Runtime: python3.6 Timeout: 300 MSKKDAConfig: Type: Custom::MSKKDAConfig Properties: ServiceToken: !GetAtt LambdaConfig.Arn Region: !Ref "AWS::Region" Cloud9IDE: Type: AWS::Cloud9::EnvironmentEC2 Properties: AutomaticStopTimeMinutes: 120 ConnectionType: CONNECT_SSH Description: 'Cloud9 environment to access private resources' ImageId: amazonlinux-2-x86_64 InstanceType: t3.small SubnetId: !Ref publicSubnet1 Name: !Sub "${AWS::StackName}-cloud9" Outputs: OpenSearchDashboardLink: Description: Opensearch Dashboard Link Value: !Sub "https://${OpenSearchDomain.DomainEndpoint}/_dashboards" EnableEventSourceMapping: Description: Step2 - Run this command to enable event source mapping for the consumer lambda function. Value: !Sub "aws lambda update-event-source-mapping --uuid ${EventSourceMappingSNS} --enabled" EnableEventRule: Description: Step1 - Run this command to start producing data Value: !Sub "aws events enable-rule --name ${scheduledEvent}"