# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

AWSTemplateFormatVersion: 2010-09-09
Description: |
  SageMaker Project for automated ingestion of a dataset from an S3 bucket to Feature Store.
  This template creates a project to start a SageMaker pipeline, run Data Wrangler data processing,
  and ingest the processed data into a feature group.

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Pipeline Data
        Parameters:
          - PipelineNamePrefix
          - PipelineDescription
      - Label:
          default: Input Data Location
        Parameters:
          - S3DataPrefix
      - Label:
          default: Transformation
        Parameters:
          - DataWranglerFlowUrl
          - DataWranglerOutputName
      - Label:
          default: Destination
        Parameters:
          - FeatureGroupName
      - Label:
          default: Permissions
        Parameters:
          - LambdaExecutionRole
    ParameterLabels:
      PipelineNamePrefix:
        default: Pipeline name prefix
      PipelineDescription:
        default: Pipeline description - optional
      S3DataPrefix:
        default: S3 data prefix to monitor for new data
      DataWranglerFlowUrl:
        default: Data Wrangler flow URL
      DataWranglerOutputName:
        default: Data Wrangler output name (node_id.default)
      FeatureGroupName:
        default: Feature group name to ingest data
      LambdaExecutionRole:
        default: Execution role for the Lambda function

Parameters:
  SageMakerProjectName:
    Type: String
    Description: Name of the project
    MinLength: 1
    MaxLength: 32
    AllowedPattern: ^[a-zA-Z](-*[a-zA-Z0-9])*

  SageMakerProjectId:
    Type: String
    Description: Service-generated id of the project

  PipelineNamePrefix:
    Type: String
    MaxLength: 241 # 256 - len(SageMakerProjectId) - 1
    Default: 's3-fs-ingest-pipeline'
    AllowedPattern: ^[a-zA-Z0-9](-*[a-zA-Z0-9])*
    Description: Name prefix of your data processing pipeline. The full name has the format <pipeline-name-prefix>-<sagemaker-project-id>

  PipelineDescription:
    Type: String
    MaxLength: 3072
    Default: 'Feature Store ingestion pipeline'
    Description: Description of your data processing pipeline

  DataWranglerFlowUrl:
    Type: String
    Description: S3 URL of the Data Wrangler .flow file

  DataWranglerOutputName:
    Type: String
    Description: The output name must point to a correct node id in the flow file. The output name has the format <node_id>.default

  S3DataPrefix:
    Type: String
    Description: S3 prefix pointing to the S3 location where a data file is uploaded to trigger the pipeline (without the leading s3:// part)

  FeatureGroupName:
    Type: String
    MaxLength: 64
    AllowedPattern: ^[a-zA-Z0-9](-*[a-zA-Z0-9])*
    Description: Name of the feature group where the data will be ingested

  LambdaExecutionRole:
    Type: String
    Default: 'Auto'
    Description: Execution role for the Lambda function. If Auto, a new IAM role will be created
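# Note (illustrative, not part of the original parameter description): S3DataPrefix is expected in the
# form <bucket-name>/<prefix>, for example my-data-bucket/landing/ (a placeholder name, not a bucket
# created by this template). The bucket-name portion is extracted with !Select/!Split in the
# S3ObjectCreatedEventBridgeRule below, and the full prefix is used as the CloudTrail data event selector.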
Conditions:
  CreateLambdaExecutionRoleCondition: !Equals [ !Ref LambdaExecutionRole, 'Auto' ]

Resources:
  CodePipelineArtifactsBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub sagemaker-cp-${SageMakerProjectName}-${SageMakerProjectId} # 12+32+15=59 chars max/ 63 allowed
      AccessControl: Private
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'

  DataLoadPipelineCodeCommitEventRule:
    Type: AWS::Events::Rule
    Properties:
      # Max length allowed: 64
      Name: !Sub sagemaker-${SageMakerProjectName}-${SageMakerProjectId}-build # max: 10+33+15+5=63 chars
      Description: "Rule to start a pipeline upsert when the data pipeline CodeCommit repository is updated"
      EventPattern:
        source:
          - "aws.codecommit"
        detail-type:
          - "CodeCommit Repository State Change"
        resources:
          - !GetAtt DataLoadPipelineCodeCommitRepository.Arn
        detail:
          referenceType:
            - "branch"
          referenceName:
            - "main"
      State: "ENABLED"
      Targets:
        - Arn: !Sub 'arn:${AWS::Partition}:codepipeline:${AWS::Region}:${AWS::AccountId}:${DataLoadPipelineBuildPipeline}'
          RoleArn: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole'
          Id: !Sub codecommit-${SageMakerProjectName}-pipelinebuild

  DataLoadPipelineCodeCommitRepository:
    Type: AWS::CodeCommit::Repository
    Properties:
      # Max allowed length: 100 chars
      RepositoryName: !Sub sagemaker-${SageMakerProjectName}-${SageMakerProjectId}-data-pipeline # max: 10+33+15+14=72
      RepositoryDescription: !Sub SageMaker data transformation and ingestion pipeline building infrastructure as code for the project ${SageMakerProjectName}
      Code:
        S3:
          Bucket: < S3_CFN_STAGING_BUCKET >
          Key: amazon-sagemaker-reusable-components/seed-code/s3-fs-ingestion-v1.0.zip
        BranchName: main

  DataLoadPipelineBuildProject:
    Type: AWS::CodeBuild::Project
    Properties:
      # Max length: 255 chars
      Name: !Sub sagemaker-${SageMakerProjectName}-${SageMakerProjectId}-pipelinebuild # max: 10+33+15+13=71
      Description: Pulls the code from the data pipeline CodeCommit repository and upserts the SageMaker Pipeline
      ServiceRole: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole'
      Artifacts:
        Type: CODEPIPELINE
      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_SMALL
        Image: aws/codebuild/amazonlinux2-x86_64-standard:3.0
        EnvironmentVariables:
          - Name: SAGEMAKER_PROJECT_NAME
            Value: !Ref SageMakerProjectName
          - Name: SAGEMAKER_PROJECT_ID
            Value: !Ref SageMakerProjectId
          - Name: PIPELINE_DESCRIPTION
            Value: !Ref PipelineDescription
          - Name: PIPELINE_NAME_PREFIX
            Value: !Ref PipelineNamePrefix
          - Name: DW_FLOW_URL
            Value: !Ref DataWranglerFlowUrl
          - Name: DW_FLOW_OUTPUT_NAME
            Value: !Ref DataWranglerOutputName
          - Name: S3_DATA_PREFIX
            Value: !Ref S3DataPrefix
          - Name: FEATURE_GROUP_NAME
            Value: !Ref FeatureGroupName
          - Name: AWS_REGION
            Value: !Ref AWS::Region
          - Name: EXECUTION_ROLE
            Value: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole'
      Source:
        Type: CODEPIPELINE
        BuildSpec: buildspec.yml
      TimeoutInMinutes: 480
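  # Note on the CI/CD flow formed by the resources above and the pipeline below: a push to the `main`
  # branch of the CodeCommit repository triggers DataLoadPipelineCodeCommitEventRule, which starts the
  # CodePipeline below; its Build stage runs the CodeBuild project to upsert the SageMaker pipeline
  # named <PipelineNamePrefix>-<SageMakerProjectId> from the seed code.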
  DataLoadPipelineBuildPipeline:
    Type: AWS::CodePipeline::Pipeline
    Properties:
      # Max length: 100 chars
      Name: !Sub sagemaker-${SageMakerProjectName}-${SageMakerProjectId}-pipelinebuild # max: 10+33+15+13=71
      RoleArn: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole'
      ArtifactStore:
        Type: S3
        Location: !Ref CodePipelineArtifactsBucket
      Stages:
        - Name: Source
          Actions:
            - Name: DataLoadPipelineBuildSource
              ActionTypeId:
                Category: Source
                Owner: AWS
                Provider: CodeCommit
                Version: '1'
              Configuration:
                PollForSourceChanges: 'false'
                RepositoryName: !GetAtt DataLoadPipelineCodeCommitRepository.Name
                BranchName: main
              OutputArtifacts:
                - Name: DataLoadPipelineSourceArtifact
        - Name: Build
          Actions:
            - Name: UpsertSageMakerDataLoadPipeline
              ActionTypeId:
                Category: Build
                Owner: AWS
                Provider: CodeBuild
                Version: '1'
              InputArtifacts:
                - Name: DataLoadPipelineSourceArtifact
              OutputArtifacts:
                - Name: DataLoadPipelineBuildArtifact
              Configuration:
                ProjectName: !Ref DataLoadPipelineBuildProject
              RunOrder: 1

  StartIngestionPipelineLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Condition: CreateLambdaExecutionRoleCondition
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: InlinePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: SageMakerPipelinePermission
                Effect: Allow
                Action:
                  - sagemaker:StartPipelineExecution
                Resource: !Sub 'arn:aws:sagemaker:${AWS::Region}:${AWS::AccountId}:pipeline/${PipelineNamePrefix}-${SageMakerProjectId}'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  StartIngestionPipelineLambda:
    Type: AWS::Lambda::Function
    Properties:
      ReservedConcurrentExecutions: 1
      Code:
        ZipFile: |
          import json
          import os
          import boto3
          import logging
          from time import gmtime, strftime

          logger = logging.getLogger(__name__)
          logging.root.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

          # configuration settings
          SM_PIPELINE_NAME = os.environ.get("SM_PIPELINE_NAME", "")

          s3 = boto3.resource('s3')
          sm = boto3.client('sagemaker')

          def lambda_handler(event, context):
              try:
                  operation = event["detail"]["eventName"]
                  obj_key = event["detail"]["requestParameters"]["key"]
                  bucket_name = event["detail"]["requestParameters"]["bucketName"]

                  logger.info(f"Got the event: {operation} for the object: {bucket_name}/{obj_key}")
                  logger.info(f"Starting pipeline {SM_PIPELINE_NAME}")

                  start_pipeline = sm.start_pipeline_execution(
                      PipelineName=SM_PIPELINE_NAME,
                      PipelineExecutionDisplayName=f"{obj_key.split('/')[-1].replace('_','').replace('.csv','')}-{strftime('%d-%H-%M-%S', gmtime())}",
                      PipelineParameters=[
                          {
                              'Name': 'InputDataUrl',
                              'Value': f"s3://{bucket_name}/{obj_key}"
                          },
                      ],
                      PipelineExecutionDescription=obj_key
                  )

                  logger.info(f"start_pipeline_execution returned {start_pipeline}")
              except Exception as e:
                  logger.error(f"Exception in start_fs_ingestion function: {str(e)}")

              return
      Description: Start SageMaker pipeline with data transformation and FS ingestion
      Environment:
        Variables:
          SM_PIPELINE_NAME: !Sub '${PipelineNamePrefix}-${SageMakerProjectId}'
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !If
        - CreateLambdaExecutionRoleCondition
        - !GetAtt StartIngestionPipelineLambdaExecutionRole.Arn
        - !Ref LambdaExecutionRole
      Runtime: python3.8
      Timeout: 60
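  # For illustration only: an abridged shape of the CloudTrail-sourced EventBridge event that the Lambda
  # handler above parses. Only the fields read by the handler and matched by S3ObjectCreatedEventBridgeRule
  # are shown; bucket and key values are placeholders.
  # {
  #   "source": "aws.s3",
  #   "detail": {
  #     "eventSource": "s3.amazonaws.com",
  #     "eventName": "PutObject",
  #     "requestParameters": {
  #       "bucketName": "<data-bucket>",
  #       "key": "<prefix>/<file>.csv"
  #     }
  #   }
  # }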
  CloudTrailBucket:
    # Bucket for CloudTrail logs.
    # We need CloudTrail to enable EventBridge notifications for object put events on the data bucket.
    # We use the CloudTrail-based notification instead of S3 notifications
    # because we don't want to overwrite an existing S3 notification configuration on the data bucket.
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      BucketName: !Sub sagemaker-ct-${SageMakerProjectName}-${SageMakerProjectId} # 13+32+15=60 chars max/ 63 allowed
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'AES256'
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerPreferred

  # Bucket policy enables CloudTrail to write to the CloudTrailBucket
  CloudTrailBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref CloudTrailBucket
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: "AWSCloudTrailAclCheck"
            Effect: "Allow"
            Principal:
              Service: "cloudtrail.amazonaws.com"
            Action: "s3:GetBucketAcl"
            Resource: !Sub arn:aws:s3:::${CloudTrailBucket}
          - Sid: "AWSCloudTrailWrite"
            Effect: "Allow"
            Principal:
              Service: "cloudtrail.amazonaws.com"
            Action: "s3:PutObject"
            Resource: !Sub arn:aws:s3:::${CloudTrailBucket}/AWSLogs/${AWS::AccountId}/*
            Condition:
              StringEquals:
                s3:x-amz-acl: "bucket-owner-full-control"

  # The CloudTrail trail - uses the CloudTrailBucket name as part of the trail name
  S3ObjectCloudTrail:
    Type: AWS::CloudTrail::Trail
    DependsOn:
      - CloudTrailBucketPolicy
    Properties:
      TrailName: !Sub cloudtrail-${CloudTrailBucket}
      S3BucketName: !Ref CloudTrailBucket
      IsLogging: true
      IsMultiRegionTrail: false
      EventSelectors:
        - ReadWriteType: WriteOnly
          IncludeManagementEvents: false
          DataResources:
            - Type: AWS::S3::Object
              Values:
                - !Sub arn:aws:s3:::${S3DataPrefix}
      IncludeGlobalServiceEvents: false

  # Lambda resource permission for EventBridge to invoke the Lambda function
  CloudTrailLambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref StartIngestionPipelineLambda
      Principal: events.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !GetAtt S3ObjectCreatedEventBridgeRule.Arn

  # EventBridge rule to invoke the Lambda function on `PutObject` and `CompleteMultipartUpload` events
  S3ObjectCreatedEventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Description: !Sub 'Invokes ${StartIngestionPipelineLambda} on object upload to the S3 prefix ${S3DataPrefix}'
      Name: !Sub 's3-put-rule-${SageMakerProjectName}-${SageMakerProjectId}'
      EventPattern:
        source:
          - 'aws.s3'
        detail:
          eventSource:
            - 's3.amazonaws.com'
          eventName:
            - 'PutObject'
            - 'CompleteMultipartUpload'
          requestParameters:
            bucketName:
              - !Select [ 0, !Split [ '/', !Ref S3DataPrefix ] ]
      State: 'ENABLED'
      Targets:
        - Arn: !GetAtt StartIngestionPipelineLambda.Arn
          Id: !Sub 'target-${SageMakerProjectName}-${SageMakerProjectId}'
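# For illustration only (paths and names are placeholders): after deployment, uploading a file under the
# monitored prefix, e.g. `aws s3 cp ./data.csv s3://<data-bucket>/<prefix>/data.csv`, is recorded by the
# trail as an S3 data event, matched by the EventBridge rule above, and the Lambda function starts an
# execution of the SageMaker pipeline with the InputDataUrl parameter set to the uploaded object's S3 URL.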