AWSTemplateFormatVersion: '2010-09-09'
Description: >-
  AWS KDA Studio UDF automation: builds the MaskPhoneNumber UDF jar with
  CodePipeline and CodeBuild, then provisions a Kinesis Data Analytics Studio
  application together with its Kinesis streams and Glue database.
Parameters:
  KDAKinesisInputStreamName:
    Type: String
    MinLength: "1"
    Description: "Kinesis Input stream name."
    Default: "KinesisUDFSampleInputStream"
  KDAKinesisOutputStreamName:
    Type: String
    MinLength: "1"
    Description: "Kinesis Output stream name."
    Default: "KinesisUDFSampleOutputStream"
  KDADatabaseName:
    Type: String
    MinLength: "1"
    Description: "Database Name for KDA Application."
    Default: "udfkdadb"
Resources:
  # Versioned S3 bucket that holds the downloaded sources, the pipeline artifacts,
  # and the built UDF jar.
  ArtifactBucket:
    Type: AWS::S3::Bucket
    Properties:
      VersioningConfiguration:
        Status: Enabled
  # Pipeline: pull the sample sources from S3, build the UDF jar with CodeBuild,
  # copy the jar back to the artifact bucket, then notify CloudFormation via Lambda.
  BuildPipeline:
    Type: AWS::CodePipeline::Pipeline
    Properties:
      RoleArn: !GetAtt CodePipelineServiceRole.Arn
      Stages:
        - Name: Source
          Actions:
            - Name: KDAUDFApplication
              ActionTypeId:
                Category: Source
                Owner: AWS
                Version: "1"
                Provider: S3
              OutputArtifacts:
                - Name: KDAUDFApplicationSource
              Configuration:
                S3Bucket: !Ref ArtifactBucket
                S3ObjectKey: sources/amazon-kda-udf-application.zip
              RunOrder: 1
        - Name: BuildKDAUDFApplication
          Actions:
            - Name: BuildKDAUDFApplication
              InputArtifacts:
                - Name: KDAUDFApplicationSource
              OutputArtifacts:
                - Name: BuildKDAUDFApplicationOutput
              ActionTypeId:
                Category: Build
                Owner: AWS
                Version: "1"
                Provider: CodeBuild
              Configuration:
                ProjectName: !Ref BuildKDAUDFApplicationProject
                PrimarySource: KDAUDFApplicationSource
              RunOrder: 1
        - Name: Copy
          Actions:
            - Name: CopyBuildKDAUDFApplication
              InputArtifacts:
                - Name: BuildKDAUDFApplicationOutput
              ActionTypeId:
                Category: Deploy
                Owner: AWS
                Version: "1"
                Provider: S3
              Configuration:
                BucketName: !Ref ArtifactBucket
                Extract: true
              RunOrder: 1
            - Name: NotifyCloudformation
              ActionTypeId:
                Category: Invoke
                Owner: AWS
                Version: "1"
                Provider: Lambda
              Configuration:
                FunctionName: !Ref NotifyWaitConditionLambdaFunction
              RunOrder: 2
      ArtifactStore:
        Type: S3
        Location: !Ref ArtifactBucket
  # Stack creation pauses here until the pipeline's Lambda action signals that
  # the UDF jar has been built and copied to the artifact bucket.
  BuildCompleteWaitCondition:
    Type: AWS::CloudFormation::WaitCondition
    Properties:
      Count: 1
      Handle: !Ref BuildCompleteWaitHandle
      Timeout: "900"
  BuildCompleteWaitHandle:
    Type: AWS::CloudFormation::WaitConditionHandle
  # CodeBuild project that compiles the MaskPhoneNumber UDF with Maven.
  BuildKDAUDFApplicationProject:
    Type: AWS::CodeBuild::Project
    Properties:
      ServiceRole: !GetAtt CodeBuildServiceRole.Arn
      Artifacts:
        Type: CODEPIPELINE
      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_LARGE
        Image: aws/codebuild/java:openjdk-11
      Source:
        Type: CODEPIPELINE
        BuildSpec: !Sub |
          version: 0.2
          phases:
            build:
              commands:
                - 'cd amazon-kinesis-data-analytics-examples-* || :'
                - 'cd kda-udf-sample || :'
                - mvn clean package -B -Dflink.version=1.13.2
          artifacts:
            files:
              - target/MaskPhoneNumber-1.0.jar
              - amazon-kinesis-data-analytics-examples-*/kda-udf-sample/target/MaskPhoneNumber-1.0.jar
            discard-paths: yes
      TimeoutInMinutes: 5
  # Invoked as the pipeline's final action; signals the wait condition and then
  # reports the job result back to CodePipeline.
  NotifyWaitConditionLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: !Sub |
          import json
          import boto3
          import urllib.request

          code_pipeline = boto3.client('codepipeline')

          def handler(event, context):
              job_id = event['CodePipeline.job']['id']
              url = '${BuildCompleteWaitHandle}'
              headers = {"Content-Type": ""}
              data = {
                  "Status": "SUCCESS",
                  "Reason": "Compilation Succeeded",
                  "UniqueId": "KdaUdfBuildProject",
                  "Data": "Compilation Succeeded"
              }
              try:
                  # PUT the success signal to the CloudFormation wait condition handle.
                  req = urllib.request.Request(url, headers=headers, data=bytes(json.dumps(data), encoding="utf-8"), method='PUT')
                  response = urllib.request.urlopen(req)
                  code_pipeline.put_job_success_result(jobId=job_id)
              except Exception as e:
                  print("failure: " + str(e))
                  code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'})
      Runtime: python3.7
      Timeout: 10
  # Custom resource that downloads the sample repository archive from GitHub and
  # stages it in the artifact bucket as the pipeline's S3 source object.
  DownloadSources:
    Type: Custom::DownloadSources
    Properties:
      ServiceToken: !GetAtt DownloadSourcesFunction.Arn
  DownloadSourcesFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: !Sub |
          import boto3
          import cfnresponse
          from urllib.request import urlopen

          def handler(event, context):
              s3 = boto3.client('s3')
              kda_udf_application_source = urlopen('https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/archive/refs/heads/master.zip')
              s3.put_object(Bucket='${ArtifactBucket}', Key='sources/amazon-kda-udf-application.zip', Body=kda_udf_application_source.read())
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
      Runtime: python3.7
      Timeout: 60
  CodePipelineServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codepipeline.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:GetBucketVersioning
              - Resource:
                  - !Sub ${BuildKDAUDFApplicationProject.Arn}
                Effect: Allow
                Action:
                  - codebuild:StartBuild
                  - codebuild:BatchGetBuilds
              - Resource: !Sub ${NotifyWaitConditionLambdaFunction.Arn}
                Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync
  CodeBuildServiceRole:
    Type: AWS::IAM::Role
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - codebuild.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Resource:
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*
                  - arn:aws:logs:*:*:log-group:/aws/codebuild/*:log-stream:*
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                  - s3:GetObjectVersion
                  - s3:ListBucket
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Resource: "*"
                Effect: Allow
                Action:
                  - codepipeline:PutJobSuccessResult
                  - codepipeline:PutJobFailureResult
              - Resource:
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*
                  - arn:aws:logs:*:*:log-group:/aws/lambda/*:log-stream:*
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
              - Resource:
                  - !Sub arn:aws:s3:::${ArtifactBucket}
                  - !Sub arn:aws:s3:::${ArtifactBucket}/*
                Effect: Allow
                Action:
                  - s3:PutObject
  Database:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref 'AWS::AccountId'
      DatabaseInput:
        Name: !Ref KDADatabaseName
        Description: Database for KDA Application Source and Target Tables
  MyStream1:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref KDAKinesisInputStreamName
      RetentionPeriodHours: 168
      ShardCount: 1
  MyStream2:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Ref KDAKinesisOutputStreamName
      RetentionPeriodHours: 168
      ShardCount: 1
  # Service execution role for the Studio application: Glue catalog access,
  # read/write on the Kinesis streams, and read access to the UDF jar in S3.
  KinesisAnalyticsRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub 'KinesisUDF-${AWS::StackName}-role'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: !Sub 'KinesisIA-${AWS::StackName}-policy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: LogGroupSid
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                  - logs:AssociateKmsKey
                Resource:
                  - arn:aws:logs:*:*:log-group:/aws/kinesis-analytics/*
              - Sid: GlueTableSid
                Effect: Allow
                Action:
                  - glue:GetConnection
                  - glue:GetTable
                  - glue:GetTables
                  - glue:CreateTable
                  - glue:UpdateTable
                  - glue:GetUserDefinedFunction
                  - glue:GetPartitions
                  - glue:DeleteTable
                Resource:
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:connection/*'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/*'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${Database}'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/hive'
                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                  - '*'
              - Sid: GlueDatabaseSid
                Effect: Allow
                Action:
                  - glue:GetDatabase
                Resource:
                  - '*'
              - Sid: KinesisEfoConsumer
                Effect: Allow
                Action:
                  - kinesis:DescribeStreamConsumer
                  - kinesis:SubscribeToShard
                Resource:
                  - !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${MyStream1}/consumer/*'
                  - !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${MyStream2}/consumer/*'
              - Sid: KinesisStreamSid
                Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:PutRecords
                  - kinesis:PutRecord
                  - kinesis:DescribeStream
                  - kinesis:DescribeStreamSummary
                  - kinesis:RegisterStreamConsumer
                  - kinesis:DeregisterStreamConsumer
                  - kinesis:ListShards
                Resource:
                  - !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${MyStream1}'
                  - !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${MyStream2}'
              - Sid: s3UDFReadWriteArtifact
                Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:PutObject
                Resource:
                  - !Sub 'arn:aws:s3:::${ArtifactBucket}/MaskPhoneNumber-1.0.jar'
                  - !Sub 'arn:aws:s3:::${ArtifactBucket}/*'
              - Sid: StreamingKinesisAnalyticsApplication
                Effect: Allow
                Action:
                  - kinesisanalytics:DescribeApplication
                Resource:
                  - '*'
  # Kinesis Data Analytics Studio (Zeppelin) application that uses the Glue
  # database as its catalog and loads the Kinesis SQL connector plus the
  # MaskPhoneNumber UDF jar built by the pipeline.
  StudioApplication:
    Type: AWS::KinesisAnalyticsV2::Application
    DependsOn: BuildCompleteWaitCondition
    Properties:
      ApplicationName: !Sub 'KinesisDataAnalyticsStudio${AWS::StackName}'
      ApplicationDescription: Kinesis Flink SQL UDF Demo
      RuntimeEnvironment: ZEPPELIN-FLINK-2_0
      ApplicationMode: INTERACTIVE
      ServiceExecutionRole: !GetAtt 'KinesisAnalyticsRole.Arn'
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref MyStream1
                OUTPUT_STREAM_NAME: !Ref MyStream2
                AWS_REGION: !Ref AWS::Region
        ZeppelinApplicationConfiguration:
          CatalogConfiguration:
            GlueDataCatalogConfiguration:
              DatabaseARN: !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${Database}'
          CustomArtifactsConfiguration:
            - ArtifactType: DEPENDENCY_JAR
              MavenReference:
                GroupId: org.apache.flink
                ArtifactId: flink-sql-connector-kinesis_2.12
                Version: 1.13.2
            - ArtifactType: "UDF"
              S3ContentLocation:
                BucketARN: !Sub 'arn:aws:s3:::${ArtifactBucket}'
                FileKey: 'MaskPhoneNumber-1.0.jar'
        FlinkApplicationConfiguration:
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 8
            ParallelismPerKPU: 1
  # Custom resource plumbing that starts the Studio application once it is created.
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: StudioApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          def lambda_handler(event, context):
              logger.info('Incoming CFN event {}'.format(event))

              try:
                  application_name = event['ResourceProperties']['ApplicationName']

                  # filter out events other than Create or Update,
                  # you can also omit Update in order to start an application on Create only.
                  if event['RequestType'] not in ["Create", "Update"]:
                      logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return

                  # use kinesisanalyticsv2 API to start an application.
                  client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])

                  # get application status.
                  describe_response = client_kda.describe_application(ApplicationName=application_name)
                  application_status = describe_response['ApplicationDetail']['ApplicationStatus']

                  # an application can be started from 'READY' status only.
                  if application_status != 'READY':
                      logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                      return

                  # this call doesn't wait for an application to transfer to 'RUNNING' state.
                  client_kda.start_application(ApplicationName=application_name)
                  logger.info('Started Application: {}'.format(application_name))
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
              except Exception as err:
                  logger.error(err)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref StudioApplication
Outputs:
  KinesisDataInputStream:
    Description: Input Kinesis stream
    Value: !Ref MyStream1
  KinesisDataOutputStream:
    Description: Output Kinesis stream
    Value: !Ref MyStream2
  KinesisDataAnalyticsStudio:
    Value: !Ref 'StudioApplication'
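# ---------------------------------------------------------------------------
# Usage sketch (comments only, not part of the template). Once the stack
# reaches CREATE_COMPLETE, a few test records can be written to the input
# stream with boto3 before running the masking query from the Studio notebook.
# The stream name below is this template's parameter default; the JSON payload
# shape and partition key are only assumed examples and should be adjusted to
# match the source table defined in the notebook. The client uses the caller's
# default AWS region and credentials.
#
#   import json
#   import boto3
#
#   kinesis = boto3.client('kinesis')
#   record = {'id': 1, 'phoneNumber': '+1-555-010-4711'}   # hypothetical payload
#   kinesis.put_record(
#       StreamName='KinesisUDFSampleInputStream',          # parameter default above
#       PartitionKey=str(record['id']),
#       Data=json.dumps(record).encode('utf-8'))
# ---------------------------------------------------------------------------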