# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from assertpy import assert_that

from pcluster.aws.common import AWSClientError
from pcluster.config.cluster_config import ExistingFsxFileCache, ExistingFsxOpenZfs
from pcluster.schemas.cluster_schema import ClusterSchema
from pcluster.templates.cdk_builder import CDKTemplateBuilder
from pcluster.utils import load_yaml_dict
from tests.pcluster.aws.dummy_aws_api import _DummyAWSApi, _DummyInstanceTypeInfo, mock_aws_api
from tests.pcluster.models.dummy_s3_bucket import dummy_cluster_bucket, mock_bucket_object_utils
from tests.pcluster.utils import assert_sg_rule, get_head_node_policy, get_resources, get_statement_by_sid


@pytest.mark.parametrize(
    "config_file_name, storage_name, deletion_policy",
    [
        ("config.yaml", "shared-ebs-managed-1", "Delete"),
        ("config.yaml", "shared-ebs-managed-2", "Delete"),
        ("config.yaml", "shared-ebs-managed-3", "Retain"),
    ],
)
def test_shared_storage_ebs(mocker, test_datadir, config_file_name, storage_name, deletion_policy):
    mock_aws_api(mocker)
    mock_bucket_object_utils(mocker)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    generated_template, _ = CDKTemplateBuilder().build_cluster_template(
        cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
    )

    volumes = get_resources(
        generated_template, type="AWS::EC2::Volume", properties={"Tags": [{"Key": "Name", "Value": storage_name}]}
    )
    assert_that(volumes).is_length(1)

    volume = next(iter(volumes.values()))
    assert_that(volume["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(volume["UpdateReplacePolicy"]).is_equal_to(deletion_policy)


@pytest.mark.parametrize(
    "config_file_name, storage_name, deletion_policy",
    [
        ("config.yaml", "shared-efs-managed-1", "Delete"),
        ("config.yaml", "shared-efs-managed-2", "Delete"),
        ("config.yaml", "shared-efs-managed-3", "Retain"),
        ("config-custom-sg.yaml", "shared-efs-managed-1", "Delete"),
    ],
)
def test_shared_storage_efs(mocker, test_datadir, config_file_name, storage_name, deletion_policy):
    mock_aws_api(mocker)
    mock_bucket_object_utils(mocker)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    generated_template, _ = CDKTemplateBuilder().build_cluster_template(
        cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
    )

    file_systems = get_resources(
        generated_template,
        type="AWS::EFS::FileSystem",
        properties={"FileSystemTags": [{"Key": "Name", "Value": storage_name}]},
    )
    assert_that(file_systems).is_length(1)

    file_system_name = next(iter(file_systems.keys()))
    file_system = file_systems[file_system_name]
    assert_that(file_system["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(file_system["UpdateReplacePolicy"]).is_equal_to(deletion_policy)

    mount_targets = get_resources(
        generated_template, type="AWS::EFS::MountTarget", properties={"FileSystemId": {"Ref": file_system_name}}
    )
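    # Descriptive note: the mount target and the security group generated for it are expected to carry
    # the same DeletionPolicy/UpdateReplacePolicy as the managed EFS file system they serve.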
    assert_that(mount_targets).is_length(1)

    mount_target = next(iter(mount_targets.values()))
    assert_that(mount_target["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(mount_target["UpdateReplacePolicy"]).is_equal_to(deletion_policy)

    mount_target_sg_name = mount_target["Properties"]["SecurityGroups"][0]["Ref"]
    mount_target_sg = generated_template["Resources"][mount_target_sg_name]
    assert_that(mount_target_sg["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(mount_target_sg["UpdateReplacePolicy"]).is_equal_to(deletion_policy)

    login_nodes_networking = cluster_config.login_nodes.pools[0].networking
    login_nodes_sg_name = (
        login_nodes_networking.security_groups[0]
        if login_nodes_networking.security_groups
        else "LoginNodesSecurityGroup"
    )

    for sg in ["HeadNodeSecurityGroup", "ComputeSecurityGroup", login_nodes_sg_name, mount_target_sg_name]:
        ingress_ip_protocol = "tcp" if sg == login_nodes_sg_name else "-1"
        ingress_port_range = [2049, 2049] if sg == login_nodes_sg_name else [0, 65535]
        rule_deletion_policy = deletion_policy if sg == mount_target_sg_name else None
        assert_sg_rule(
            generated_template,
            mount_target_sg_name,
            rule_type="ingress",
            protocol=ingress_ip_protocol,
            port_range=ingress_port_range,
            target_sg=sg,
            deletion_policy=rule_deletion_policy,
        )
        assert_sg_rule(
            generated_template,
            mount_target_sg_name,
            rule_type="egress",
            protocol="-1",
            port_range=[0, 65535],
            target_sg=sg,
            deletion_policy=rule_deletion_policy,
        )


@pytest.mark.parametrize(
    "config_file_name, storage_name, fs_type, deletion_policy",
    [
        ("config.yaml", "shared-fsx-lustre-managed-1", "LUSTRE", "Delete"),
        ("config.yaml", "shared-fsx-lustre-managed-2", "LUSTRE", "Delete"),
        ("config.yaml", "shared-fsx-lustre-managed-3", "LUSTRE", "Retain"),
        ("config-custom-sg.yaml", "shared-fsx-lustre-managed-1", "LUSTRE", "Delete"),
    ],
)
def test_shared_storage_fsx(mocker, test_datadir, config_file_name, storage_name, fs_type, deletion_policy):
    mock_aws_api(mocker)
    mock_bucket_object_utils(mocker)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    generated_template, _ = CDKTemplateBuilder().build_cluster_template(
        cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
    )

    file_systems = get_resources(
        generated_template, type="AWS::FSx::FileSystem", properties={"Tags": [{"Key": "Name", "Value": storage_name}]}
    )
    assert_that(file_systems).is_length(1)

    file_system = next(iter(file_systems.values()))
    assert_that(file_system["Properties"]["FileSystemType"]).is_equal_to(fs_type)
    assert_that(file_system["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(file_system["UpdateReplacePolicy"]).is_equal_to(deletion_policy)

    file_system_sg_name = file_system["Properties"]["SecurityGroupIds"][0]["Ref"]
    file_system_sg = generated_template["Resources"][file_system_sg_name]
    assert_that(file_system_sg["DeletionPolicy"]).is_equal_to(deletion_policy)
    assert_that(file_system_sg["UpdateReplacePolicy"]).is_equal_to(deletion_policy)

    login_nodes_networking = cluster_config.login_nodes.pools[0].networking
    login_nodes_sg_name = (
        login_nodes_networking.security_groups[0]
        if login_nodes_networking.security_groups
        else "LoginNodesSecurityGroup"
    )

    for sg in ["HeadNodeSecurityGroup", "ComputeSecurityGroup", login_nodes_sg_name, file_system_sg_name]:
        ingress_ip_protocol = "tcp" if sg == login_nodes_sg_name else "-1"
        ingress_port_range = [988, 988] if sg == login_nodes_sg_name else [0, 65535]
        rule_deletion_policy = deletion_policy if sg == file_system_sg_name else None
        assert_sg_rule(
            generated_template,
            file_system_sg_name,
            rule_type="ingress",
            protocol=ingress_ip_protocol,
            port_range=ingress_port_range,
            target_sg=sg,
            deletion_policy=rule_deletion_policy,
        )
        assert_sg_rule(
            generated_template,
            file_system_sg_name,
            rule_type="egress",
            protocol="-1",
            port_range=[0, 65535],
            target_sg=sg,
            deletion_policy=rule_deletion_policy,
        )


@pytest.mark.parametrize(
    "config_file_name",
    [
        ("unmanaged_config.yaml"),
    ],
)
def test_unmanaged_shared_storage_fsx(mocker, test_datadir, config_file_name):
    # Check that dna.json contains the DNS name, FSx IDs, and mount name for unmanaged storage.
    mock_aws_api(mocker)
    mock_bucket_object_utils(mocker)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    generated_template, _ = CDKTemplateBuilder().build_cluster_template(
        cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
    )

    head_node_dna_json_file = (
        generated_template.get("Resources")
        .get("HeadNodeLaunchTemplate")
        .get("Metadata")
        .get("AWS::CloudFormation::Init")
        .get("deployConfigFiles")
        .get("files")
        .get("/tmp/dna.json")
        .get("content")
        .get("Fn::Join")[1][4]
    )

    for storage in cluster_config.shared_storage:
        if storage.existing_dns_name:
            assert_that(storage.existing_dns_name in head_node_dna_json_file).is_true()
        if isinstance(storage, ExistingFsxFileCache):
            assert_that(storage.file_cache_id in head_node_dna_json_file).is_true()
            assert_that(storage.file_cache_mount_name in head_node_dna_json_file).is_true()
            assert_that(
                storage.file_cache_id in generated_template.get("Outputs").get("FSXIds").get("Value")
            ).is_true()
        else:
            assert_that(storage.volume_id in head_node_dna_json_file).is_true()
            if isinstance(storage, ExistingFsxOpenZfs):
                assert_that(storage.volume_path in head_node_dna_json_file).is_true()
            else:
                assert_that(storage.junction_path in head_node_dna_json_file).is_true()


@pytest.mark.parametrize(
    "config_file_name",
    [("config.yaml"), ("file_cache_config.yaml")],
)
def test_non_happy_storage(mocker, test_datadir, config_file_name):
    dummy_api = _DummyAWSApi()
    if config_file_name == "file_cache_config.yaml":
        # Check the failure when the referenced file cache does not exist.
        dummy_api._fsx.set_non_happy_describe_storage(
            AWSClientError(
                function_name="describe_file_caches",
                message="The cache with cache ID 'fc-12345678' does not exist.",
            )
        )
    else:
        # Check the failure when describing the existing storage is unauthorized.
        dummy_api._fsx.set_non_happy_describe_storage(
            AWSClientError(function_name="describe_volumes", message="describing volumes is unauthorized")
        )
    mocker.patch("pcluster.aws.aws_api.AWSApi.instance", return_value=dummy_api)
    mocker.patch("pcluster.aws.ec2.Ec2Client.get_instance_type_info", side_effect=_DummyInstanceTypeInfo)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    with pytest.raises(AWSClientError):
        CDKTemplateBuilder().build_cluster_template(
            cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
        )


@pytest.mark.parametrize(
    "config_file_name",
    [
        ("config.yaml"),
    ],
)
def test_efs_permissions(mocker, test_datadir, config_file_name):
    mock_aws_api(mocker)
    mock_bucket_object_utils(mocker)

    input_yaml = load_yaml_dict(test_datadir / config_file_name)
    cluster_config = ClusterSchema(cluster_name="clustername").load(input_yaml)

    generated_template, _ = CDKTemplateBuilder().build_cluster_template(
        cluster_config=cluster_config, bucket=dummy_cluster_bucket(), stack_name="clustername"
    )

    head_node_policy = get_head_node_policy(generated_template)

    statement = get_statement_by_sid(policy=head_node_policy, sid="Efs")
    assert_that(statement["Effect"]).is_equal_to("Allow")
    assert_that(statement["Action"]).contains_only(
        "elasticfilesystem:ClientMount", "elasticfilesystem:ClientRootAccess", "elasticfilesystem:ClientWrite"
    )
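

# For reference, the "Efs" head node policy statement asserted in test_efs_permissions is expected to look
# roughly like the sketch below (illustrative only; the resource scoping of the statement is not checked
# by this test):
#
#   {
#       "Sid": "Efs",
#       "Effect": "Allow",
#       "Action": [
#           "elasticfilesystem:ClientMount",
#           "elasticfilesystem:ClientRootAccess",
#           "elasticfilesystem:ClientWrite",
#       ],
#   }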