# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from constructs import Construct
import os

import aws_cdk.aws_s3 as s3
import aws_cdk.aws_s3_notifications as s3n
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_lambda as lambda_
import aws_cdk.aws_stepfunctions_tasks as tasks
import aws_cdk.aws_iam as iam
from aws_cdk import (CfnOutput, RemovalPolicy, Stack, Duration)
import amazon_textract_idp_cdk_constructs as tcdk


class SimpleAsyncWorkflow(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        script_location = os.path.dirname(__file__)
        s3_upload_prefix = "uploads"
        s3_output_prefix = "textract-output"
        s3_temp_output_prefix = "textract-temp-output"
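        # Flow of the resources defined below: an object uploaded under the
        # "uploads/" prefix triggers a Lambda that starts the Step Functions
        # state machine; the state machine runs the decider, calls Textract
        # asynchronously, and then post-processes the Textract output with a
        # Docker-based Lambda.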
        # BEWARE! This is a demo/POC setup: remove the auto_delete_objects=True and
        # removal_policy=RemovalPolicy.DESTROY arguments when the documents in the
        # bucket should survive deletion of the CloudFormation stack.
        document_bucket = s3.Bucket(self,
                                    "TextractSimpleAsyncWorkflow",
                                    auto_delete_objects=True,
                                    removal_policy=RemovalPolicy.DESTROY)
        s3_output_bucket = document_bucket.bucket_name
        workflow_name = "SimpleAsyncWorkflow"

        decider_task = tcdk.TextractPOCDecider(
            self,
            f"{workflow_name}-Decider",
        )

        # Asynchronous Textract call; the state machine waits for the task token
        # until the Textract job has finished.
        textract_async_task = tcdk.TextractGenericAsyncSfnTask(
            self,
            "TextractAsync",
            s3_output_bucket=s3_output_bucket,
            s3_temp_output_prefix=s3_temp_output_prefix,
            integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
            lambda_log_level="DEBUG",
            timeout=Duration.hours(24),
            input=sfn.TaskInput.from_object({
                "Token": sfn.JsonPath.task_token,
                "ExecutionId": sfn.JsonPath.string_at('$$.Execution.Id'),
                "Payload": sfn.JsonPath.entire_payload,
            }),
            result_path="$.textract_result")

        lambda_textract_post_processing_function = lambda_.DockerImageFunction(
            self,
            "LambdaTextractPostProcessing",
            code=lambda_.DockerImageCode.from_image_asset(
                os.path.join(script_location,
                             '../lambda/textractpostprocessor')),
            memory_size=2048,
            timeout=Duration.minutes(15),
            environment={
                "SKIP_PAGES":
                "CONTENTS,TABLE OF CONTENTS,FOREWORDS, ANNEXES,Table of Contents,ACRONYMS, ABBREVIATIONS",
                "NO_LINES_HEADER": "3",
                "NO_LINES_FOOTER": "10",
                "FILTER_PARA_WORDS": "10"
            })

        lambda_textract_post_processing_function.add_to_role_policy(
            iam.PolicyStatement(actions=['s3:Get*', 's3:List*', 's3:Put*'],
                                resources=["*"]))
        # Tighter alternative for the resources above:
        # [document_bucket.bucket_arn, document_bucket.arn_for_objects('*')]

        textract_async_call_task = tasks.LambdaInvoke(
            self,
            "TextractPostProcessorTask",
            lambda_function=lambda_textract_post_processing_function,
            output_path="$.Payload",
        )

        async_chain = sfn.Chain.start(textract_async_task).next(
            textract_async_call_task)

        workflow_chain = sfn.Chain \
            .start(decider_task) \
            .next(async_chain)

        # GENERIC
        state_machine = sfn.StateMachine(self,
                                         workflow_name,
                                         definition=workflow_chain)

        lambda_step_start_step_function = lambda_.DockerImageFunction(
            self,
            "LambdaStartStepFunctionGeneric",
            code=lambda_.DockerImageCode.from_image_asset(
                os.path.join(script_location, '../lambda/startstepfunction')),
            memory_size=128,
            environment={"STATE_MACHINE_ARN": state_machine.state_machine_arn})

        lambda_step_start_step_function.add_to_role_policy(
            iam.PolicyStatement(actions=['states:StartExecution'],
                                resources=[state_machine.state_machine_arn]))

        document_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(
                lambda_step_start_step_function),  # type: ignore
            s3.NotificationKeyFilter(prefix=s3_upload_prefix))

        # OUTPUT
        CfnOutput(
            self,
            "DocumentUploadLocation",
            value=f"s3://{document_bucket.bucket_name}/{s3_upload_prefix}/")
        CfnOutput(
            self,
            "StartStepFunctionLambdaLogGroup",
            value=lambda_step_start_step_function.log_group.log_group_name)
        current_region = Stack.of(self).region
        CfnOutput(
            self,
            'StepFunctionFlowLink',
            value=
            f"https://{current_region}.console.aws.amazon.com/states/home?region={current_region}#/statemachines/view/{state_machine.state_machine_arn}"
        )
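# Quick test sketch (assumes this stack is instantiated in a CDK app; the stack name
# below is hypothetical and depends on how the app names it):
#   cdk deploy SimpleAsyncWorkflow
#   aws s3 cp mydocument.pdf s3://<bucket from DocumentUploadLocation output>/uploads/
# The upload triggers the start-step-function Lambda; follow the execution via the
# StepFunctionFlowLink output.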