# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines
import abc
import hashlib
from hashlib import sha1
from typing import List, Union

import pkg_resources
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_iam as iam
from aws_cdk import aws_lambda as awslambda
from aws_cdk import aws_logs as logs
from aws_cdk.aws_iam import ManagedPolicy, PermissionsBoundary
from aws_cdk.core import Arn, ArnFormat, CfnDeletionPolicy, CfnTag, Construct, Fn, Stack

from pcluster.config.cluster_config import (
    BaseClusterConfig,
    BaseComputeResource,
    BaseQueue,
    HeadNode,
    LoginNodesPool,
    SharedStorageType,
    SlurmClusterConfig,
    SlurmComputeResource,
    SlurmQueue,
)
from pcluster.constants import (
    COOKBOOK_PACKAGES_VERSIONS,
    CW_LOGS_RETENTION_DAYS_DEFAULT,
    IAM_ROLE_PATH,
    LAMBDA_VPC_ACCESS_MANAGED_POLICY,
    PCLUSTER_CLUSTER_NAME_TAG,
    PCLUSTER_DYNAMODB_PREFIX,
    PCLUSTER_NODE_TYPE_TAG,
)
from pcluster.launch_template_utils import _LaunchTemplateBuilder
from pcluster.models.s3_bucket import S3Bucket, parse_bucket_url
from pcluster.utils import (
    get_installed_version,
    get_resource_name_from_resource_arn,
    policy_name_to_arn,
    split_resource_prefix,
)

# Not referenced in this part of the module; presumably the name prefix for the Lambda resources
# created for the cluster (confirm against the usages before changing it).
PCLUSTER_LAMBDA_PREFIX = "pcluster-"


def create_hash_suffix(string_to_hash: str):
    """
    Return the suffix derived from the given string.

    The literal "HeadNode" is returned unchanged; any other value is replaced by the first 16 hex characters
    of its SHA-1 digest, capitalized.
    """
    if string_to_hash == "HeadNode":
        return string_to_hash

    # A nosec comment is appended to the following line in order to disable the B324 check.
    # The sha1 is used just as a hashing function.
    # [B324:hashlib] Use of weak MD4, MD5, or SHA1 hash for security. Consider usedforsecurity=False
    hex_digest = sha1(string_to_hash.encode("utf-8")).hexdigest()  # nosec nosemgrep
    return hex_digest[:16].capitalize()


def get_user_data_content(user_data_path: str):
    """Read and return the UTF-8 text of the user data file located at the given package resource path."""
    resolved_path = pkg_resources.resource_filename(__name__, user_data_path)
    with open(resolved_path, "r", encoding="utf-8") as stream:
        return stream.read()


def get_common_user_data_env(node: Union[HeadNode, SlurmQueue, LoginNodesPool], config: BaseClusterConfig) -> dict:
    """
    Build the env variables, common to every node type, to be replaced in user data.

    When the node has no proxy configured, each proxy related key falls back to its own placeholder value.
    """
    proxy = node.networking.proxy
    proxy_address = proxy.http_proxy_address if proxy else None
    versions = COOKBOOK_PACKAGES_VERSIONS

    env = {
        "YumProxy": proxy_address if proxy else "_none_",
        "DnfProxy": proxy_address if proxy else "",
        "AptProxy": proxy_address if proxy else "false",
        "ProxyServer": proxy_address if proxy else "NONE",
        "CustomChefCookbook": config.custom_chef_cookbook or "NONE",
        "ParallelClusterVersion": versions["parallelcluster"],
        "CookbookVersion": versions["cookbook"],
        "ChefVersion": versions["chef"],
        "BerkshelfVersion": versions["berkshelf"],
    }
    return env


def get_slurm_specific_dna_json_for_head_node(config: SlurmClusterConfig, scheduler_resources) -> dict:
    """
    Build the slurm specific settings to be written to dna.json of head node.

    The hosted zone related entries are empty strings when the scheduler resources have no cluster hosted zone.
    """
    hosted_zone = scheduler_resources.cluster_hosted_zone
    if hosted_zone:
        dns_domain, hosted_zone_ref = hosted_zone.name, hosted_zone.ref
    else:
        dns_domain, hosted_zone_ref = "", ""

    dna_settings = {
        "dns_domain": dns_domain,
        "hosted_zone": hosted_zone_ref,
        "slurm_ddb_table": scheduler_resources.dynamodb_table.ref,
    }
    # The boolean is serialized as the lowercase strings "true"/"false"
    dna_settings["use_private_hostname"] = str(config.scheduling.settings.dns.use_ec2_hostnames).lower()
    return dna_settings


def get_directory_service_dna_json_for_head_node(config: BaseClusterConfig) -> dict:
    """
    Build the directory service settings to be written to dna.json of head node.

    An empty dict is returned when the cluster has no directory service configured.
    """
    directory_service = config.directory_service
    if not directory_service:
        return {}

    settings = {
        "enabled": "true",
        "domain_name": directory_service.domain_name,
        "domain_addr": directory_service.domain_addr,
        "password_secret_arn": directory_service.password_secret_arn,
        "domain_read_only_user": directory_service.domain_read_only_user,
        # Unset optional values are written as the "NONE" placeholder
        "ldap_tls_ca_cert": directory_service.ldap_tls_ca_cert or "NONE",
        "ldap_tls_req_cert": directory_service.ldap_tls_req_cert or "NONE",
        "ldap_access_filter": directory_service.ldap_access_filter or "NONE",
        "generate_ssh_keys_for_users": str(directory_service.generate_ssh_keys_for_users).lower(),
        "additional_sssd_configs": directory_service.additional_sssd_configs,
    }
    return {"directory_service": settings}


def to_comma_separated_string(list, use_lower_case=False):
    """
    Join the string form of every item with commas.

    When use_lower_case is set, the whole resulting string is lower-cased.
    """
    # The parameter name shadows the builtin, but it is kept since it is part of the function interface
    joined = ",".join(map(str, list))
    return joined.lower() if use_lower_case else joined


