"""CDK stack that defines a file standardization pipeline using aws-ddk-core."""

from typing import Any

from aws_cdk.aws_glue import CfnCrawler, CfnDatabase
from aws_cdk.aws_glue_alpha import Code as glue_code
from aws_cdk.aws_glue_alpha import GlueVersion, JobExecutable, PythonVersion
from aws_cdk.aws_iam import (
    Effect,
    ManagedPolicy,
    PolicyStatement,
    Role,
    ServicePrincipal,
)
from aws_cdk.aws_lambda import Code as lambda_code
from aws_cdk.aws_lambda import Runtime
from aws_cdk.aws_s3 import Bucket
from aws_ddk_core import (
    BaseStack,
    DataPipeline,
    GlueTransformStage,
    S3EventStage,
    SqsToLambdaStage,
)
from constructs import Construct


class FileStandardizationPipelineStack(BaseStack):
    """Stack that assembles the file standardization data pipeline.

    The stack creates one S3 bucket, an S3 event stage watching the
    ``input_files/`` prefix for "Object Created" events, an SQS-to-Lambda
    stage, and a Glue transform stage (job + crawler), and registers the
    three stages on a single ``DataPipeline`` in that order.
    """

    def __init__(
        self, scope: Construct, id: str, environment_id: str, **kwargs: Any
    ) -> None:
        """Create the bucket, the three stages and the pipeline joining them.

        Args:
            scope: Parent construct of this stack.
            id: Construct identifier of this stack.
            environment_id: Environment identifier; forwarded to ``BaseStack``
                and embedded in the bucket name.
            **kwargs: Additional keyword arguments forwarded to ``BaseStack``.
        """
        super().__init__(scope, id, environment_id=environment_id, **kwargs)
        self._environment_id = environment_id

        bucket_name = f"ddk-{self._environment_id}-filestandardization-bucket-{self.account}"
        self._ddk_bucket = self._create_bucket(
            bucket_name=bucket_name,
            event_bridge_status=True,
        )

        self._s3_event_stage = S3EventStage(
            self,
            id="s3_event_stage",
            event_names=["Object Created"],
            bucket=self._ddk_bucket,
            key_prefix="input_files/",
        )

        # The Glue stage must exist before the SQS-to-Lambda stage, because
        # the latter reads the Glue stage's state machine ARN.
        self._glue_transform_stage = self._create_glue_transform_stage(
            database_name="ddk_pattern_database",
            crawler_role_name="ddk_pattern_glue_crawler_role",
        )
        self._sqs_to_lambda_stage = self._create_sqs_to_lambda_stage()

        pipeline = DataPipeline(
            self,
            id="file-standardization-pipeline",
            name="file-standardization-pipeline",
            description="file standardization pipeline using aws-ddk",
        )
        # Stages are registered in order. Only the Glue stage passes
        # skip_rule=True -- presumably because the Lambda starts its state
        # machine directly via the STEPFUNCTION ARN; confirm against the
        # aws-ddk-core documentation.
        stage_registrations = (
            (self._s3_event_stage, {}),
            (self._sqs_to_lambda_stage, {}),
            (self._glue_transform_stage, {"skip_rule": True}),
        )
        for stage, extra_options in stage_registrations:
            pipeline = pipeline.add_stage(stage=stage, **extra_options)
        self._ddk_data_pipeline = pipeline

    def _create_bucket(self, bucket_name: str, event_bridge_status: bool) -> Bucket:
        """Create the pipeline bucket and attach a resource policy for Glue.

        Args:
            bucket_name: Physical name given to the bucket.
            event_bridge_status: Value passed as the bucket's
                ``event_bridge_enabled`` setting.

        Returns:
            The created bucket.
        """
        bucket = Bucket(
            self,
            id="s3_bucket",
            bucket_name=bucket_name,
            event_bridge_enabled=event_bridge_status,
        )

        glue_principal = ServicePrincipal(service="glue.amazonaws.com")
        allowed_actions = [
            "s3:Put*",
            "s3:Get*",
            "s3:AbortMultipartUpload",
            "s3:ListMultipartUploadParts",
            "s3:ListBucketMultipartUploads",
        ]
        # The statement covers both the bucket itself and every object in it.
        covered_resources = [
            bucket.bucket_arn,
            f"{bucket.bucket_arn}/*",
        ]
        # Restrict the service principal to requests originating from this
        # stack's account.
        same_account_only = {
            "StringEquals": {
                "aws:SourceAccount": self.account,
            }
        }
        glue_statement = PolicyStatement(
            sid="AllowGlueActions",
            effect=Effect.ALLOW,
            principals=[glue_principal],
            actions=allowed_actions,
            resources=covered_resources,
            conditions=same_account_only,
        )
        bucket.add_to_resource_policy(glue_statement)
        return bucket

    def _create_glue_transform_stage(
        self, database_name: str, crawler_role_name: str
    ) -> GlueTransformStage:
        """Create the Glue database, crawler role and Glue transform stage.

        Args:
            database_name: Name of the Glue database to create; also used as
                the crawler's target database.
            crawler_role_name: Name of the IAM role created for the crawler.

        Returns:
            The configured ``GlueTransformStage``.
        """
        CfnDatabase(
            self,
            id="glue_database",
            catalog_id=self.account,
            database_input=CfnDatabase.DatabaseInputProperty(
                description="glue database created by ddk", name=database_name
            ),
        )

        glue_service_policy = ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSGlueServiceRole"
        )
        crawler_role = Role(
            self,
            id="glue_crawler_role",
            role_name=crawler_role_name,
            assumed_by=ServicePrincipal("glue.amazonaws.com"),
            managed_policies=[glue_service_policy],
        )
        # The crawler role may read and write objects under output/ only.
        crawler_role.add_to_policy(
            PolicyStatement(
                actions=["s3:GetObject", "s3:PutObject"],
                resources=[f"{self._ddk_bucket.bucket_arn}/output/*"],
            )
        )

        etl_executable = JobExecutable.python_etl(
            glue_version=GlueVersion.V3_0,
            python_version=PythonVersion.THREE,
            script=glue_code.from_asset(
                "./file_standardization_pipeline/src/file_standardization/glue_script.py"
            ),
        )
        job_settings = {
            "max_concurrent_runs": 100,
            "job_name": "ddk_pattern_glue_job",
            "executable": etl_executable,
        }
        # Keys ending in ".$" take their value from a path in the state
        # machine input -- presumably Step Functions parameter syntax; confirm
        # against the GlueTransformStage documentation.
        job_arguments = {
            "--additional-python-modules": "pyarrow==3,awswrangler",
            "--input_s3_path.$": "$.input_s3_path",
            "--target_s3_path.$": "$.target_s3_path",
        }
        output_target = CfnCrawler.S3TargetProperty(
            path=f"s3://{self._ddk_bucket.bucket_name}/output"
        )
        crawler_settings = {
            "database_name": database_name,
            "targets": CfnCrawler.TargetsProperty(s3_targets=[output_target]),
            "role": crawler_role.role_arn,
        }

        stage = GlueTransformStage(
            self,
            id="glue_transform_stage",
            job_props=job_settings,
            job_run_args=job_arguments,
            crawler_props=crawler_settings,
        )

        # Let the stage's state machine start the crawler it created.
        # NOTE(review): region and account are wildcards in this ARN; consider
        # pinning them if the crawler is only ever started in this stack's
        # environment.
        crawler_arn = f"arn:aws:glue:*:*:crawler/{stage.crawler.ref}"
        stage.state_machine.add_to_role_policy(
            PolicyStatement(
                effect=Effect.ALLOW,
                actions=["glue:StartCrawler"],
                resources=[crawler_arn],
            )
        )
        self._ddk_bucket.grant_read_write(stage.glue_job)
        return stage

    def _create_sqs_to_lambda_stage(self) -> SqsToLambdaStage:
        """Create the SQS-to-Lambda stage for the Glue stage's state machine.

        Reads ``self._glue_transform_stage``, so the Glue transform stage must
        already have been created. The state machine ARN is passed to the
        function through the ``STEPFUNCTION`` environment variable.

        Returns:
            The configured ``SqsToLambdaStage``.
        """
        state_machine_arn = self._glue_transform_stage.state_machine.state_machine_arn

        function_settings = {
            "code": lambda_code.from_asset(
                "./file_standardization_pipeline/src/invoke_step_function"
            ),
            "handler": "handler.lambda_handler",
            "runtime": Runtime.PYTHON_3_9,
            "environment": {"STEPFUNCTION": state_machine_arn},
        }
        stage = SqsToLambdaStage(
            self,
            id="lambda_to_sqs_stage",
            lambda_function_props=function_settings,
            batch_size=1,
        )

        # NOTE(review): "states:*" allows every Step Functions action on this
        # state machine; consider narrowing it once the handler's actual API
        # calls are confirmed.
        stage.function.add_to_role_policy(
            PolicyStatement(
                effect=Effect.ALLOW,
                actions=["states:*"],
                resources=[state_machine_arn],
            )
        )
        return stage