# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 AWSTemplateFormatVersion: '2010-09-09' Description: | Create automation lambda function stack for CI/CD pipelines Parameters: ProjectName: Description: Project name Type: String Default: 'sm-mlops' Resources: AutomationLambdaExecutionRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: InlinePolicy PolicyDocument: Version: '2012-10-17' Statement: - Sid: IAMPermission Effect: Allow Action: - 'iam:GetRole' Resource: '*' - Sid: ServiceCatalogPermission Effect: Allow Action: - servicecatalog:* Resource: '*' - Sid: CodePipelineJobPermission Effect: Allow Action: - codepipeline:PutJobSuccessResult - codepipeline:PutJobFailureResult Resource: '*' - Sid: SSMReadPermission Effect: Allow Action: - 'ssm:GetParameters' - 'ssm:GetParameter' - 'ssm:PutParameter' Resource: !Sub 'arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/*' - Sid: S3ArtifactBucketPermission Effect: Allow Action: - 's3:*Object' - 's3:GetObjectVersion' - 's3:GetBucketAcl' - 's3:GetBucketLocation' - 's3:ListBucket' Resource: - !Sub 'arn:aws:s3:::codepipeline-${ProjectName}*' - !Sub 'arn:aws:s3:::codepipeline-${ProjectName}*/*' ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' - 'arn:aws:iam::aws:policy/AmazonVPCFullAccess' - 'arn:aws:iam::aws:policy/AmazonElasticFileSystemFullAccess' - 'arn:aws:iam::aws:policy/AWSCloudFormationFullAccess' - 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess' CleanUpEFSLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub '${ProjectName}-automation-CleanUpEFS' Code: ZipFile: | import json import boto3 import time import tempfile import zipfile efs = boto3.client("efs") s3 = boto3.client("s3") ec2 = boto3.client("ec2") code_pipeline = boto3.client('codepipeline') def delete_efs(sm_domain_id, delete_vpc=False): vpc_id="" subnets = [] # Get EFS which belongs to SageMaker (based on a tag) print(f"Get EFS file system id(s) for SageMaker domain id {sm_domain_id}") for id in [ fs["FileSystemId"] for fs in efs.describe_file_systems()["FileSystems"] if fs.get("Tags") and [t["Value"] for t in fs["Tags"] if t["Key"]=="ManagedByAmazonSageMakerResource"][0].split("/")[-1] == sm_domain_id ]: print(f"Delete mount targets for EFS file system id: {id}") for mt in efs.describe_mount_targets(FileSystemId=id)["MountTargets"]: efs.delete_mount_target(MountTargetId=mt["MountTargetId"]) vpc_id = mt["VpcId"] subnets.append(mt["SubnetId"]) while len(efs.describe_mount_targets(FileSystemId=id)["MountTargets"]) > 0: print("Wait until mount targets have been deleted") time.sleep(5) # Get all SageMaker EFS security groups (based on a tag) security_groups = [sg for sg in ec2.describe_security_groups(Filters=[{"Name":"vpc-id","Values":[vpc_id]}])["SecurityGroups"] if sg.get("Tags") and [t["Value"] for t in sg["Tags"] if t["Key"]=="ManagedByAmazonSageMakerResource"][0].split("/")[-1] == sm_domain_id ] # Remove all ingress and egress rules for sg in security_groups: if len(sg["IpPermissionsEgress"]) > 0: print(f"revoke egress rule for security group {sg['GroupId']}") ec2.revoke_security_group_egress(GroupId=sg["GroupId"], IpPermissions=sg["IpPermissionsEgress"]) if len(sg["IpPermissions"]) > 0: print(f"revoke ingress rule for security group {sg['GroupId']}") ec2.revoke_security_group_ingress(GroupId=sg["GroupId"], IpPermissions=sg["IpPermissions"]) # Delete all SageMaker security groups for eth1 (efs) for sg in security_groups: print(f"delete security group {sg['GroupId']}: {sg['GroupName']}") ec2.delete_security_group(GroupId=sg["GroupId"]) print(f"Delete EFS file system {id}") efs.delete_file_system(FileSystemId=id) if vpc_id and delete_vpc: for sn in subnets: print(f"delete subnet {sn}") ec2.delete_subnet(SubnetId=sn) print(f"delete VPC {vpc_id}") ec2.delete_vpc(VpcId=vpc_id) def get_file(artifact, f_name): bucket = artifact["location"]["s3Location"]["bucketName"] key = artifact["location"]["s3Location"]["objectKey"] print(f"{bucket}/{key}") with tempfile.NamedTemporaryFile() as tmp_file: s3.download_file(bucket, key, tmp_file.name) with zipfile.ZipFile(tmp_file.name, 'r') as z: return json.loads(z.read(f_name)) def lambda_handler(event, context): try: job_id = event['CodePipeline.job']['id'] job_data = event['CodePipeline.job']['data'] user_param = json.loads(job_data["actionConfiguration"]["configuration"]["UserParameters"]) print(f"user parameters: {user_param}") sm_domain_id = get_file(job_data["inputArtifacts"][0], user_param.get("FileName")).get("SageMakerDomainId") delete_efs(sm_domain_id, user_param.get("VPC") == "delete") code_pipeline.put_job_success_result(jobId=job_id) except Exception as e: print(f"exception: {str(e)}") code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'}) Description: Delete SageMaker Studio left-over EFS file systems Handler: index.lambda_handler MemorySize: 128 Role: !GetAtt AutomationLambdaExecutionRole.Arn Runtime: python3.8 Timeout: 180 ProvisionProductLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub '${ProjectName}-automation-ProvisionProduct' Code: ZipFile: | import json import boto3 import tempfile import zipfile import uuid import time sc = boto3.client("servicecatalog") code_pipeline = boto3.client('codepipeline') s3 = boto3.client("s3") sts = boto3.client("sts") ssm = boto3.client("ssm") def get_file(artifact, f_name): bucket = artifact["location"]["s3Location"]["bucketName"] key = artifact["location"]["s3Location"]["objectKey"] print(f"{bucket}/{key}") with tempfile.NamedTemporaryFile() as tmp_file: s3.download_file(bucket, key, tmp_file.name) with zipfile.ZipFile(tmp_file.name, 'r') as z: return json.loads(z.read(f_name)) def get_role_arn(): return "/".join(sts.get_caller_identity()["Arn"].replace("assumed-role", "role").replace("sts", "iam").split("/")[0:-1]) def associated_role(portfolio_id): role_arn = get_role_arn() print(f"associating the lambda execution role {role_arn} with the portfolio {portfolio_id}") r = sc.associate_principal_with_portfolio( PortfolioId=portfolio_id, PrincipalARN=role_arn, PrincipalType='IAM' ) print(r) def provision_product(product_id, product_name, provisioning_artifact_id, provisioning_parameters): print(f"launching the product {product_id}") r = sc.provision_product( ProductId=product_id, ProvisioningArtifactId=provisioning_artifact_id, ProvisionedProductName=product_name.replace(" ", "_").lower() + "-" + str(uuid.uuid4()).split("-")[0], ProvisioningParameters=provisioning_parameters ) print(r) print(f"ProvisionedProductId: {r['RecordDetail']['ProvisionedProductId']}") ssm.put_parameter( Name=f"/ds-product-catalog/{product_id}/provisioned-product-id", Description=f"Provisioned product id for product_id: {product_id}", Value=r['RecordDetail']['ProvisionedProductId'], Type="String", Overwrite=True ) def lambda_handler(event, context): try: job_id = event['CodePipeline.job']['id'] job_data = event['CodePipeline.job']['data'] user_param = json.loads(job_data["actionConfiguration"]["configuration"]["UserParameters"]) data = get_file(job_data["inputArtifacts"][0], user_param.get("FileName")) print(user_param) if user_param.get("Operation") == "associate-role": associated_role(data["PortfolioId"]) else: provision_product( data["ProductId"], data["ProductName"], data["ProvisioningArtifactIds"], user_param["ProvisioningParameters"] ) code_pipeline.put_job_success_result(jobId=job_id) except Exception as e: print(f"exception: {str(e)}") code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'}) Description: Provision AWS Service Catalog product Handler: index.lambda_handler MemorySize: 128 Role: !GetAtt AutomationLambdaExecutionRole.Arn Runtime: python3.8 Timeout: 60 TerminateProductLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub '${ProjectName}-automation-TerminateProduct' Code: ZipFile: | import json import boto3 import tempfile import zipfile import uuid sc = boto3.client("servicecatalog") code_pipeline = boto3.client('codepipeline') s3 = boto3.client("s3") sts = boto3.client("sts") ssm = boto3.client("ssm") def get_file(artifact, f_name): bucket = artifact["location"]["s3Location"]["bucketName"] key = artifact["location"]["s3Location"]["objectKey"] print(f"{bucket}/{key}") with tempfile.NamedTemporaryFile() as tmp_file: s3.download_file(bucket, key, tmp_file.name) with zipfile.ZipFile(tmp_file.name, 'r') as z: return json.loads(z.read(f_name)) def get_role_arn(): return "/".join(sts.get_caller_identity()["Arn"].replace("assumed-role", "role").replace("sts", "iam").split("/")[0:-1]) def terminate_product(portfolio_id, product_id): provisioned_product_id = ssm.get_parameter(Name=f"/ds-product-catalog/{product_id}/provisioned-product-id")["Parameter"]["Value"] print(sc.describe_provisioned_product(Id=provisioned_product_id)) try: print(f"terminating the provisioned product {provisioned_product_id}") r = sc.terminate_provisioned_product( ProvisionedProductId=provisioned_product_id, TerminateToken=str(uuid.uuid4()) ) print(r) except Exception as e: print(f"exception in terminate_provisioned_product: {str(e)}") print(f"disassociating the lambda execution role with the portfolio {portfolio_id}") sc.disassociate_principal_from_portfolio( PortfolioId=portfolio_id, PrincipalARN=get_role_arn() ) print(r) def lambda_handler(event, context): try: job_id = event['CodePipeline.job']['id'] job_data = event['CodePipeline.job']['data'] user_param = json.loads(job_data["actionConfiguration"]["configuration"]["UserParameters"]) print(user_param) data = get_file(job_data["inputArtifacts"][0], user_param.get("FileName")) terminate_product(data["PortfolioId"], data["ProductId"]) code_pipeline.put_job_success_result(jobId=job_id) except Exception as e: print(f"exception: {str(e)}") code_pipeline.put_job_failure_result(jobId=job_id, failureDetails={'message': str(e), 'type': 'JobFailed'}) Description: Get SageMaker domain id Handler: index.lambda_handler MemorySize: 128 Role: !GetAtt AutomationLambdaExecutionRole.Arn Runtime: python3.8 Timeout: 60 GetSageMakerDomainIdLambda: Type: AWS::Lambda::Function Properties: FunctionName: !Sub '${ProjectName}-automation-GetSageMakerDomainId' Code: ZipFile: | import json import boto3 import cfnresponse ssm = boto3.client("ssm") def lambda_handler(event, context): try: r = {} if 'RequestType' in event and event['RequestType'] == 'Create': r["SageMakerDomainId"] = ssm.get_parameter( Name=event['ResourceProperties']['SSMParameterName'] )["Parameter"]["Value"] cfnresponse.send(event, context, cfnresponse.SUCCESS, r, '') except Exception as e: print(f"exception: {str(e)}") cfnresponse.send(event, context, cfnresponse.FAILED, {}, physicalResourceId=event.get('PhysicalResourceId'), reason=str(e)) Description: Get SageMaker domain id Handler: index.lambda_handler MemorySize: 128 Role: !GetAtt AutomationLambdaExecutionRole.Arn Runtime: python3.8 Timeout: 60