def get_shared_storage_ids_by_type(shared_storage_infos: dict, storage_type: SharedStorageType):
    """Return the comma separated ids of the shared storages registered for the given type."""
    storage_ids = [storage_mapping.id for storage_mapping in shared_storage_infos[storage_type]]
    return ",".join(storage_ids)


def dict_to_cfn_tags(tags: dict):
    """Convert a dictionary to a list of CfnTag; an empty or missing dictionary gives an empty list."""
    if not tags:
        return []
    return [CfnTag(key=tag_key, value=tag_value) for tag_key, tag_value in tags.items()]


def get_cluster_tags(stack_name: str, raw_dict: bool = False):
    """
    Return the cluster tags to be used for all the resources.

    The tags are returned as a plain dict when raw_dict is True, as a list of CfnTag otherwise.
    """
    cluster_tags = {PCLUSTER_CLUSTER_NAME_TAG: stack_name}
    if raw_dict:
        return cluster_tags
    return dict_to_cfn_tags(cluster_tags)


def get_custom_tags(config: Union[BaseClusterConfig, SlurmQueue, SlurmComputeResource], raw_dict: bool = False):
    """
    Return the tags set by the user on the given configuration element.

    The tags are returned as a plain dict when raw_dict is True, as a list of CfnTag otherwise.
    """
    user_tags = {}
    for tag in config.get_tags() or []:
        user_tags[tag.key] = tag.value

    if raw_dict:
        return user_tags
    return dict_to_cfn_tags(user_tags)


def get_default_instance_tags(
    stack_name: str,
    config: BaseClusterConfig,
    node: Union[HeadNode, LoginNodesPool, BaseComputeResource],
    node_type: str,
    shared_storage_infos: dict,
    raw_dict: bool = False,
):
    """
    Build the default tags to be used for the instances of the given node.

    The tags are returned as a plain dict when raw_dict is True, as a list of CfnTag otherwise.
    """
    # Nodes without an architecture attribute are reported with the "NONE" placeholder
    architecture = getattr(node, "architecture", "NONE")

    # EFA is reported as "NONE" when the node has no efa attribute, no efa settings or efa is not enabled
    efa = getattr(node, "efa", None)
    efa_status = "true" if efa and efa.enabled else "NONE"

    # One "<label>=<number of shared storages>" entry for each storage type
    storage_labels = (
        ("efs", SharedStorageType.EFS),
        ("multiebs", SharedStorageType.EBS),
        ("raid", SharedStorageType.RAID),
        ("fsx", SharedStorageType.FSX),
    )
    filesystem_summary = ", ".join(
        f"{label}={len(shared_storage_infos[storage_type])}" for label, storage_type in storage_labels
    )

    tags = dict(get_cluster_tags(stack_name, raw_dict=True))
    tags.update(
        {
            "Name": node_type,
            PCLUSTER_NODE_TYPE_TAG: node_type,
            "parallelcluster:attributes": (
                f"{config.image.os}, {config.scheduling.scheduler}, {get_installed_version()}, {architecture}"
            ),
            "parallelcluster:networking": f"EFA={efa_status}",
            "parallelcluster:filesystem": filesystem_summary,
        }
    )
    if config.is_intel_hpc_platform_enabled:
        tags["parallelcluster:intel-hpc"] = "enable_intel_hpc_platform=true"

    if raw_dict:
        return tags
    return dict_to_cfn_tags(tags)


def get_default_volume_tags(stack_name: str, node_type: str, raw_dict: bool = False):
    """
    Build the default tags to be used for volumes: the cluster tags plus the node type tag.

    The tags are returned as a plain dict when raw_dict is True, as a list of CfnTag otherwise.
    """
    volume_tags = dict(get_cluster_tags(stack_name, raw_dict=True))
    volume_tags[PCLUSTER_NODE_TYPE_TAG] = node_type
    if raw_dict:
        return volume_tags
    return dict_to_cfn_tags(volume_tags)


def get_assume_role_policy_document(service: str):
    """Return the policy document allowing the given service principal to call sts:AssumeRole."""
    assume_role_statement = iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["sts:AssumeRole"],
        principals=[iam.ServicePrincipal(service=service)],
    )
    return iam.PolicyDocument(statements=[assume_role_statement])


def get_cloud_watch_logs_policy_statement(resource: str) -> iam.PolicyStatement:
    """Return the statement allowing to create log streams and put log events on the given resource."""
    log_actions = ["logs:CreateLogStream", "logs:PutLogEvents"]
    return iam.PolicyStatement(
        sid="CloudWatchLogsPolicy",
        effect=iam.Effect.ALLOW,
        actions=log_actions,
        resources=[resource],
    )


def get_cloud_watch_logs_retention_days(config: BaseClusterConfig) -> int:
    """Return the configured CloudWatch logs retention days, or the default when CloudWatch logging is disabled."""
    if config.is_cw_logging_enabled:
        return config.monitoring.logs.cloud_watch.retention_in_days
    return CW_LOGS_RETENTION_DAYS_DEFAULT


def get_log_group_deletion_policy(config: BaseClusterConfig):
    """Return the CfnDeletionPolicy matching the deletion policy configured for CloudWatch logs."""
    configured_policy = config.monitoring.logs.cloud_watch.deletion_policy
    return convert_deletion_policy(configured_policy)


def convert_deletion_policy(deletion_policy: str):
    """
    Convert a deletion policy name to the corresponding CfnDeletionPolicy.

    Only the exact names "Retain", "Delete" and "Snapshot" are recognized; None is returned for anything else.
    """
    if deletion_policy == "Retain":
        return CfnDeletionPolicy.RETAIN
    if deletion_policy == "Delete":
        return CfnDeletionPolicy.DELETE
    if deletion_policy == "Snapshot":
        return CfnDeletionPolicy.SNAPSHOT
    return None


