AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: AWS ParallelCluster Cluster Custom Resource Provider Parameters: CustomLambdaRole: Description: Custom role to use for PC Lambda Type: String Default: '' AdditionalIamPolicies: Description: Comma-delimited list of additional IAM Policies to add to the cluster (only used if CustomLambdaRole isn't provided). Type: CommaDelimitedList Default: '' CustomBucket: Description: (Debug only) bucket to retrieve S3 artifacts for internal resources. Type: String Default: '' Mappings: ParallelCluster: Constants: Version: 3.8.0 # major.minor.patch+alpha/beta_identifier Conditions: CustomRoleCondition: !Not [!Equals [!Ref CustomLambdaRole, '']] UsePCPolicies: !Not [!Condition CustomRoleCondition ] UseAdditionalIamPolicies: !Not [!Equals [!Join ['', !Ref AdditionalIamPolicies ], '']] UseCustomBucket: !Not [!Equals [!Ref CustomBucket, '']] Resources: PclusterLayer: Type: AWS::Lambda::LayerVersion Properties: LayerName: !Sub - PCLayer-${StackIdSuffix} - { StackIdSuffix: !Select [2, !Split ['/', !Ref 'AWS::StackId']] } Description: Library which contains aws-parallelcluster python package and dependencies Content: S3Bucket: !If [ UseCustomBucket, !Ref CustomBucket, !Sub "${AWS::Region}-aws-parallelcluster" ] S3Key: !Sub - parallelcluster/${Version}/layers/aws-parallelcluster/lambda-layer.zip - { Version: !FindInMap [ParallelCluster, Constants, Version] } CompatibleRuntimes: - python3.9 PclusterPolicies: Condition: UsePCPolicies Type: AWS::CloudFormation::Stack Properties: TemplateURL: !Sub - https://${Bucket}.s3.${Region}.${AWS::URLSuffix}/parallelcluster/${Version}/templates/policies/policies.yaml - { Version: !FindInMap [ParallelCluster, Constants, Version ], Bucket: !If [UseCustomBucket, !Ref CustomBucket, !Sub "${AWS::Region}-aws-parallelcluster" ], Region: !Ref AWS::Region } TimeoutInMinutes: 10 Parameters: EnableIamAdminAccess: true PclusterCfnFunctionLogGroup: Type: AWS::Logs::LogGroup DeletionPolicy: Retain Properties: RetentionInDays: 90 LogGroupName: !Sub /aws/lambda/${PclusterCfnFunction} EventsPolicy: Condition: UsePCPolicies Type: AWS::IAM::ManagedPolicy Properties: PolicyDocument: Version: "2012-10-17" Statement: - Sid: EventsPolicy Effect: Allow Action: - events:PutRule - events:DeleteRule - events:PutTargets - events:RemoveTargets Resource: !Sub arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/* S3Policy: Condition: UsePCPolicies Type: AWS::IAM::ManagedPolicy Properties: PolicyDocument: Version: "2012-10-17" Statement: - Sid: S3Policy Effect: Allow Action: - s3:*Object - s3:ListBucket - s3:ListBucketVersions Resource: - !Sub arn:${AWS::Partition}:s3:::${PclusterCustomResourceBucket}/* - !Sub arn:${AWS::Partition}:s3:::${PclusterCustomResourceBucket} PclusterCustomResourceBucket: Type: AWS::S3::Bucket PclusterLambdaRole: Condition: UsePCPolicies Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Effect: Allow Action: sts:AssumeRole Principal: Service: lambda.amazonaws.com ManagedPolicyArns: !Split - "," - !Sub - ${LambdaExecutionPolicy},${ClusterPolicy},${DefaultAdminPolicy},${EventsPolicy},${S3Policy}${AdditionalIamPolicies} - { LambdaExecutionPolicy: !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", ClusterPolicy: !GetAtt [ PclusterPolicies, Outputs.ParallelClusterClusterPolicy ], DefaultAdminPolicy: !GetAtt [ PclusterPolicies, Outputs.DefaultParallelClusterIamAdminPolicy ], EventsPolicy: !Ref EventsPolicy, S3Policy: !Ref S3Policy, AdditionalIamPolicies: !If [UseAdditionalIamPolicies, !Sub [",${AdditionalPolicies}", AdditionalPolicies: !Join [',', !Ref AdditionalIamPolicies ]], ''] } PclusterCfnFunction: Type: AWS::Lambda::Function Properties: Tags: - Key: "parallelcluster:version" Value: !FindInMap [ParallelCluster, Constants, Version] - Key: "parallelcluster:custom_resource" Value: "cluster" FunctionName: !Sub - pcluster-cfn-${StackIdSuffix} - { StackIdSuffix: !Select [2, !Split ['/', !Ref 'AWS::StackId']] } TracingConfig: Mode: Active MemorySize: 2048 Timeout: 60 Code: ZipFile: !Sub | import boto3 import cfnresponse import datetime import json import logging import os import random import re import string import sys logger = logging.getLogger() logger.setLevel(logging.INFO) import pcluster.api.controllers.cluster_operations_controller import pcluster.api.errors import pcluster.utils from pcluster.api import encoder from pcluster.cli.exceptions import APIOperationException, ParameterException from pcluster.api.errors import exception_message, NotFoundException import pcluster.lib as pc crhelper_path = "/opt/python/pcluster/resources/custom_resources/custom_resources_code" sys.path.insert(0, crhelper_path) from crhelper import CfnResource helper = CfnResource() def drop_keys(_dict, keys): return {k: v for k, v in _dict.items() if k not in keys} def flatten(obj, ret={}, path=""): """flatten a nested map using dot-notation for keys.""" if isinstance(obj, list): # convert list to dictionary for flattening return flatten({str(i): v for i, v in enumerate(obj)}, ret, path) for k, v in obj.items(): if isinstance(v, (dict, list)): # recurse on complex objects flatten(v, ret, f"{path}{k}.") else: # otherwise add with prefix ret[path + str(k)] = v return ret def update_response(data): logger.info(data) # Avoid limit on response object size, user has provided these, so drop them in the response extra_keys = {"clusterConfiguration", "scheduler", "tags"} # create / delete responses have cluster information nested in "cluster" key, # flatten that portion while keeping other keys to propagate warnings. if "cluster" in data: helper.Data.update(flatten(drop_keys(data["cluster"], extra_keys))) validation_messages = json.dumps(data.get("validationMessages", [])) validation_messages = "TRUNCATED:" + validation_messages[:2048] if len(validation_messages) > 2048 else validation_messages helper.Data["validationMessages"] = validation_messages else: # without "cluster" in the keys, this is a cluster object. helper.Data.update(flatten(drop_keys(data, extra_keys))) def serialize(val): return utils.to_iso_timestr(val) if isinstance(val, datetime.date) else val def get_stack_tags(stack_name, overrides): cfn = boto3.client('cloudformation') stack_tags = cfn.describe_stacks(StackName=stack_name)["Stacks"][0].get("Tags", []) return list(filter(lambda t: not (t["Key"].startswith("aws:") or t["Key"] in overrides), stack_tags)) def create_or_update(event): properties = event["ResourceProperties"] full_event = json.loads(boto3.client("s3").get_object(Bucket="${PclusterCustomResourceBucket}",Key=properties.get("ClusterName")+"/event.json")["Body"].read()) cluster_configuration = full_event["ResourceProperties"]["ClusterConfiguration"] request_type = event["RequestType"].upper() helper.Data["validationMessages"] = "[]" # default value if properties.get("DeletionPolicy", "Delete") not in {"Retain", "Delete"}: raise ValueError("DeletionPolicy must be one of [\"Retain\", \"Delete\"].") if request_type == "CREATE" and "ClusterName" not in properties: raise ValueError("Couldn't find a ClusterName in the properties.") elif request_type == "UPDATE" and event["PhysicalResourceId"] != properties.get("ClusterName"): raise ValueError("Cannot update the ClusterName in the properties.") cluster_name = properties["ClusterName"] logger.info(f"{event['RequestType'].upper()}: {cluster_name}") physical_resource_id = cluster_name try: meta_keys = {"ServiceToken", "DeletionPolicy"} kwargs = {**{pcluster.utils.to_snake_case(k): serialize(v) for k, v in drop_keys(properties, meta_keys).items()}, "wait": False} kwargs["cluster_configuration"] = cluster_configuration resource_tags = [{"Key": "parallelcluster:custom_resource", "Value": "cluster"}] config_tags = cluster_configuration.get("Tags", []) stack_tags = get_stack_tags(event['StackId'], {t["Key"] for t in config_tags}) kwargs["cluster_configuration"]["Tags"] = stack_tags + config_tags + resource_tags func = {"CREATE": pc.create_cluster, "UPDATE": pc.update_cluster}[request_type] update_response(func(**kwargs)) except (APIOperationException, ParameterException, TypeError) as e: logger.info(str(e)) raise ValueError(str(e)) except Exception as e: message = pcluster.api.errors.exception_message(e) # StatusReason is truncated, so skip changeset in output, still logged below block_list = {"change_set"} message_data = drop_keys(message.to_dict(), block_list) logger.info(message_data) # sort more critical errors last if "configuration_validation_errors" in message_data and message_data["configuration_validation_errors"]: order = {k: i for i, k in enumerate(["INFO", "WARNING", "ERROR"])} message_data["configuration_validation_errors"].sort(key=lambda e: order[e["level"]]) str_msg = encoder.JSONEncoder().encode(message_data) if not re.search(r"No changes found", str_msg): logger.info(encoder.JSONEncoder().encode(message)) raise ValueError(str_msg) logger.info(f"No changes found to update: {cluster_name}") return physical_resource_id @helper.create def create(event, context): return create_or_update(event) @helper.update def update(event, context): return create_or_update(event) @helper.delete def delete(event, context): properties = event["ResourceProperties"] cluster_name = properties.get("ClusterName") boto3.resource('s3').Bucket("${PclusterCustomResourceBucket}").objects.filter(Prefix=f"{cluster_name}/").delete() deletion_policy = properties.get("DeletionPolicy", "Delete") if deletion_policy not in {"Retain", "Delete"}: raise ValueError("DeleetionPolicy must be one of [\"Retain\", \"Delete\"].") if deletion_policy == "Retain": return cluster_name logger.info(f"Deleting: {cluster_name}") try: update_response(pc.delete_cluster(cluster_name=cluster_name)) except (ParameterException, NotFoundException): # cluster deleted or invalid name -- ignore here. pass except Exception as e: message = exception_message(e) raise ValueError(encoder.JSONEncoder().encode(message)) # Polling functionality for async CUD operations def poll(event): log_group = os.getenv("AWS_LAMBDA_LOG_GROUP_NAME") cluster_name = event["ResourceProperties"].get("ClusterName") try: cluster = pc.describe_cluster(cluster_name=cluster_name) status = cluster.get("clusterStatus") if status in {"CREATE_COMPLETE", "UPDATE_COMPLETE"}: update_response(cluster) return cluster_name elif status in {"CREATE_FAILED", "UPDATE_FAILED", "DELETE_FAILED"}: reasons = ",".join(f["failureCode"] for f in cluster.get("failures", [])) raise ValueError(f"{cluster_name}: {reasons} (LogGroup: {log_group})") # If create fails and we try to roll-back (e.g. delete), # gracefully handle missing cluster. on the delete pathway, the # only invalid parameter can be the name except (ParameterException, NotFoundException): if event["RequestType"].upper() == "DELETE": # Returning a value here signifies that the delete is completed and we can stop polling # not returning a value here causes cfn resource helper to keep polling. return cluster_name raise ValueError(f"{cluster_name} failed {event['RequestType'].upper()}. See LogGroup: {log_group}") @helper.poll_create def poll_create(event, context): return poll(event) @helper.poll_update def poll_update(event, context): return poll(event) @helper.poll_delete def poll_delete(event, context): return poll(event) def handler(event, context): try: logger.info("Beginning of Pcluster custom resource Lambda function. Printing full event...") logger.info(event) if event["ResourceProperties"].get("ClusterConfiguration") or (event.get("OldResourceProperties") and event["OldResourceProperties"].get("ClusterConfiguration")): boto3.client('s3').put_object( Body=json.dumps(event), Bucket="${PclusterCustomResourceBucket}", Key=event["ResourceProperties"].get("ClusterName")+"/event.json" ) event["ResourceProperties"].pop("ClusterConfiguration", None) event.get("OldResourceProperties", {}).pop("ClusterConfiguration", None) except Exception as exception: cfnresponse.send(event, context, cfnresponse.FAILED, {}, event.get('PhysicalResourceId', 'PclusterClusterCustomResource'), str(exception)) helper(event, context) Handler: index.handler Runtime: python3.9 Role: !If [CustomRoleCondition, !Ref CustomLambdaRole, !GetAtt PclusterLambdaRole.Arn] Layers: - !Ref PclusterLayer CleanupS3bucketFunction: Type: AWS::Lambda::Function Properties: Timeout: 60 Code: ZipFile: !Sub | import boto3 import cfnresponse import logging import sys from botocore.config import Config logger = logging.getLogger() logger.setLevel(logging.INFO) def _delete_s3_artifacts(event): """ Delete artifacts under the directory that is passed in. It exits gracefully if directory does not exist. :param bucket_name: bucket containing cluster artifacts """ bucket_name = event["ResourceProperties"]["S3Bucket"] try: if bucket_name != "NONE": bucket = boto3.resource("s3", config=Config(retries={"max_attempts": 60})).Bucket(bucket_name) logger.info("Cluster S3 artifact in %s deletion: STARTED", bucket_name) bucket.objects.all().delete() bucket.object_versions.delete() logger.info("Cluster S3 artifact in %s deletion: COMPLETED", bucket_name) except boto3.client("s3").exceptions.NoSuchBucket as ex: logger.warning("S3 bucket %s not found. Bucket was probably manually deleted.", bucket_name) logger.warning(ex, exc_info=True) except Exception as e: logger.error( "Failed when deleting cluster S3 artifact in %s with error %s", bucket_name, e ) raise def handler(event, context): try: if event["RequestType"] == "Delete": _delete_s3_artifacts(event) response_status = cfnresponse.SUCCESS reason = "" except Exception as e: response_status = cfnresponse.FAILED reason = str(e) cfnresponse.send(event, context, response_status, {}, event.get('PhysicalResourceId', 'CleanupS3bucketCustomResource'), reason) Handler: index.handler Runtime: python3.9 Role: !If [CustomRoleCondition, !Ref CustomLambdaRole, !GetAtt PclusterLambdaRole.Arn] CleanupS3bucketCustomResource: DeletionPolicy: Delete Properties: S3Bucket: !Ref PclusterCustomResourceBucket ServiceToken: !GetAtt CleanupS3bucketFunction.Arn Type: AWS::CloudFormation::CustomResource UpdateReplacePolicy: Delete Outputs: ServiceToken: Description: Lambda for managing PCluster Resources Value: !GetAtt PclusterCfnFunction.Arn LogGroupArn: Description: ARN of LogGroup for Lambda logging Value: !GetAtt PclusterCfnFunctionLogGroup.Arn LambdaLayerArn: Description: ARN for the ParallelCluster Lambda Layer Value: !Ref PclusterLayer