AWSTemplateFormatVersion: "2010-09-09"
Description: Creates resources needed for streaming ingestion using Kinesis Data Analytics, Kafka topics and SageMaker Feature Store.

Mappings:
  SubnetConfig:
    LabVPC:
      cidr: '10.0.0.0/16'
    PublicSubnetOne:
      cidr: '10.0.0.0/24'
    PublicSubnetTwo:
      cidr: '10.0.4.0/24'
    PrivateSubnetOne:
      cidr: '10.0.1.0/24'
    PrivateSubnetTwo:
      cidr: '10.0.2.0/24'

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Required Parameters
        Parameters:
          - SageMakerStudioUserProfileName
          - CreditCardAggregateFeatureStoreName
          - CreditCardAggregateBatchFeatureStoreName
    ParameterLabels:
      SageMakerStudioUserProfileName:
        default: SageMaker Studio user profile name
      CreditCardAggregateFeatureStoreName:
        default: Feature Group name for credit card aggregate data
      CreditCardAggregateBatchFeatureStoreName:
        default: Feature Group name for credit card batch aggregate data

Parameters:
  SageMakerStudioUserProfileName:
    Type: String
    Description: "SageMaker Studio user profile name"
    Default: "DefaultUser"
  CreditCardAggregateFeatureStoreName:
    Default: cc-agg-fg
    Type: String
    Description: CreditCard Aggregate FeatureGroup Name
    MinLength: 1
    MaxLength: 63
    AllowedPattern: ^[a-z0-9](-*[a-z0-9])*
    ConstraintDescription: Must be lowercase or numbers with a length of 1-63 characters.
  CreditCardAggregateBatchFeatureStoreName:
    Default: cc-agg-batch-fg
    Type: String
    Description: CreditCard Aggregate Batch FeatureGroup Name
    MinLength: 1
    MaxLength: 63
    AllowedPattern: ^[a-z0-9](-*[a-z0-9])*
    ConstraintDescription: Must be lowercase or numbers with a length of 1-63 characters.

Resources:
  ###
  ### VPC
  ###
  LabVPC:
    Type: AWS::EC2::VPC
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap ['SubnetConfig', 'LabVPC', 'cidr']
      Tags:
        - Key: 'Name'
          Value: !Ref AWS::StackName

  PublicSubnetOne:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'LabVPC'
      CidrBlock: !FindInMap ['SubnetConfig', 'PublicSubnetOne', 'cidr']
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: !Join [ "-", [ !Ref "AWS::StackName", "PublicSubnetOne" ] ]

  PublicSubnetTwo:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 1
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'LabVPC'
      CidrBlock: !FindInMap ['SubnetConfig', 'PublicSubnetTwo', 'cidr']
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: !Join [ "-", [ !Ref "AWS::StackName", "PublicSubnetTwo" ] ]

  PrivateSubnetOne:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 0
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'LabVPC'
      CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetOne', 'cidr']
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: !Join [ "-", [ !Ref "AWS::StackName", "PrivateSubnetOne" ] ]

  PrivateSubnetTwo:
    Type: AWS::EC2::Subnet
    Properties:
      AvailabilityZone:
        Fn::Select:
          - 1
          - Fn::GetAZs: {Ref: 'AWS::Region'}
      VpcId: !Ref 'LabVPC'
      CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetTwo', 'cidr']
      MapPublicIpOnLaunch: false
      Tags:
        - Key: 'Name'
          Value: !Join [ "-", [ !Ref "AWS::StackName", "PrivateSubnetTwo" ] ]

  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: !Join [ "-", [ !Ref "AWS::StackName", "InternetGateway" ] ]

  GatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref 'LabVPC'
      InternetGatewayId: !Ref 'InternetGateway'

  NATIP:
    Type: AWS::EC2::EIP
    DependsOn: GatewayAttachment
    Properties:
      Domain: vpc

  NatGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NATIP.AllocationId
      SubnetId: !Ref 'PublicSubnetOne'
      Tags:
        - Key: 'Name'
          Value: !Join [ "-", [ !Ref "AWS::StackName", "NatGateway" ] ]

  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref 'LabVPC'

  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: [GatewayAttachment, InternetGateway]
    Properties:
      RouteTableId: !Ref 'PublicRouteTable'
      DestinationCidrBlock: '0.0.0.0/0'
      GatewayId: !Ref 'InternetGateway'

  PublicSubnetOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable

  PublicSubnetTwoRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnetTwo
      RouteTableId: !Ref PublicRouteTable

  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref 'LabVPC'

  PrivateRoute:
    Type: AWS::EC2::Route
    DependsOn: NatGateway
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: '0.0.0.0/0'
      NatGatewayId: !Ref 'NatGateway'

  PrivateSubnetOneRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetOne

  PrivateSubnetTwoRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetTwo

  ###
  ### Security Groups
  ###
  SageMakerStudioSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for SageMaker Studio kernel instances
      VpcId: !Ref 'LabVPC'
      SecurityGroupEgress:
        - IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
    DependsOn: [PrivateRouteTable, PrivateSubnetOne, PrivateSubnetTwo, PublicSubnetOne, PublicSubnetTwo]

  MSKServiceSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Access to the Kafka service on the MSK cluster
      VpcId: !Ref 'LabVPC'
      SecurityGroupEgress:
        - IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
    DependsOn: [PrivateRouteTable, PrivateSubnetOne, PrivateSubnetTwo, PublicSubnetOne, PublicSubnetTwo]

  MSKPlainTextSecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: tcp
      GroupId: !Ref MSKServiceSecurityGroup
      SourceSecurityGroupId: !GetAtt MSKServiceSecurityGroup.GroupId
      Description: Plaintext Kafka
      FromPort: 9092
      ToPort: 9092

  MSKPlainTextSecurityGroupIngress2:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: "-1"
      GroupId: !Ref MSKServiceSecurityGroup
      SourceSecurityGroupId: !GetAtt MSKServiceSecurityGroup.GroupId
      Description: All traffic from within the MSK security group

  SageMakerStudioSecurityGroupSelfIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: "-1"
      GroupId: !Ref SageMakerStudioSecurityGroup
      SourceSecurityGroupId: !GetAtt SageMakerStudioSecurityGroup.GroupId
      Description: All traffic from within the security group

  SageMakerStudioSecurityGroupMSKIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: "-1"
      GroupId: !Ref SageMakerStudioSecurityGroup
      SourceSecurityGroupId: !GetAtt MSKServiceSecurityGroup.GroupId
      Description: Allow all traffic from the MSK cluster to SageMaker Studio kernels

  MSKSecurityGroupSageMakerStudioIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      IpProtocol: "-1"
      GroupId: !Ref MSKServiceSecurityGroup
      SourceSecurityGroupId: !GetAtt SageMakerStudioSecurityGroup.GroupId
      Description: Allow all traffic from SageMaker Studio kernels to the MSK cluster

  ###
  ### VPC Endpoints
  ###
  SageMakerAPIVPCInterfaceEndpointforPrivateSubnets:
    Type: AWS::EC2::VPCEndpoint
    DependsOn: [PrivateSubnetOne, PrivateSubnetTwo, MSKServiceSecurityGroup, SageMakerStudioSecurityGroup]
    Properties:
      VpcEndpointType: Interface
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sagemaker.api'
      VpcId: !Ref LabVPC
      PrivateDnsEnabled: true
      SubnetIds:
        - !Ref PrivateSubnetOne
        - !Ref PrivateSubnetTwo
      SecurityGroupIds:
        - !Ref MSKServiceSecurityGroup
        - !Ref SageMakerStudioSecurityGroup

  SageMakerRuntimeVPCInterfaceEndpointforPrivateSubnets:
    Type: AWS::EC2::VPCEndpoint
    DependsOn: [PrivateSubnetOne, PrivateSubnetTwo, MSKServiceSecurityGroup, SageMakerStudioSecurityGroup]
    Properties:
      VpcEndpointType: Interface
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sagemaker.runtime'
      VpcId: !Ref LabVPC
      PrivateDnsEnabled: true
      SubnetIds:
        - !Ref PrivateSubnetOne
        - !Ref PrivateSubnetTwo
      SecurityGroupIds:
        - !Ref MSKServiceSecurityGroup
        - !Ref SageMakerStudioSecurityGroup

  S3VPCGatewayEndpointforPrivateSubnets:
    Type: AWS::EC2::VPCEndpoint
    DependsOn: [PublicRouteTable, PrivateRouteTable]
    Properties:
      VpcEndpointType: Gateway
      PrivateDnsEnabled: false
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal: '*'
            Action:
              - '*'
            Resource:
              - '*'
      RouteTableIds:
        - !Ref PublicRouteTable
        - !Ref PrivateRouteTable
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.s3'
      VpcId: !Ref LabVPC

  ElasticFileSystemVPCInterfaceEndpointforPrivateSubnets:
    Type: AWS::EC2::VPCEndpoint
    DependsOn: [PrivateSubnetOne, PrivateSubnetTwo, MSKServiceSecurityGroup, SageMakerStudioSecurityGroup]
    Properties:
      VpcEndpointType: Interface
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.elasticfilesystem'
      VpcId: !Ref LabVPC
      PrivateDnsEnabled: true
      SubnetIds:
        - !Ref PrivateSubnetOne
        - !Ref PrivateSubnetTwo
      SecurityGroupIds:
        - !Ref MSKServiceSecurityGroup
        - !Ref SageMakerStudioSecurityGroup

  ###
  ### IAM Roles
  ###
  SageMakerExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sagemaker.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonMSKFullAccess
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess

  KDAStreamingApplicationExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AmazonVPCFullAccess
        - arn:aws:iam::aws:policy/AmazonVPCCrossAccountNetworkInterfaceOperations
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonMSKFullAccess

  DownloadLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      Description: Execution role for the Lambda function that copies the solution artifacts into the S3 bucket.
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
        Version: '2012-10-17'
      Path: "/"
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Effect: Allow
                Resource: arn:aws:logs:*:*:*
          PolicyName: !Sub AWSLambda-CW-${AWS::StackName}
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonMSKFullAccess
        - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AWSLambdaExecute

  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: 'sts:AssumeRole'
      Description: Lambda role that can create KDA apps
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonMSKFullAccess
        - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/AmazonVPCFullAccess

  KDAStudioExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: kinesisanalytics.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonVPCFullAccess
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/AmazonMSKFullAccess
        - arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
        - arn:aws:iam::aws:policy/AWSLambdaExecute

  ###
  ### S3 BUCKET
  ###
  KDAStreamingApplicationS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Join
        - "-"
        - - "kda-streaming-app"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"

  ###
  ### CUSTOM S3 RESOURCES
  ###
  DownloadS3CustomResource:
    Type: Custom::S3CustomResource
    Properties:
      ServiceToken: !GetAtt DownloadLambdaFunction.Arn
      the_bucket: !Ref KDAStreamingApplicationS3Bucket
      invoke_fraud_endpoint_lambda_code_file_key: artifacts/InvokeFraudEndpointlambda_function.zip
      invoke_fraud_endpoint_lambda_code_url: s3://aws-ml-blog/artifacts/ML-13533/InvokeFraudEndpointlambda_function.zip
      streaming_ingest_agg_features_lambda_code_file_key: artifacts/StreamingIngestAggFeatureslambda_function.zip
      streaming_ingest_agg_features_lambda_code_url: s3://aws-ml-blog/artifacts/ML-13533/StreamingIngestAggFeatureslambda_function.zip
      boto3_lib_layer_code_file_key: artifacts/boto3-1-16-28.zip
      boto3_lib_layer_code_url: s3://aws-ml-blog/artifacts/ML-13533/boto3-1-16-28.zip

  ###
  ### LAMBDA LAYERS
  ###
  Boto3LibLayer:
    Type: AWS::Lambda::LayerVersion
    DependsOn: DownloadS3CustomResource
    Properties:
      CompatibleRuntimes:
        - python3.6
        - python3.7
      Content:
        S3Bucket: !Ref KDAStreamingApplicationS3Bucket
        S3Key: artifacts/boto3-1-16-28.zip
      Description: Upgraded version of the boto3 library for SageMaker Feature Store - obtained from a public S3 bucket
      LicenseInfo: MIT

  ###
  ### LAMBDA
  ###
  DownloadLambdaFunction:
    Type: "AWS::Lambda::Function"
    DependsOn: DownloadLambdaExecutionRole
    Properties:
      FunctionName: !Sub 'lambda-download-function-${AWS::StackName}'
      Description: This is used as an AWS CloudFormation custom resource to copy the solution artifacts from a public S3 bucket to your S3 bucket.
      ReservedConcurrentExecutions: 50
      Handler: index.handler
      Role: !GetAtt DownloadLambdaExecutionRole.Arn
      Timeout: 360
      Runtime: python3.8
      Code:
        ZipFile: !Sub
          - |
            import boto3
            from botocore.client import ClientError
            import cfnresponse
            from urllib.parse import urlparse
            import urllib.request

            def handler(event, context):
                # Log the incoming custom resource event
                print(f'event: {event}')
                the_event = event['RequestType'].strip()
                print("The event is: ", the_event)
                response_data = {}
                s3 = boto3.client('s3')
                s3_resource = boto3.resource('s3')
                # Retrieve the parameters passed in by the custom resource
                the_bucket = event['ResourceProperties']['the_bucket'].strip()
                invoke_fraud_endpoint_lambda_code_file_key = event['ResourceProperties']['invoke_fraud_endpoint_lambda_code_file_key'].strip()
                invoke_fraud_endpoint_lambda_code_url = event['ResourceProperties']['invoke_fraud_endpoint_lambda_code_url'].strip()
                streaming_ingest_agg_features_lambda_code_file_key = event['ResourceProperties']['streaming_ingest_agg_features_lambda_code_file_key'].strip()
                streaming_ingest_agg_features_lambda_code_url = event['ResourceProperties']['streaming_ingest_agg_features_lambda_code_url'].strip()
                boto3_lib_layer_code_file_key = event['ResourceProperties']['boto3_lib_layer_code_file_key'].strip()
                boto3_lib_layer_code_url = event['ResourceProperties']['boto3_lib_layer_code_url'].strip()
                try:
                    if the_event in ('Create', 'Update'):
                        # Copy the artifact objects from the source bucket into this stack's bucket
                        try:
                            s3_source_object = urlparse(invoke_fraud_endpoint_lambda_code_url, allow_fragments=False)
                            copy_source = {
                                'Bucket': s3_source_object.netloc,
                                'Key': s3_source_object.path.lstrip('/')
                            }
                            s3_resource.meta.client.copy(copy_source, the_bucket, invoke_fraud_endpoint_lambda_code_file_key)

                            s3_source_object = urlparse(streaming_ingest_agg_features_lambda_code_url, allow_fragments=False)
                            copy_source = {
                                'Bucket': s3_source_object.netloc,
                                'Key': s3_source_object.path.lstrip('/')
                            }
                            s3_resource.meta.client.copy(copy_source, the_bucket, streaming_ingest_agg_features_lambda_code_file_key)

                            s3_source_object = urlparse(boto3_lib_layer_code_url, allow_fragments=False)
                            copy_source = {
                                'Bucket': s3_source_object.netloc,
                                'Key': s3_source_object.path.lstrip('/')
                            }
                            s3_resource.meta.client.copy(copy_source, the_bucket, boto3_lib_layer_code_file_key)
                        except ClientError as ce:
                            print("Failed to copy the source code file.")
                            print(ce)
                            print(ce.response['ResponseMetadata'])
                        except urllib.error.HTTPError as e:
                            print(e)
                    # Everything OK, send the success signal back to CloudFormation
                    print("Completed.")
                    cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
                except Exception as e:
                    print("Failed...")
                    print(str(e))
                    response_data['Data'] = str(e)
                    cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
          - Region: !Ref AWS::Region

  PredictLambdaFunction:
    Type: AWS::Lambda::Function
    DependsOn: DownloadS3CustomResource
    Properties:
      FunctionName: InvokeFraudEndpointLambda
      Description: Lambda function that invokes the fraud detection SageMaker endpoint
      ReservedConcurrentExecutions: 50
      Runtime: python3.7
      Handler: lambda_function.lambda_handler
      Code:
        S3Bucket: !Ref KDAStreamingApplicationS3Bucket
        S3Key: artifacts/InvokeFraudEndpointlambda_function.zip
      Layers:
        - Ref: Boto3LibLayer
      MemorySize: 512
      Timeout: 60
      Role:
        Fn::GetAtt:
          - LambdaRole
          - Arn
      Environment:
        Variables:
          CC_AGG_FEATURE_GROUP_NAME:
            Ref: CreditCardAggregateFeatureStoreName
          CC_AGG_BATCH_FEATURE_GROUP_NAME:
            Ref: CreditCardAggregateBatchFeatureStoreName
          ENDPOINT_NAME: this_will_be_overwritten_by_notebook
          FRAUD_THRESHOLD: '0.25'
          LOG_LEVEL: INFO
          HOME: /tmp

  IngestLambdaFunction:
    Type: AWS::Lambda::Function
    DependsOn: DownloadS3CustomResource
    Properties:
      FunctionName: StreamingIngestAggFeatures
      Description: Lambda function that ingests aggregate features into SageMaker Feature Store
      ReservedConcurrentExecutions: 50
      Runtime: python3.7
      Handler: lambda_function.lambda_handler
      Code:
        S3Bucket: !Ref KDAStreamingApplicationS3Bucket
        S3Key: artifacts/StreamingIngestAggFeatureslambda_function.zip
      Layers:
        - Ref: Boto3LibLayer
      MemorySize: 512
      Timeout: 60
      Role:
        Fn::GetAtt:
          - LambdaRole
          - Arn
      Environment:
        Variables:
          CC_AGG_FEATURE_GROUP_NAME:
            Ref: CreditCardAggregateFeatureStoreName
          HOME: /tmp

  ###
  ### GLUE
  ###
  KDAStudioGlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: kdastudio
        Description: Glue database for KDA Studio

  ###
  ### MSK Cluster
  ###
  MSKLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "${AWS::StackName}-msk-log-group"
      RetentionInDays: 7

  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        InstanceType: kafka.t3.small
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
        SecurityGroups: [!Ref MSKServiceSecurityGroup]
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 10
      ClusterName: !Sub "${AWS::StackName}-msk-cluster"
      LoggingInfo:
        BrokerLogs:
          CloudWatchLogs:
            Enabled: true
            LogGroup: !Ref MSKLogGroup
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS_PLAINTEXT
          InCluster: false
      EnhancedMonitoring: DEFAULT
      KafkaVersion: 2.8.0
      NumberOfBrokerNodes: 2
    DependsOn: [MSKServiceSecurityGroup, LabVPC, NatGateway, InternetGateway]

  ###
  ### KDA Studio
  ###
  KDAStudioApp:
    Type: AWS::KinesisAnalyticsV2::Application
    Properties:
      ApplicationName: 'kda-msk-streaming'
      ApplicationDescription: 'Apache Flink running in a Zeppelin Notebook'
      ApplicationMode: 'INTERACTIVE'
      RuntimeEnvironment: 'ZEPPELIN-FLINK-2_0'
      ServiceExecutionRole: !GetAtt KDAStudioExecutionRole.Arn
      ApplicationConfiguration:
        FlinkApplicationConfiguration:
          ParallelismConfiguration:
            AutoScalingEnabled: False
            ConfigurationType: 'CUSTOM'
            Parallelism: 4
            ParallelismPerKPU: 1
        VpcConfigurations:
          - SecurityGroupIds:
              - !Ref MSKServiceSecurityGroup
            SubnetIds:
              - !Ref PrivateSubnetOne
              - !Ref PrivateSubnetTwo
        ZeppelinApplicationConfiguration:
          MonitoringConfiguration:
            LogLevel: 'INFO'
          CatalogConfiguration:
            GlueDataCatalogConfiguration:
              DatabaseARN: !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/kdastudio"
          DeployAsApplicationConfiguration:
            S3ContentLocation:
              BasePath: '/'
              BucketARN: !GetAtt KDAStreamingApplicationS3Bucket.Arn
          CustomArtifactsConfiguration:
            - ArtifactType: 'DEPENDENCY_JAR'
              MavenReference:
                ArtifactId: 'flink-connector-kafka_2.12'
                Version: '1.13.2'
                GroupId: 'org.apache.flink'

  ###
  ### SageMaker Studio
  ###
  StudioDomain:
    Type: AWS::SageMaker::Domain
    DependsOn: [SageMakerExecutionRole, SageMakerStudioSecurityGroup, PrivateSubnetOne, LabVPC]
    Properties:
      AppNetworkAccessType: VpcOnly
      AppSecurityGroupManagement: Customer
      AuthMode: IAM
      DefaultUserSettings:
        ExecutionRole: !GetAtt SageMakerExecutionRole.Arn
        SecurityGroups: [!Ref SageMakerStudioSecurityGroup]
      DomainName: "kda-msk-smfs-domain"
      SubnetIds:
        - !Ref PrivateSubnetOne
      VpcId: !Ref 'LabVPC'

  UserProfile:
    Type: AWS::SageMaker::UserProfile
    DependsOn: [StudioDomain, SageMakerExecutionRole, SageMakerStudioSecurityGroup]
    Properties:
      DomainId: !GetAtt StudioDomain.DomainId
      UserProfileName: !Ref SageMakerStudioUserProfileName
      UserSettings:
        ExecutionRole: !GetAtt SageMakerExecutionRole.Arn
        SecurityGroups: [!Ref SageMakerStudioSecurityGroup]

  StudioApp:
    Type: AWS::SageMaker::App
    DependsOn: [UserProfile, StudioDomain, SageMakerExecutionRole, SageMakerStudioSecurityGroup]
    Properties:
      AppName: default
      AppType: JupyterServer
      DomainId: !GetAtt StudioDomain.DomainId
      UserProfileName: !Ref SageMakerStudioUserProfileName

###
### CloudFormation Stack Outputs
###
Outputs:
  KDAStreamingApplicationS3BucketName:
    Value: !Ref KDAStreamingApplicationS3Bucket
    Description: Name of the S3 bucket that hosts the KDA App
  AggregateFeatureStoreNameOutput:
    Value: !Ref CreditCardAggregateFeatureStoreName
    Description: The user entered value for CreditCardAggregateFeatureStoreName.
  AggregateBatchFeatureStoreNameOutput:
    Value: !Ref CreditCardAggregateBatchFeatureStoreName
    Description: The user entered value for CreditCardAggregateBatchFeatureStoreName.
  LambdaRoleARN:
    Description: Role for Lambda execution.
    Value:
      Fn::GetAtt:
        - LambdaRole
        - Arn
    Export:
      Name:
        Fn::Sub: LambdaRole
  PredictLambdaFunctionName:
    Value:
      Ref: PredictLambdaFunction
  PredictLambdaFunctionARN:
    Description: Lambda function ARN.
    Value:
      Fn::GetAtt:
        - PredictLambdaFunction
        - Arn
    Export:
      Name:
        Fn::Sub: PredictLambdaARN
  IngestLambdaFunctionName:
    Value:
      Ref: IngestLambdaFunction
  IngestLambdaFunctionARN:
    Description: Lambda function ARN.
    Value:
      Fn::GetAtt:
        - IngestLambdaFunction
        - Arn
    Export:
      Name:
        Fn::Sub: IngestLambdaARN
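
###
### Example usage (comments only, not part of the template)
###
# A minimal deployment sketch, assuming the AWS CLI is configured with sufficient
# permissions in the target account and Region. The template file name and stack
# name below are placeholders, not values defined by this template:
#
#   aws cloudformation deploy \
#     --template-file streaming-ingestion-stack.yaml \
#     --stack-name kda-msk-feature-store \
#     --capabilities CAPABILITY_IAM \
#     --parameter-overrides SageMakerStudioUserProfileName=DefaultUser
#
# CAPABILITY_IAM is required because the stack creates IAM roles.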