def get_queue_security_groups_full(managed_compute_security_group: ec2.CfnSecurityGroup, queue: BaseQueue):
    """
    Return the full list of security groups to be used for the queue.

    The list starts with the security groups provided by the user or, when none is provided, with the managed
    compute security group; the additional security groups, if any, follow.
    """
    networking = queue.networking

    if networking.security_groups:
        security_groups = list(networking.security_groups)
    else:
        security_groups = [managed_compute_security_group.ref]

    if networking.additional_security_groups:
        security_groups += networking.additional_security_groups

    return security_groups


def get_login_nodes_security_groups_full(
    managed_login_security_group: ec2.CfnSecurityGroup,
    pool: LoginNodesPool,
):
    """
    Return the full list of security groups to be used for the login nodes of the pool.

    The list starts with the security groups provided by the user or, when none is provided, with the managed
    login security group; the additional security groups, if any, follow.
    """
    networking = pool.networking

    if networking.security_groups:
        security_groups = list(networking.security_groups)
    else:
        security_groups = [managed_login_security_group.ref]

    if networking.additional_security_groups:
        security_groups += networking.additional_security_groups

    return security_groups


def add_cluster_iam_resource_prefix(stack_name, config, name: str, iam_type: str):
    """
    Derive the IAM path and the resource name from config.iam.resource_prefix.

    Return a (path, name) tuple: each element is None when the resource prefix does not define it, and both
    are None when no resource prefix is configured.
    """
    if not (config.iam and config.iam.resource_prefix):
        return None, None

    iam_path, iam_name_prefix = split_resource_prefix(config.iam.resource_prefix)

    prefixed_name = None
    if iam_name_prefix:
        # Creating a Globally Unique Hash using Region, Type, Name and stack name
        hash_input = "".join((name, stack_name, iam_type, config.region))
        unique_suffix = hashlib.sha256(hash_input.encode("utf-8")).hexdigest()[:12].upper()
        prefixed_name = iam_name_prefix + name + "-" + unique_suffix

    return (iam_path or None), prefixed_name


def add_lambda_cfn_role(scope, config, function_id: str, statements: List[iam.PolicyStatement], has_vpc_config: bool):
    """
    Create the CfnRole to be used for a Lambda function.

    The role embeds the given statements as inline policy and, when the function has a VPC configuration, it
    also gets the Lambda VPC access managed policy.
    """
    cluster_name = config.cluster_name
    role_path, role_name = add_cluster_iam_resource_prefix(
        cluster_name, config, name=f"{function_id}Role", iam_type="AWS::IAM::Role"
    )
    _, policy_name = add_cluster_iam_resource_prefix(cluster_name, config, "LambdaPolicy", iam_type="AWS::IAM::Policy")

    # The construct id depends on whether a prefixed role name has been derived from the configuration
    if role_name:
        role_id = f"{function_id}Role"
    else:
        role_id = f"{function_id}FunctionExecutionRole"

    assume_role_document = get_assume_role_policy_document("lambda.amazonaws.com")
    inline_policy = iam.CfnRole.PolicyProperty(
        policy_document=iam.PolicyDocument(statements=statements),
        policy_name=policy_name or "LambdaPolicy",
    )
    vpc_access_policy_arns = [Fn.sub(LAMBDA_VPC_ACCESS_MANAGED_POLICY)] if has_vpc_config else None

    return iam.CfnRole(
        scope,
        role_id,
        path=role_path or IAM_ROLE_PATH,
        role_name=role_name,
        assume_role_policy_document=assume_role_document,
        policies=[inline_policy],
        managed_policy_arns=vpc_access_policy_arns,
    )


def apply_permissions_boundary(boundary, scope):
    """Apply the given permissions boundary, if any, to all IAM roles defined in the scope."""
    if not boundary:
        return
    boundary_policy = ManagedPolicy.from_managed_policy_arn(scope, "Boundary", boundary)
    PermissionsBoundary.of(scope).apply(boundary_policy)


def scheduler_is_slurm(config: BaseClusterConfig):
    """Tell whether the scheduler set in the cluster configuration is "slurm"."""
    configured_scheduler = config.scheduling.scheduler
    return configured_scheduler == "slurm"


def generate_launch_template_version_cfn_parameter_hash(queue, compute_resource):
    """
    Generate 16 characters hash for compute fleet launch template version cfn parameter.

    The hash is computed on the concatenation of the two given strings.

    :param queue
    :param compute_resource
    :return: 16 chars string e.g. 2238a84ac8a74529
    """
    hash_input = (queue + compute_resource).encode()
    # A nosec comment is appended to the following line in order to disable the B324 check.
    # The sha1 is used just as a hashing function.
    # [B324:hashlib] Use of weak MD4, MD5, or SHA1 hash for security. Consider usedforsecurity=False
    hex_digest = hashlib.sha1(hash_input).hexdigest()  # nosec nosemgrep
    return hex_digest[:16].capitalize()


