Description: Sample Amazon Kinesis Analytics UDFs for text translation and analytics using Amazon Comprehend and Amazon Translate (v0.1.0) Parameters: ArtifactsBucket: Description: The bucket name where UDF JAR is stored Type: String Default: aws-blogs-artifacts-public ArtifactsPrefix: Description: The key prefix inside S3 bucket where UDF JAR is present Type: String Default: artifacts/ML-4786 JarArtifact: Description: The key name inside S3 bucket which represents the UDF JAR Type: String Default: text-analytics-udfs-linear-1.0.jar PersonalCareReviewsArtifact: Description: The key name inside S3 bucket which represents the trimmed down PersonalCareReviews TSV file Type: String Default: amazon_reviews_us_Personal_Care_Appliances_trimmed.tsv GroceryReviewsArtifact: Description: The key name inside S3 bucket which represents the trimmed down GroceryReviews TSV file Type: String Default: amazon_reviews_us_Grocery_trimmed.tsv ZeroNotebookArtifact: Description: Apache Zeppelin notebook to load UDF JAR into Amazon Kinesis Analytics Studio Notebook environment Type: String Default: 0-UDF-notebook.zpln FirstNotebookArtifact: Description: Apache Zeppelin notebook to load sample dataset into Amazon Kinesis Data Stream Type: String Default: 1-data-load-notebook.zpln SecondNotebookArtifact: Description: Apache Zeppelin notebook to create view on streaming data using UDF Type: String Default: 2-base-SQL-notebook.zpln ThirdNotebookArtifact: Description: Apache Zeppelin notebook to create view on streaming data using UDF for sentiment detection use-case Type: String Default: 3-sentiments-notebook.zpln FourthNotebookArtifact: Description: Apache Zeppelin notebook to create view on streaming data using UDF for entity detection use-case Type: String Default: 4-entities-notebook.zpln FifthNotebookArtifact: Description: Apache Zeppelin notebook to create view on streaming data using UDF for entity redaction use-case Type: String Default: 5-redact-entities-notebook.zpln SixthNotebookArtifact: Description: Apache Zeppelin notebook to create view on streaming data using UDF for PII entity redaction use-case Type: String Default: 6-redact-pii-entities-notebook.zpln Resources: SourceKinesisStream: Type: 'AWS::Kinesis::Stream' Properties: Name: !Join - "_" - - "amazon_reviews_raw_stream" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" ShardCount: 1 GlueDatabase: Type: 'AWS::Glue::Database' Properties: CatalogId: Ref: 'AWS::AccountId' DatabaseInput: Name: !Join - "-" - - "amazon_reviews_db" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Description: Database for integration test KinesisAnalyticsRole: Type: AWS::IAM::Role Properties: RoleName: !Join - "-" - - "KDAExecutionRole" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - kinesisanalytics.amazonaws.com Action: - 'sts:AssumeRole' Policies: - PolicyName: !Join - "-" - - "kda_app_policy" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" PolicyDocument: Version: 2012-10-17 Statement: - Sid: ListCloudwatchLogGroups Effect: Allow Action: - logs:DescribeLogGroups Resource: - !Join ['', ["arn:aws:logs:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":log-group:*"]] - Sid: GlueReadDatabase Effect: Allow Action: - glue:GetDatabase Resource: - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":database/", !Ref GlueDatabase]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":catalog"]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":database/hive"]] - Sid: GlueReadConnection Effect: Allow Action: - glue:GetConnection Resource: - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":connection/*"]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":catalog"]] - Sid: GlueTable Effect: Allow Action: - glue:GetTable - glue:GetTables - glue:CreateTable - glue:UpdateTable - glue:GetPartitions - glue:DeleteTable - glue:GetUserDefinedFunction Resource: - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":table/", !Ref GlueDatabase, "/*"]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":database/", !Ref GlueDatabase]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":catalog"]] - !Join ['', ["arn:aws:glue:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":userDefinedFunction/*"]] - Sid: PutCloudwatchLogs Effect: Allow Action: - logs:PutLogEvents Resource: - !Join ['', ["arn:aws:logs:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":log-group:*"]] - Sid: VPCReadOnlyPermissions Effect: Allow Action: - ec2:DescribeVpcs - ec2:DescribeSubnets - ec2:DescribeSecurityGroups - ec2:DescribeDhcpOptions Resource: - !Join ['', ["*"]] - Sid: ReadWriteCode Effect: Allow Action: - s3:GetObject - s3:PutObject - s3:GetObjectVersion - s3:DeleteObject Resource: - !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts, "/*"]] - Sid: DescribeApplication Effect: Allow Action: - kinesisanalytics:DescribeApplication Resource: - !Join ['', ["arn:aws:kinesisanalytics:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":application/*"]] - Sid: AllowKinesisAccess Effect: Allow Action: - kinesis:Get* - kinesis:List* - kinesis:Describe* - kinesis:Put* Resource: - !Join ['', ["arn:aws:kinesis:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":stream/", !Ref SourceKinesisStream ]] - Sid: AllowComprehendAccess Effect: Allow Action: - comprehend:DetectEntities - comprehend:DetectSentiment - comprehend:DetectDominantLanguage - comprehend:DetectPiiEntities Resource: - !Join ['', ["arn:aws:comprehend:*:", !Ref AWS::AccountId, ":entity-recognizer-endpoint/*"]] - !Join ['', ["*"]] - Sid: AllowTranslateAccess Effect: Allow Action: - translate:TranslateText Resource: - !Join ['', ["*"]] LocalS3BucketForStudioArtifacts: Type: AWS::S3::Bucket DeletionPolicy: Delete Properties: BucketName: !Join - "-" - - "amazon-reviews-bucket" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" # Custom resource to copy source code JAR, trimmed-down tsv dataset & sample notebooks customised per stack-id from the published artifacts # bucket to the local LocalS3BucketForStudioArtifacts TriggerCopyArtifactsFunction: Type: AWS::CloudFormation::CustomResource DependsOn: - LocalS3BucketForStudioArtifacts Properties: ServiceToken: !GetAtt CopyArtifactsFunction.Arn ArtifactsBucket: !Ref ArtifactsBucket ArtifactsPrefix: !Ref ArtifactsPrefix JarArtifact: !Ref JarArtifact PersonalCareReviewsArtifact: !Ref PersonalCareReviewsArtifact GroceryReviewsArtifact: !Ref GroceryReviewsArtifact ZeroNotebookArtifact: !Ref ZeroNotebookArtifact FirstNotebookArtifact: !Ref FirstNotebookArtifact SecondNotebookArtifact: !Ref SecondNotebookArtifact ThirdNotebookArtifact: !Ref ThirdNotebookArtifact FourthNotebookArtifact: !Ref FourthNotebookArtifact FifthNotebookArtifact: !Ref FifthNotebookArtifact SixthNotebookArtifact: !Ref SixthNotebookArtifact StackCode: !Join - "" - - "" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" LocalS3BucketForStudioArtifacts: !Ref LocalS3BucketForStudioArtifacts LocalJARfile: 'text-analytics-udfs-linear-1.0.jar' LocalPersonalCareReviewsFile: 'amazon_reviews_us_Personal_Care_Appliances_trimmed.tsv' LocalGroceryReviewsFile: 'amazon_reviews_us_Grocery_trimmed.tsv' LocalZeroNotebookFile: '0-UDF-notebook.json' LocalFirstNotebookFile: '1-data-load-notebook.json' LocalSecondNotebookFile: '2-base-SQL-notebook.json' LocalThirdNotebookFile: '3-sentiments-notebook.json' LocalFourthNotebookFile: '4-entities-notebook.json' LocalFifthNotebookFile: '5-redact-entities-notebook.json' LocalSixthNotebookFile: '6-redact-pii-entities-notebook.json' CopySourceJARFunctionRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Statement: - Action: 'sts:AssumeRole' Effect: Allow Principal: Service: lambda.amazonaws.com Version: 2012-10-17 ManagedPolicyArns: - !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' CopySourceJARFunctionPolicy: Type: 'AWS::IAM::Policy' Properties: PolicyDocument: Statement: - Action: - 's3:PutObject' Effect: Allow Resource: - !Sub 'arn:aws:s3:::${LocalS3BucketForStudioArtifacts}/*' - Action: - 's3:GetObject' - 's3:ListBucket' Effect: Allow Resource: - !Sub 'arn:aws:s3:::${ArtifactsBucket}/*' - !Sub 'arn:aws:s3:::${ArtifactsBucket}' Version: 2012-10-17 PolicyName: CopySourceJARFunctionPolicy Roles: - !Ref CopySourceJARFunctionRole CopyArtifactsFunction: Type: 'AWS::Lambda::Function' DependsOn: - CopySourceJARFunctionPolicy - CopySourceJARFunctionRole Properties: Role: !GetAtt - CopySourceJARFunctionRole - Arn Handler: index.on_event Runtime: python3.9 Timeout: 900 Code: ZipFile: | import cfnresponse import os import boto3 import json def do_stuff(event): artifact_bucket = event['ResourceProperties']['ArtifactsBucket'] artifact_prefix = event['ResourceProperties']['ArtifactsPrefix'] local_bucket = event['ResourceProperties']['LocalS3BucketForStudioArtifacts'] s3 = boto3.resource('s3') copy_jar(s3, event, artifact_bucket, artifact_prefix, local_bucket) copy_tsv(s3, event, artifact_bucket, artifact_prefix, local_bucket) customize_and_copy_notebooks(s3, event, artifact_bucket, artifact_prefix, local_bucket) def copy_jar(s3, event, artifact_bucket, artifact_prefix, local_bucket): source_jar = event['ResourceProperties']['JarArtifact'] local_jar = event['ResourceProperties']['LocalJARfile'] print(f"Copying s3://{artifact_bucket}/{artifact_prefix}/{source_jar} to s3://{local_bucket}/artifacts/{local_jar}") copy_source= { 'Bucket' : artifact_bucket, 'Key': f"{artifact_prefix}/{source_jar}" } dest = s3.Bucket(local_bucket) dest.copy(copy_source, 'artifacts/'+local_jar) def copy_tsv(s3, event, artifact_bucket, artifact_prefix, local_bucket): personal_care_tsv_file = event['ResourceProperties']['PersonalCareReviewsArtifact'] local_personal_care_tsv_file = event['ResourceProperties']['LocalPersonalCareReviewsFile'] print(f"Copying s3://{artifact_bucket}/{artifact_prefix}/{personal_care_tsv_file} to s3://{local_bucket}/artifacts/{local_personal_care_tsv_file}") copy_source= { 'Bucket' : artifact_bucket, 'Key': f"{artifact_prefix}/{personal_care_tsv_file}" } dest = s3.Bucket(local_bucket) dest.copy(copy_source, 'artifacts/'+local_personal_care_tsv_file) grocery_tsv_file = event['ResourceProperties']['GroceryReviewsArtifact'] local_grocery_tsv_file = event['ResourceProperties']['LocalGroceryReviewsFile'] print(f"Copying s3://{artifact_bucket}/{artifact_prefix}/{grocery_tsv_file} to s3://{local_bucket}/artifacts/{local_grocery_tsv_file}") copy_source= { 'Bucket' : artifact_bucket, 'Key': f"{artifact_prefix}/{grocery_tsv_file}" } dest = s3.Bucket(local_bucket) dest.copy(copy_source, 'artifacts/'+local_grocery_tsv_file) def customize_and_copy_notebooks(s3, event, artifact_bucket, artifact_prefix, local_bucket): # zero_data_load_notebook stack_code = event['ResourceProperties']['StackCode'] region = os.environ['AWS_REGION'] zero_data_load_notebook_key = event['ResourceProperties']['ZeroNotebookArtifact'] local_zero_data_load_notebook_file = event['ResourceProperties']['LocalZeroNotebookFile'] zero_data_load_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ zero_data_load_notebook_key) zero_data_load_notebook_file = zero_data_load_notebook_object.get()['Body'].read().decode('utf-8-sig') zero_data_load_notebook = json.loads(zero_data_load_notebook_file) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{zero_data_load_notebook_key} to s3://{local_bucket}/artifacts/{local_zero_data_load_notebook_file}") zero_data_load_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_zero_data_load_notebook_file) zero_data_load_notebook_object_local.put(Body=(bytes(json.dumps(zero_data_load_notebook).encode('utf-8'))), ContentType='application/json') # one_UDF_notebook one_UDF_notebook_key = event['ResourceProperties']['FirstNotebookArtifact'] local_one_UDF_notebook_file = event['ResourceProperties']['LocalFirstNotebookFile'] one_UDF_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ one_UDF_notebook_key) one_UDF_notebook_file = one_UDF_notebook_object.get()['Body'].read().decode('utf-8-sig') one_UDF_notebook = json.loads(one_UDF_notebook_file) caps_region = region.replace("-","_").upper() one_UDF_notebook['paragraphs'][0]['text'] = one_UDF_notebook['paragraphs'][0]['text'].replace("us-east-1",region) one_UDF_notebook['paragraphs'][0]['text'] = one_UDF_notebook['paragraphs'][0]['text'].replace("amazon_reviews_raw_stream_91ff95a0","amazon_reviews_raw_stream_"+stack_code) one_UDF_notebook['paragraphs'][1]['text'] = one_UDF_notebook['paragraphs'][1]['text'].replace("amazon-reviews-bucket-91ff95a0","amazon-reviews-bucket-"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{one_UDF_notebook_key} to s3://{local_bucket}/artifacts/{local_one_UDF_notebook_file}") one_UDF_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_one_UDF_notebook_file) one_UDF_notebook_object_local.put(Body=(bytes(json.dumps(one_UDF_notebook).encode('utf-8'))), ContentType='application/json') # two_base_SQL_notebook two_base_SQL_notebook_key = event['ResourceProperties']['SecondNotebookArtifact'] local_two_base_SQL_notebook_file = event['ResourceProperties']['LocalSecondNotebookFile'] two_base_SQL_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ two_base_SQL_notebook_key) two_base_SQL_notebook_file = two_base_SQL_notebook_object.get()['Body'].read().decode('utf-8-sig') two_base_SQL_notebook = json.loads(two_base_SQL_notebook_file) two_base_SQL_notebook['paragraphs'][0]['text'] = two_base_SQL_notebook['paragraphs'][0]['text'].replace("us-east-1",region) two_base_SQL_notebook['paragraphs'][0]['text'] = two_base_SQL_notebook['paragraphs'][0]['text'].replace("amazon_reviews_raw_stream_91ff95a0","amazon_reviews_raw_stream_"+stack_code) two_base_SQL_notebook['paragraphs'][1]['text'] = two_base_SQL_notebook['paragraphs'][1]['text'].replace("amazon_reviews_raw_stream_91ff95a0","amazon_reviews_raw_stream_"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{two_base_SQL_notebook_key} to s3://{local_bucket}/artifacts/{local_two_base_SQL_notebook_file}") two_base_SQL_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_two_base_SQL_notebook_file) two_base_SQL_notebook_object_local.put(Body=(bytes(json.dumps(two_base_SQL_notebook).encode('utf-8'))), ContentType='application/json') # three_sentiments_notebook three_sentiments_notebook_key = event['ResourceProperties']['ThirdNotebookArtifact'] local_three_sentiments_notebook_file = event['ResourceProperties']['LocalThirdNotebookFile'] three_sentiments_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ three_sentiments_notebook_key) three_sentiments_notebook_file = three_sentiments_notebook_object.get()['Body'].read().decode('utf-8-sig') three_sentiments_notebook = json.loads(three_sentiments_notebook_file) three_sentiments_notebook['paragraphs'][2]['text'] = three_sentiments_notebook['paragraphs'][2]['text'].replace("amazon-reviews-bucket-91ff95a0","amazon-reviews-bucket-"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{three_sentiments_notebook_key} to s3://{local_bucket}/artifacts/{local_three_sentiments_notebook_file}") three_sentiments_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_three_sentiments_notebook_file) three_sentiments_notebook_object_local.put(Body=(bytes(json.dumps(three_sentiments_notebook).encode('utf-8'))), ContentType='application/json') # four_entities_notebook four_entities_notebook_key = event['ResourceProperties']['FourthNotebookArtifact'] local_four_entities_notebook_file = event['ResourceProperties']['LocalFourthNotebookFile'] four_entities_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ four_entities_notebook_key) four_entities_notebook_file = four_entities_notebook_object.get()['Body'].read().decode('utf-8-sig') four_entities_notebook = json.loads(four_entities_notebook_file) four_entities_notebook['paragraphs'][2]['text'] = four_entities_notebook['paragraphs'][2]['text'].replace("amazon-reviews-bucket-91ff95a0","amazon-reviews-bucket-"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{four_entities_notebook_key} to s3://{local_bucket}/artifacts/{local_four_entities_notebook_file}") four_entities_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_four_entities_notebook_file) four_entities_notebook_object_local.put(Body=(bytes(json.dumps(four_entities_notebook).encode('utf-8'))), ContentType='application/json') # five_redact_entities_notebook five_redact_entities_notebook_key = event['ResourceProperties']['FifthNotebookArtifact'] local_five_redact_entities_notebook_file = event['ResourceProperties']['LocalFifthNotebookFile'] five_redact_entities_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ five_redact_entities_notebook_key) five_redact_entities_notebook_file = five_redact_entities_notebook_object.get()['Body'].read().decode('utf-8-sig') five_redact_entities_notebook = json.loads(five_redact_entities_notebook_file) five_redact_entities_notebook['paragraphs'][2]['text'] = five_redact_entities_notebook['paragraphs'][2]['text'].replace("amazon-reviews-bucket-91ff95a0","amazon-reviews-bucket-"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{five_redact_entities_notebook_key} to s3://{local_bucket}/artifacts/{local_five_redact_entities_notebook_file}") five_redact_entities_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_five_redact_entities_notebook_file) five_redact_entities_notebook_object_local.put(Body=(bytes(json.dumps(five_redact_entities_notebook).encode('utf-8'))), ContentType='application/json') # six_redact_pii_entities_notebook six_redact_pii_entities_notebook_key = event['ResourceProperties']['SixthNotebookArtifact'] local_six_redact_pii_entities_notebook_file = event['ResourceProperties']['LocalSixthNotebookFile'] six_redact_pii_entities_notebook_object = s3.Object(artifact_bucket, artifact_prefix +'/'+ six_redact_pii_entities_notebook_key) six_redact_pii_entities_notebook_file = six_redact_pii_entities_notebook_object.get()['Body'].read().decode('utf-8-sig') six_redact_pii_entities_notebook = json.loads(six_redact_pii_entities_notebook_file) six_redact_pii_entities_notebook['paragraphs'][2]['text'] = six_redact_pii_entities_notebook['paragraphs'][2]['text'].replace("amazon-reviews-bucket-91ff95a0","amazon-reviews-bucket-"+stack_code) print(f"Copying customized s3://{artifact_bucket}/{artifact_prefix}/{six_redact_pii_entities_notebook_key} to s3://{local_bucket}/artifacts/{local_six_redact_pii_entities_notebook_file}") six_redact_pii_entities_notebook_object_local = s3.Object(local_bucket, 'artifacts/'+local_six_redact_pii_entities_notebook_file) six_redact_pii_entities_notebook_object_local.put(Body=(bytes(json.dumps(six_redact_pii_entities_notebook).encode('utf-8'))), ContentType='application/json') def on_event(event, context): print(event) responseData = {} status = cfnresponse.SUCCESS if event['RequestType'] != 'Delete': try: do_stuff(event) except Exception as e: print(e) responseData["Error"] = f"Exception thrown: {e}" status = cfnresponse.FAILED responseData['Data'] = "Success" cfnresponse.send(event, context, status, responseData) StudioApplication: Type: AWS::KinesisAnalyticsV2::Application DependsOn: - KinesisAnalyticsRole - TriggerCopyArtifactsFunction Properties: ApplicationName: !Join - "-" - - "amazon_reviews_studio_application" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" ApplicationMode: INTERACTIVE RuntimeEnvironment: ZEPPELIN-FLINK-2_0 ServiceExecutionRole: !GetAtt KinesisAnalyticsRole.Arn ApplicationConfiguration: FlinkApplicationConfiguration: ParallelismConfiguration: ConfigurationType: CUSTOM Parallelism: 4 ParallelismPerKPU: 1 ZeppelinApplicationConfiguration: MonitoringConfiguration: LogLevel: INFO DeployAsApplicationConfiguration: S3ContentLocation: BasePath: 'zeppelin-code/' BucketARN: !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts]] CatalogConfiguration: GlueDataCatalogConfiguration: DatabaseARN: 'Fn::Sub': >- arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase} CustomArtifactsConfiguration: - ArtifactType: DEPENDENCY_JAR MavenReference: GroupId: org.apache.flink ArtifactId: flink-sql-connector-kinesis_2.12 Version: 1.13.2 - ArtifactType: UDF S3ContentLocation: BucketARN: !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts]] FileKey: 'artifacts/text-analytics-udfs-linear-1.0.jar' # Start Studio Instance & create a CORS configuration on s3 for notebook domain (generated once notebook instance is in RUNNING state) HelperLambdaForKDAStudioAppStartUpRole: Type: AWS::IAM::Role DependsOn: - StudioApplication Properties: RoleName: !Join - "-" - - "HelperLambdaRoleStudioStart" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' Policies: - PolicyName: !Join - "-" - - "helper_lambda_policy" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - kinesisanalytics:StartApplication - kinesisanalytics:CreateApplicationPresignedUrl - kinesisanalytics:DescribeApplication Resource: - !Join ['', ["arn:aws:kinesisanalytics:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":application/", !Ref StudioApplication]] - Effect: Allow Action: - s3:PutBucketCors Resource: - !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts]] - !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts, "/*"]] Path: / HelperLambdaForKDAStudioAppStartUp: Type: AWS::Lambda::Function DependsOn: - HelperLambdaForCleanupBucketOnStackDeleteRole Properties: Code: ZipFile: | import json import boto3 import cfnresponse import time client = boto3.client('kinesisanalyticsv2') def lambda_handler(event, context): responseData = {} try: if (event['RequestType'] == 'Create'): app_name = event['ResourceProperties']['AppName'] response = client.start_application(ApplicationName=app_name) response = client.describe_application(ApplicationName=app_name) while response['ApplicationDetail']['ApplicationStatus'] != 'RUNNING': print('status is '+response['ApplicationDetail']['ApplicationStatus']+' ; not yet ready, waiting') time.sleep(10) response = client.describe_application(ApplicationName=app_name) #trigger only when Studio application is in Running state modifyCORSPolicyOfBucket(client, app_name, event['ResourceProperties']['LocalS3Bucket']) responseData['Data'] = "Success" cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData) except Exception as e: print(e) responseData['Data'] = "Failed" cfnresponse.send(event, context, cfnresponse.FAILED, responseData) def modifyCORSPolicyOfBucket(client, app_name, local_bucket): response = client.create_application_presigned_url( ApplicationName=app_name, UrlType='ZEPPELIN_UI_URL' ) response_url = response['AuthorizedUrl'] sub_str = "amazonaws.com" kda_app_domain = str(response_url[:response_url.index(sub_str) + len(sub_str)]) # Define the configuration rules cors_configuration = { 'CORSRules': [{ 'AllowedHeaders': ['*'], 'AllowedMethods': ['GET'], 'AllowedOrigins': [kda_app_domain], 'ExposeHeaders': ['GET'] }] } # Set the CORS configuration s3 = boto3.client('s3') s3.put_bucket_cors(Bucket=local_bucket,CORSConfiguration=cors_configuration) FunctionName: !Join - "-" - - "helper_lambda_for_kda_app_start" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" MemorySize: 128 Runtime: python3.9 Description: Lambda function that executes during stack creation and deletion to achieve CleanupBucketOnStackDelete Handler: index.lambda_handler Role: !GetAtt HelperLambdaForKDAStudioAppStartUpRole.Arn Timeout: 900 StartKDAStudioApplication: Type: Custom::startKDAStudioApp DependsOn: - HelperLambdaForKDAStudioAppStartUp - StudioApplication Properties: ServiceToken: !GetAtt HelperLambdaForKDAStudioAppStartUp.Arn AppName: !Ref StudioApplication LocalS3Bucket: !Ref LocalS3BucketForStudioArtifacts # empty local bucket on stack deletion CleanupBucketOnStackDelete: Type: Custom::cleanupbucket Properties: ServiceToken: !GetAtt HelperLambdaForCleanupBucketOnStackDelete.Arn BucketName: !Ref LocalS3BucketForStudioArtifacts HelperLambdaForCleanupBucketOnStackDelete: Type: AWS::Lambda::Function DependsOn: - LocalS3BucketForStudioArtifacts - HelperLambdaForCleanupBucketOnStackDeleteRole Properties: Code: ZipFile: | import json import boto3 import cfnresponse def lambda_handler(event, context): try: bucket = event['ResourceProperties']['BucketName'] if event['RequestType'] == 'Delete': s3 = boto3.resource('s3') bucket = s3.Bucket(bucket) for obj in bucket.objects.filter(): s3.Object(bucket.name, obj.key).delete() sendResponseCfn(event, context, cfnresponse.SUCCESS) except Exception as e: print(e) sendResponseCfn(event, context, cfnresponse.FAILED) def sendResponseCfn(event, context, responseStatus): responseValue = context.log_stream_name responseData = {} responseData['Data'] = responseValue cfnresponse.send(event, context, responseStatus, responseData, context.log_stream_name) FunctionName: !Join - "-" - - "helper_lambda_for_cleanup_bucket_on_stack_delete" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" MemorySize: 128 Runtime: python3.9 Description: Lambda function that executes during stack creation and deletion to achieve CleanupBucketOnStackDelete Handler: index.lambda_handler Role: !GetAtt HelperLambdaForCleanupBucketOnStackDeleteRole.Arn Timeout: 900 HelperLambdaForCleanupBucketOnStackDeleteRole: Type: AWS::IAM::Role DependsOn: - LocalS3BucketForStudioArtifacts Properties: RoleName: !Join - "-" - - "HelperLambdaRoleStackDelete" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' Policies: - PolicyName: !Join - "-" - - "helper_lambda_policy" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - s3:DeleteObjectVersion - s3:DeleteObject - s3:GetObject - s3:ListBucket Resource: - !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts]] - !Join ['', ["arn:aws:s3:::", !Ref LocalS3BucketForStudioArtifacts, "/*"]] Path: / Outputs: GlueDatabase: Description: The Glue database used in this example Value: !Ref GlueDatabase KinesisStream: Description: The Kinesis data stream used in this example Value: !Ref SourceKinesisStream S3Bucket: Description: The S3 bucket used in this example to store artifacts Value: !Ref LocalS3BucketForStudioArtifacts StudioApplication: Description: The Kinesis Data Analytics Studio instance used in this example Value: !Ref StudioApplication