class NodeIamResourcesBase(Construct):
    """
    Abstract construct defining IAM resources for a cluster node.

    Depending on the node configuration, the construct reuses an existing instance profile, creates an instance
    profile for an existing role, or creates role, policies and instance profile.
    The IAM resources are created in the scope of the stack the construct belongs to.
    Subclasses provide the node specific policy statements by implementing _build_policy.
    """

    def __init__(
        self,
        scope: Construct,
        id: str,
        config: BaseClusterConfig,
        node: Union[HeadNode, BaseQueue, LoginNodesPool],
        shared_storage_infos: dict,
        name: str,
    ):
        """
        Create the IAM resources for the given node.

        :param scope: parent construct
        :param id: construct id
        :param config: cluster configuration
        :param node: configuration of the node/queue/pool the IAM resources are created for
        :param shared_storage_infos: shared storages, by SharedStorageType
        :param name: name the suffix of the IAM resources ids is derived from
        """
        super().__init__(scope, id)
        self._config = config
        # The role stays None when the node configuration provides an instance profile or an instance role
        self.instance_role = None
        # Always set by _add_role_and_policies: name of the provided profile or ref of the created one
        self.instance_profile = None

        self._add_role_and_policies(node, shared_storage_infos, name)

    def _add_role_and_policies(
        self,
        node: Union[HeadNode, BaseQueue, LoginNodesPool],
        shared_storage_infos: dict,
        name: str,
    ):
        """Create role and policies for the given node/queue."""
        suffix = create_hash_suffix(name)
        if node.instance_profile:
            # If existing InstanceProfile provided, do not create InstanceRole
            self.instance_profile = get_resource_name_from_resource_arn(node.instance_profile)
        elif node.instance_role:
            # Existing role provided: only the instance profile wrapping it is created
            node_role_ref = get_resource_name_from_resource_arn(node.instance_role)
            self.instance_profile = self._add_instance_profile(node_role_ref, f"InstanceProfile{suffix}")
        else:
            self.instance_role = self._add_node_role(node, f"Role{suffix}")

            # ParallelCluster Policies
            self._add_pcluster_policies_to_role(
                self.instance_role.ref, shared_storage_infos, f"ParallelClusterPolicies{suffix}"
            )

            # Custom Cookbook S3 url policy
            if self._condition_custom_cookbook_with_s3_url():
                self._add_custom_cookbook_policies_to_role(self.instance_role.ref, f"CustomCookbookPolicies{suffix}")

            # S3 Access Policies
            if isinstance(node, (BaseQueue, HeadNode)) and self._condition_create_s3_access_policies(node):
                self._add_s3_access_policies_to_role(node, self.instance_role.ref, f"S3AccessPolicies{suffix}")

            # Instance Profile for the role created above
            self.instance_profile = self._add_instance_profile(self.instance_role.ref, f"InstanceProfile{suffix}")

    def _add_instance_profile(self, role_ref: str, name: str):
        """Create an instance profile for the given role and return its ref."""
        instance_profile_path, instance_profile_name = add_cluster_iam_resource_prefix(
            self._config.cluster_name, self._config, name, iam_type="AWS::IAM::InstanceProfile"
        )
        return iam.CfnInstanceProfile(
            Stack.of(self),
            name,
            roles=[role_ref],
            path=self._cluster_scoped_iam_path(iam_path=instance_profile_path),
            instance_profile_name=instance_profile_name,
        ).ref

    def _add_node_role(self, node: Union[HeadNode, BaseQueue, LoginNodesPool], name: str):
        """Create the role assumed by the EC2 instances of the node, with the additional managed policies."""
        role_path, role_name = add_cluster_iam_resource_prefix(
            self._config.cluster_name, self._config, name, iam_type="AWS::IAM::Role"
        )
        # A set is used to avoid attaching the same managed policy twice
        additional_iam_policies = set(node.iam.additional_iam_policy_arns)
        if self._config.monitoring.logs.cloud_watch.enabled:
            additional_iam_policies.add(policy_name_to_arn("CloudWatchAgentServerPolicy"))
        return iam.CfnRole(
            Stack.of(self),
            name,
            path=self._cluster_scoped_iam_path(iam_path=role_path),
            # Sorted because the iteration order of a set of strings changes between interpreter runs,
            # which would make the order of the ARNs in the generated template change between runs as well
            managed_policy_arns=sorted(additional_iam_policies),
            assume_role_policy_document=get_assume_role_policy_document("ec2.{0}".format(Stack.of(self).url_suffix)),
            role_name=role_name,
        )

    def _add_pcluster_policies_to_role(self, role_ref: str, shared_storage_infos: dict, name: str):
        """Attach to the role the policy made of the node specific statements plus the common ones."""
        _, policy_name = add_cluster_iam_resource_prefix(
            self._config.cluster_name, self._config, "parallelcluster", iam_type="AWS::IAM::Policy"
        )
        common_policies = []
        if self._config.scheduling.scheduler != "awsbatch":
            efs_with_iam_authorization_arns = self._get_efs_with_iam_authorization_arns(shared_storage_infos)
            if efs_with_iam_authorization_arns:
                common_policies.append(
                    iam.PolicyStatement(
                        sid="Efs",
                        actions=[
                            "elasticfilesystem:ClientMount",
                            "elasticfilesystem:ClientRootAccess",
                            "elasticfilesystem:ClientWrite",
                        ],
                        effect=iam.Effect.ALLOW,
                        resources=efs_with_iam_authorization_arns,
                    ),
                )
        iam.CfnPolicy(
            Stack.of(self),
            name,
            policy_name=policy_name or "parallelcluster",
            policy_document=iam.PolicyDocument(statements=self._build_policy() + common_policies),
            roles=[role_ref],
        )

    def _get_efs_with_iam_authorization_arns(self, shared_storage_infos):
        """Return the ARNs of the EFS file systems having iam_authorization set."""
        return [
            self._format_arn(
                service="elasticfilesystem",
                resource=f"file-system/{efs_id}",
                region=Stack.of(self).region,
                account=Stack.of(self).account,
            )
            for efs_id, efs_storage in shared_storage_infos[SharedStorageType.EFS]
            if efs_storage.iam_authorization
        ]

    def _condition_custom_cookbook_with_s3_url(self):
        """Tell whether a custom chef cookbook with an s3:// url is configured in the dev settings."""
        try:
            return self._config.dev_settings.cookbook.chef_cookbook.startswith("s3://")
        except AttributeError:
            # Any element of the chain is missing/None: no custom cookbook configured
            return False

    def _condition_create_s3_access_policies(self, node: Union[HeadNode, BaseQueue, LoginNodesPool]):
        """Return a truthy value when the node has S3 access settings configured."""
        return node.iam and node.iam.s3_access

    def _add_custom_cookbook_policies_to_role(self, role_ref: str, name: str):
        """Attach to the role the policy allowing to download the custom cookbook from its S3 bucket."""
        bucket_info = parse_bucket_url(self._config.dev_settings.cookbook.chef_cookbook)
        bucket_name = bucket_info.get("bucket_name")
        object_key = bucket_info.get("object_key")
        iam.CfnPolicy(
            Stack.of(self),
            name,
            policy_name="CustomCookbookS3Url",
            policy_document=iam.PolicyDocument(
                statements=[
                    iam.PolicyStatement(
                        actions=["s3:GetObject"],
                        effect=iam.Effect.ALLOW,
                        resources=[
                            self._format_arn(
                                region="", service="s3", account="", resource=bucket_name, resource_name=object_key
                            )
                        ],
                    ),
                    iam.PolicyStatement(
                        actions=["s3:GetBucketLocation"],
                        effect=iam.Effect.ALLOW,
                        resources=[self._format_arn(service="s3", resource=bucket_name, region="", account="")],
                    ),
                ]
            ),
            roles=[role_ref],
        )

    def _add_s3_access_policies_to_role(
        self, node: Union[HeadNode, BaseQueue, LoginNodesPool], role_ref: str, name: str
    ):
        """Attach S3 policies to given role."""
        # Split the configured resources in read-only and read-write ones
        read_only_s3_resources = []
        read_write_s3_resources = []
        for s3_access in node.iam.s3_access:
            for resource in s3_access.resource_regex:
                arn = self._format_arn(service="s3", resource=resource, region="", account="")
                if s3_access.enable_write_access:
                    read_write_s3_resources.append(arn)
                else:
                    read_only_s3_resources.append(arn)
        _, policy_name = add_cluster_iam_resource_prefix(
            self._config.cluster_name, self._config, "S3Access", iam_type="AWS::IAM::Policy"
        )

        # The policy is created with an empty document, the statements are added below only when needed
        s3_access_policy = iam.CfnPolicy(
            Stack.of(self),
            name,
            policy_document=iam.PolicyDocument(statements=[]),
            roles=[role_ref],
            policy_name=policy_name or "S3Access",
        )

        if read_only_s3_resources:
            s3_access_policy.policy_document.add_statements(
                iam.PolicyStatement(
                    sid="S3Read",
                    effect=iam.Effect.ALLOW,
                    actions=["s3:Get*", "s3:List*"],
                    resources=read_only_s3_resources,
                )
            )

        if read_write_s3_resources:
            s3_access_policy.policy_document.add_statements(
                iam.PolicyStatement(
                    sid="S3ReadWrite", effect=iam.Effect.ALLOW, actions=["s3:*"], resources=read_write_s3_resources
                )
            )

    def _cluster_scoped_iam_path(self, iam_path=None):
        """Return a path to be associated with IAM roles and instance profiles."""
        # The stack name is appended to the given path or, when no path is given, to the default one
        if iam_path:
            return f"{iam_path}{Stack.of(self).stack_name}/"
        else:
            return f"{IAM_ROLE_PATH}{Stack.of(self).stack_name}/"

    def _format_arn(self, **kwargs):
        """Format an ARN through the stack the construct belongs to."""
        return Stack.of(self).format_arn(**kwargs)

    @abc.abstractmethod
    def _build_policy(self) -> List[iam.PolicyStatement]:
        """Return the node specific policy statements; to be implemented by the subclasses."""
        pass


class HeadNodeIamResources(NodeIamResourcesBase):
    """Construct defining IAM resources for the head node."""

    def __init__(
        self,
        scope: Construct,
        id: str,
        config: BaseClusterConfig,
        node: Union[HeadNode, BaseQueue, LoginNodesPool],
        shared_storage_infos: dict,
        name: str,
        cluster_bucket: S3Bucket,
    ):
        """
        Create the IAM resources for the head node.

        :param cluster_bucket: bucket whose name and artifact directory are referenced by the head node policy
        """
        # The bucket must be stored before calling the base constructor, which builds the policy reading it
        self._cluster_bucket = cluster_bucket
        super().__init__(scope, id, config, node, shared_storage_infos, name)

    def _build_policy(self) -> List[iam.PolicyStatement]:
        """Return the policy statements for the head node role."""
        # Statements added regardless of the scheduler
        policy = [
            iam.PolicyStatement(
                sid="Ec2",
                actions=[
                    "ec2:DescribeInstanceAttribute",
                    "ec2:DescribeInstances",
                    "ec2:DescribeInstanceStatus",
                    "ec2:DescribeVolumes",
                ],
                effect=iam.Effect.ALLOW,
                resources=["*"],
            ),
            iam.PolicyStatement(
                sid="Ec2TagsAndVolumes",
                actions=["ec2:AttachVolume", "ec2:CreateTags", "ec2:DetachVolume"],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._format_arn(
                        service="ec2",
                        resource="instance/*",
                        region=Stack.of(self).region,
                        account=Stack.of(self).account,
                    ),
                    self._format_arn(
                        service="ec2",
                        resource="volume/*",
                        region=Stack.of(self).region,
                        account=Stack.of(self).account,
                    ),
                ],
            ),
            iam.PolicyStatement(
                sid="S3GetObj",
                actions=["s3:GetObject"],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._format_arn(
                        service="s3",
                        resource="{0}-aws-parallelcluster/*".format(Stack.of(self).region),
                        region="",
                        account="",
                    )
                ],
            ),
            iam.PolicyStatement(
                sid="ResourcesS3Bucket",
                effect=iam.Effect.ALLOW,
                actions=["s3:GetObject", "s3:GetObjectVersion", "s3:GetBucketLocation", "s3:ListBucket"],
                resources=[
                    self._format_arn(service="s3", resource=self._cluster_bucket.name, region="", account=""),
                    self._format_arn(
                        service="s3",
                        resource=f"{self._cluster_bucket.name}/{self._cluster_bucket.artifact_directory}/*",
                        region="",
                        account="",
                    ),
                ],
            ),
            iam.PolicyStatement(
                sid="CloudFormation",
                actions=[
                    "cloudformation:DescribeStacks",
                    "cloudformation:DescribeStackResource",
                    "cloudformation:SignalResource",
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._format_arn(service="cloudformation", resource=f"stack/{Stack.of(self).stack_name}/*"),
                    self._format_arn(service="cloudformation", resource=f"stack/{Stack.of(self).stack_name}-*/*"),
                ],
            ),
            iam.PolicyStatement(
                sid="DcvLicense",
                actions=[
                    "s3:GetObject",
                ],
                effect=iam.Effect.ALLOW,
                resources=[
                    self._format_arn(
                        service="s3",
                        resource="dcv-license.{0}/*".format(Stack.of(self).region),
                        region="",
                        account="",
                    )
                ],
            ),
        ]

        # Statements added for every scheduler but awsbatch
        if self._config.scheduling.scheduler != "awsbatch":
            policy.extend(
                [
                    iam.PolicyStatement(
                        sid="EC2Terminate",
                        actions=["ec2:TerminateInstances"],
                        effect=iam.Effect.ALLOW,
                        resources=["*"],
                        # Only instances tagged with the name of this cluster can be terminated
                        conditions={
                            "StringEquals": {f"ec2:ResourceTag/{PCLUSTER_CLUSTER_NAME_TAG}": Stack.of(self).stack_name}
                        },
                    ),
                    iam.PolicyStatement(
                        sid="EC2RunInstancesCreateFleet",
                        actions=["ec2:RunInstances", "ec2:CreateFleet"],
                        effect=iam.Effect.ALLOW,
                        resources=[
                            self._format_arn(service="ec2", resource=f"subnet/{subnet_id}")
                            for subnet_id in self._config.compute_subnet_ids
                        ]
                        + [
                            self._format_arn(service="ec2", resource="fleet/*"),
                            self._format_arn(service="ec2", resource="network-interface/*"),
                            self._format_arn(service="ec2", resource="instance/*"),
                            self._format_arn(service="ec2", resource="volume/*"),
                            self._format_arn(service="ec2", resource=f"key-pair/{self._config.head_node.ssh.key_name}"),
                            self._format_arn(service="ec2", resource="security-group/*"),
                            self._format_arn(service="ec2", resource="launch-template/*"),
                            self._format_arn(service="ec2", resource="placement-group/*"),
                        ]
                        + [
                            self._format_arn(service="ec2", resource=f"image/{queue_ami}", account="")
                            for _, queue_ami in self._config.image_dict.items()
                        ],
                    ),
                    iam.PolicyStatement(
                        sid="EC2DescribeCapacityReservations",
                        actions=["ec2:DescribeCapacityReservations"],
                        effect=iam.Effect.ALLOW,
                        resources=["*"],
                    ),
                    iam.PolicyStatement(
                        sid="PassRole",
                        actions=["iam:PassRole"],
                        effect=iam.Effect.ALLOW,
                        resources=self._generate_head_node_pass_role_resources(),
                    ),
                    iam.PolicyStatement(
                        sid="DynamoDBTable",
                        actions=["dynamodb:UpdateItem", "dynamodb:PutItem", "dynamodb:GetItem"],
                        effect=iam.Effect.ALLOW,
                        resources=[
                            self._format_arn(
                                service="dynamodb",
                                resource=f"table/{PCLUSTER_DYNAMODB_PREFIX}{Stack.of(self).stack_name}",
                            )
                        ],
                    ),
                ]
            )

            self._add_compute_console_output_policy_statement(policy)

            capacity_reservation_ids = self._config.capacity_reservation_ids

            if capacity_reservation_ids:
                policy.append(
                    iam.PolicyStatement(
                        sid="AllowRunningReservedCapacity",
                        actions=["ec2:RunInstances"],
                        effect=iam.Effect.ALLOW,
                        resources=self._config.capacity_reservation_arns,
                    )
                )
            capacity_reservation_resource_group_arns = self._config.capacity_reservation_resource_group_arns
            if capacity_reservation_resource_group_arns:
                policy.extend(
                    [
                        iam.PolicyStatement(
                            sid="AllowManagingReservedCapacity",
                            actions=["ec2:RunInstances", "ec2:CreateFleet", "resource-groups:ListGroupResources"],
                            effect=iam.Effect.ALLOW,
                            resources=capacity_reservation_resource_group_arns,
                        )
                    ]
                )

        if self._config.directory_service:
            password_secret_arn = Arn.split(
                self._config.directory_service.password_secret_arn, ArnFormat.COLON_RESOURCE_NAME
            )
            # The action to read the password depends on the service storing it
            secret_service = password_secret_arn.service
            if secret_service == "secretsmanager":
                get_secret_action = "secretsmanager:GetSecretValue"
            elif secret_service == "ssm":
                get_secret_action = "ssm:GetParameter"
            else:
                # NOTE(review): any other service results in a None action, as it has always been;
                # presumably such ARNs are rejected by the configuration validation - confirm
                get_secret_action = None
            policy.append(
                iam.PolicyStatement(
                    sid="AllowGettingDirectorySecretValue",
                    actions=[get_secret_action],
                    effect=iam.Effect.ALLOW,
                    resources=[self._config.directory_service.password_secret_arn],
                )
            )

        if self._config.scheduling.scheduler == "slurm" and self._config.scheduling.settings.database:
            policy.append(
                iam.PolicyStatement(
                    sid="AllowGettingSlurmDbSecretValue",
                    actions=["secretsmanager:GetSecretValue"],
                    effect=iam.Effect.ALLOW,
                    resources=[self._config.scheduling.settings.database.password_secret_arn],
                )
            )

        return policy

    def _add_compute_console_output_policy_statement(self, policy):
        """Append to the policy, when CloudWatch logging is enabled, the statement to get the console output."""
        if self._config.monitoring.logs.cloud_watch.enabled:
            queue_names = [queue.name for queue in self._config.scheduling.queues]
            policy.append(
                iam.PolicyStatement(
                    sid="EC2GetComputeConsoleOutput",
                    actions=["ec2:GetConsoleOutput"],
                    effect=iam.Effect.ALLOW,
                    resources=[self._format_arn(service="ec2", resource="instance/*")],
                    # Restricted to the compute instances of the queues of this cluster
                    conditions={
                        "StringEquals": {
                            "aws:ResourceTag/parallelcluster:queue-name": queue_names,
                            "aws:ResourceTag/parallelcluster:node-type": "Compute",
                            "aws:ResourceTag/parallelcluster:cluster-name": Stack.of(self).stack_name,
                        }
                    },
                )
            )

    def _generate_head_node_pass_role_resources(self):
        """Return a unique, sorted list of ARNs that the head node should be able to use when calling PassRole."""
        resource_iam_path, _ = add_cluster_iam_resource_prefix(
            self._config.cluster_name, self._config, "", iam_type="AWS::IAM::Role"
        )
        default_pass_role_resource = self._format_arn(
            service="iam",
            region="",
            resource=f"role{self._cluster_scoped_iam_path(iam_path=resource_iam_path)}*",
        )
        queues = self._config.scheduling.queues

        # If there are any queues where a custom instance role was specified,
        # enable the head node to pass permissions to those roles.
        pass_role_resources = {arn for queue in queues for arn in queue.iam.instance_role_arns}

        # Include the default IAM role path when no custom instance role is specified at all
        # or for the queues that aren't using a custom instance role.
        if not pass_role_resources or any(not queue.iam.instance_role_arns for queue in queues):
            pass_role_resources.add(default_pass_role_resource)

        # Sorted because the iteration order of a set of strings changes between interpreter runs,
        # which would make the order of the resources in the generated template change between runs as well
        return sorted(pass_role_resources)


class LoginNodesIamResources(NodeIamResourcesBase):
    """Construct defining IAM resources for a login node."""

    def __init__(
        self,
        scope: Construct,
        id: str,
        config: BaseClusterConfig,
        node: Union[HeadNode, BaseQueue, LoginNodesPool],
        shared_storage_infos: dict,
        name: str,
        auto_scaling_group_name: str,
    ):
        """Record the Auto Scaling group name, then hand every other argument to the base construct."""
        # Stored before the base constructor runs. NOTE(review): presumably the base class calls
        # _build_policy during its own initialization and needs this attribute in place - confirm.
        self._auto_scaling_group_name = auto_scaling_group_name
        super().__init__(
            scope,
            id,
            config,
            node,
            shared_storage_infos,
            name,
        )

    def _build_policy(self) -> List[iam.PolicyStatement]:
        """Return the policy statements for login nodes.

        Three ALLOW statements are produced, in this order:
        - "Ec2": ec2:DescribeInstanceAttribute on every resource;
        - "S3GetObj": s3:GetObject on the objects of the "<region>-aws-parallelcluster" bucket;
        - "Autoscaling": autoscaling:CompleteLifecycleAction on the ARN built from the Auto Scaling group name.
        """
        describe_instance_statement = iam.PolicyStatement(
            sid="Ec2",
            actions=["ec2:DescribeInstanceAttribute"],
            effect=iam.Effect.ALLOW,
            resources=["*"],
        )

        # Region and account are left empty in the bucket ARN
        regional_bucket_objects_arn = self._format_arn(
            service="s3",
            resource="{0}-aws-parallelcluster/*".format(Stack.of(self).region),
            region="",
            account="",
        )
        get_object_statement = iam.PolicyStatement(
            sid="S3GetObj",
            actions=["s3:GetObject"],
            effect=iam.Effect.ALLOW,
            resources=[regional_bucket_objects_arn],
        )

        # Restrict the lifecycle action to the group name received by the constructor
        auto_scaling_group_arn = self._format_arn(
            service="autoscaling",
            resource=f"autoScalingGroupName/{self._auto_scaling_group_name}",
        )
        lifecycle_statement = iam.PolicyStatement(
            sid="Autoscaling",
            actions=[
                "autoscaling:CompleteLifecycleAction",
            ],
            effect=iam.Effect.ALLOW,
            resources=[auto_scaling_group_arn],
        )

        return [describe_instance_statement, get_object_statement, lifecycle_statement]


class ComputeNodeIamResources(NodeIamResourcesBase):
    """Construct defining IAM resources for a compute node."""

    def __init__(
        self,
        scope: Construct,
        id: str,
        config: BaseClusterConfig,
        node: Union[HeadNode, BaseQueue, LoginNodesPool],
        shared_storage_infos: dict,
        name: str,
    ):
        """Forward every argument, unchanged and in the same order, to the base construct."""
        super().__init__(
            scope,
            id,
            config,
            node,
            shared_storage_infos,
            name,
        )

    def _build_policy(self) -> List[iam.PolicyStatement]:
        """Return the policy statements for compute nodes.

        Two ALLOW statements are produced, in this order:
        - "Ec2": ec2:DescribeInstanceAttribute on every resource;
        - "S3GetObj": s3:GetObject on the objects of the "<region>-aws-parallelcluster" bucket.
        """
        describe_instance_statement = iam.PolicyStatement(
            sid="Ec2",
            actions=[
                "ec2:DescribeInstanceAttribute",
            ],
            effect=iam.Effect.ALLOW,
            resources=["*"],
        )

        # Region and account are left empty in the bucket ARN
        regional_bucket_objects_arn = self._format_arn(
            service="s3",
            resource="{0}-aws-parallelcluster/*".format(Stack.of(self).region),
            region="",
            account="",
        )
        get_object_statement = iam.PolicyStatement(
            sid="S3GetObj",
            actions=["s3:GetObject"],
            effect=iam.Effect.ALLOW,
            resources=[regional_bucket_objects_arn],
        )

        return [describe_instance_statement, get_object_statement]


def get_lambda_log_group_prefix(function_id: str):
    """Return the log group prefix shared by Lambda functions created through PclusterLambdaConstruct.

    The value is "log-group:/aws/lambda/" followed by PCLUSTER_LAMBDA_PREFIX and the given function id.
    """
    function_name_prefix = f"{PCLUSTER_LAMBDA_PREFIX}{function_id}"
    return f"log-group:/aws/lambda/{function_name_prefix}"


class PclusterLambdaConstruct(Construct):
    """Create a Lambda function with some pre-filled fields."""

    def __init__(
        self,
        scope: Construct,
        id: str,
        function_id: str,
        bucket: S3Bucket,
        config: BaseClusterConfig,
        execution_role: iam.CfnRole,
        handler_func: str,
        timeout: int = 900,
    ):
        """Create a log group and a Lambda function, both attached to ``scope``.

        :param scope: construct that receives the log group and the function
        :param id: id of this construct
        :param function_id: used in the function name, the log group name and the child construct ids
        :param bucket: provides the bucket name and artifact directory where the function code is read from
        :param config: cluster configuration; read for log retention, log group deletion policy and VPC settings
        :param execution_role: value passed as the ``role`` of the function
        :param handler_func: module whose ``handler`` attribute is used as the function entry point
        :param timeout: value passed as the ``timeout`` of the function
        """
        super().__init__(scope, id)

        function_name = f"{PCLUSTER_LAMBDA_PREFIX}{function_id}-{self._stack_unique_id()}"

        # Log group named after the function, configured from the cluster settings
        log_group = logs.CfnLogGroup(
            scope,
            f"{function_id}FunctionLogGroup",
            log_group_name=f"/aws/lambda/{function_name}",
            retention_in_days=get_cloud_watch_logs_retention_days(config),
        )
        log_group.cfn_options.deletion_policy = get_log_group_deletion_policy(config)
        self.log_group = log_group

        # Function code is the shared custom resources archive stored in the bucket
        function_code = awslambda.CfnFunction.CodeProperty(
            s3_bucket=bucket.name,
            s3_key=f"{bucket.artifact_directory}/custom_resources/artifacts.zip",
        )

        # VPC settings are only attached when the configuration provides them
        if config.lambda_functions_vpc_config:
            function_vpc_config = awslambda.CfnFunction.VpcConfigProperty(
                security_group_ids=config.lambda_functions_vpc_config.security_group_ids,
                subnet_ids=config.lambda_functions_vpc_config.subnet_ids,
            )
        else:
            function_vpc_config = None

        self.lambda_func = awslambda.CfnFunction(
            scope,
            f"{function_id}Function",
            function_name=function_name,
            code=function_code,
            handler=f"{handler_func}.handler",
            memory_size=128,
            role=execution_role,
            runtime="python3.9",
            timeout=timeout,
            vpc_config=function_vpc_config,
        )

    def _stack_unique_id(self):
        """Return the element at index 2 of the stack id split on "/"."""
        # NOTE(review): presumably this is the unique id at the end of the stack ARN - confirm.
        stack_id_parts = Fn.split("/", Stack.of(self).stack_id)
        return Fn.select(2, stack_id_parts)

    def _format_arn(self, **kwargs):
        """Format an ARN through the stack this construct belongs to, forwarding all keyword arguments."""
        owning_stack = Stack.of(self)
        return owning_stack.format_arn(**kwargs)


class CdkLaunchTemplateBuilder(_LaunchTemplateBuilder):
    """Concrete class for building a CDK launch template."""

    def _block_device_mapping_for_ebs(self, device_name, volume):
        """Return a block device mapping for ``device_name`` whose EBS settings are copied from ``volume``."""
        ebs_settings = ec2.CfnLaunchTemplate.EbsProperty(
            volume_size=volume.size,
            encrypted=volume.encrypted,
            volume_type=volume.volume_type,
            iops=volume.iops,
            throughput=volume.throughput,
            delete_on_termination=volume.delete_on_termination,
        )
        return ec2.CfnLaunchTemplate.BlockDeviceMappingProperty(device_name=device_name, ebs=ebs_settings)

    def _block_device_mapping_for_virt(self, device_name, virtual_name):
        """Return a block device mapping that binds ``device_name`` to the given virtual name."""
        mapping = ec2.CfnLaunchTemplate.BlockDeviceMappingProperty(
            device_name=device_name,
            virtual_name=virtual_name,
        )
        return mapping

    def _instance_market_option(self, market_type, spot_instance_type, instance_interruption_behavior, max_price):
        """Return instance market options for ``market_type`` carrying the given spot settings."""
        spot_settings = ec2.CfnLaunchTemplate.SpotOptionsProperty(
            spot_instance_type=spot_instance_type,
            instance_interruption_behavior=instance_interruption_behavior,
            max_price=max_price,
        )
        return ec2.CfnLaunchTemplate.InstanceMarketOptionsProperty(
            market_type=market_type,
            spot_options=spot_settings,
        )

    def _capacity_reservation(self, cr_target):
        """Return a capacity reservation specification targeting the id and resource group ARN of ``cr_target``."""
        reservation_target = ec2.CfnLaunchTemplate.CapacityReservationTargetProperty(
            capacity_reservation_id=cr_target.capacity_reservation_id,
            capacity_reservation_resource_group_arn=cr_target.capacity_reservation_resource_group_arn,
        )
        return ec2.CfnLaunchTemplate.CapacityReservationSpecificationProperty(
            capacity_reservation_target=reservation_target
        )