import logging import time from pathlib import Path import boto3 import botocore import pytest from botocore.exceptions import ClientError from integration.helpers.base_test import S3_BUCKET_PREFIX from integration.helpers.client_provider import ClientProvider from integration.helpers.deployer.exceptions.exceptions import S3DoesNotExistException, ThrottlingError from integration.helpers.deployer.utils.retry import retry_with_exponential_backoff_and_jitter from integration.helpers.resource import ( current_region_does_not_support, read_test_config_file, write_test_config_file_to_json, ) from integration.helpers.stack import Stack from integration.helpers.yaml_utils import load_yaml LOG = logging.getLogger(__name__) COMPANION_STACK_NAME = "sam-integ-stack-companion" COMPANION_STACK_TEMPLATE = "companion-stack.yaml" SAR_APP_TEMPLATE = "example-sar-app.yaml" SAR_APP_NAME = "sam-integration-test-sar-app" SAR_APP_VERSION = "1.0.3" def _get_all_buckets(): s3 = boto3.resource("s3") return s3.buckets.all() def clean_bucket(s3_bucket_name, s3_client): """ Empties and deletes the bucket used for the tests """ s3 = boto3.resource("s3") bucket = s3.Bucket(s3_bucket_name) object_summary_iterator = bucket.objects.all() for object_summary in object_summary_iterator: try: s3_client.delete_object(Key=object_summary.key, Bucket=s3_bucket_name) except ClientError as e: LOG.error("Unable to delete object %s from bucket %s", object_summary.key, s3_bucket_name, exc_info=e) try: s3_client.delete_bucket(Bucket=s3_bucket_name) except ClientError as e: LOG.error("Unable to delete bucket %s", s3_bucket_name, exc_info=e) @pytest.fixture(scope="session") def clean_all_integ_buckets(): buckets = _get_all_buckets() s3_client = ClientProvider().s3_client for bucket in buckets: if bucket.name.startswith(S3_BUCKET_PREFIX): clean_bucket(bucket.name, s3_client) def _delete_unused_network_interface_by_subnet(ec2_client, subnet_id): """Deletes unused network interface under the provided subnet""" paginator = ec2_client.get_paginator("describe_network_interfaces") response_iterator = paginator.paginate( Filters=[ {"Name": "subnet-id", "Values": [subnet_id]}, {"Name": "status", "Values": ["available"]}, ] ) network_interface_ids = [] for page in response_iterator: network_interface_ids += [ni["NetworkInterfaceId"] for ni in page["NetworkInterfaces"]] for ni_id in network_interface_ids: ec2_client.delete_network_interface(NetworkInterfaceId=ni_id) time.sleep(0.5) LOG.info("Deleted %s unused network interfaces under subnet %s", len(network_interface_ids), subnet_id) @pytest.fixture() def setup_companion_stack_once(tmpdir_factory, get_prefix): tests_integ_dir = Path(__file__).resolve().parents[1] template_foler = Path(tests_integ_dir, "integration", "setup") companion_stack_tempalte_path = Path(template_foler, COMPANION_STACK_TEMPLATE) cfn_client = ClientProvider().cfn_client output_dir = tmpdir_factory.mktemp("data") stack_name = get_prefix + COMPANION_STACK_NAME companion_stack = Stack(stack_name, companion_stack_tempalte_path, cfn_client, output_dir) companion_stack.create_or_update(_stack_exists(stack_name)) ec2_client = ClientProvider().ec2_client precreated_subnet_ids = [ resource["PhysicalResourceId"] for resource in companion_stack.stack_resources["StackResourceSummaries"] if resource["LogicalResourceId"].startswith("PreCreatedSubnet") ] for subnet_id in precreated_subnet_ids: _delete_unused_network_interface_by_subnet(ec2_client, subnet_id) @pytest.fixture() def get_serverless_application_repository_app(): """Create or re-use a simple SAR app""" if current_region_does_not_support(["ServerlessRepo"]): LOG.info("Creating SAR application is skipped since SAR tests are not supported in this region.") return None sar_client = ClientProvider().sar_client sar_apps = sar_client.list_applications().get("Applications", []) for sar_app in sar_apps: if sar_app.get("Name") == SAR_APP_NAME: LOG.info("SAR Application was already created, skipping SAR application publish") return sar_app.get("ApplicationId") LOG.info("SAR application not found, publishing new one...") tests_integ_dir = Path(__file__).resolve().parents[1] template_foler = Path(tests_integ_dir, "integration", "setup") sar_app_template_path = Path(template_foler, SAR_APP_TEMPLATE) with open(sar_app_template_path) as f: sar_template_contents = f.read() create_app_result = sar_client.create_application( Author="SAM Team", Description="SAR Application for Integration Tests", Name=SAR_APP_NAME, SemanticVersion=SAR_APP_VERSION, TemplateBody=sar_template_contents, ) LOG.info("SAR application creation result: %s", create_app_result) return create_app_result.get("ApplicationId") @pytest.fixture() def upload_resources(get_s3): """ Creates the bucket and uploads the files used by the tests to it """ s3_bucket = get_s3 if not _s3_exists(s3_bucket): raise S3DoesNotExistException(get_s3, "Check companion stack status") code_dir = Path(__file__).resolve().parents[0].joinpath("resources").joinpath("code") file_to_s3_uri_map = read_test_config_file("file_to_s3_map.json") if not file_to_s3_uri_map or not file_to_s3_uri_map.items(): LOG.debug("No resources to upload") return current_file_name = "" try: s3_client = ClientProvider().s3_client session = boto3.session.Session() region = session.region_name for file_name, file_info in file_to_s3_uri_map.items(): current_file_name = file_name code_path = str(Path(code_dir, file_name)) LOG.debug("Uploading file %s to bucket %s", file_name, s3_bucket) s3_client.upload_file(code_path, s3_bucket, file_name) LOG.debug("File %s uploaded successfully to bucket %s", file_name, s3_bucket) file_info["uri"] = get_s3_uri(file_name, file_info["type"], s3_bucket, region) except ClientError as error: LOG.error("Upload of file %s to bucket %s failed", current_file_name, s3_bucket, exc_info=error) raise error write_test_config_file_to_json("file_to_s3_map_modified.json", file_to_s3_uri_map) def get_s3_uri(file_name, uri_type, bucket, region): if uri_type == "s3": return f"s3://{bucket}/{file_name}" if region == "us-east-1": return f"https://s3.amazonaws.com/{bucket}/{file_name}" if region == "us-iso-east-1": return f"https://s3.us-iso-east-1.c2s.ic.gov/{bucket}/{file_name}" if region == "us-isob-east-1": return f"https://s3.us-isob-east-1.sc2s.sgov.gov/{bucket}/{file_name}" return f"https://s3-{region}.amazonaws.com/{bucket}/{file_name}" @pytest.fixture() def delete_companion_stack_once(get_prefix): if not get_prefix: ClientProvider().cfn_client.delete_stack(StackName=COMPANION_STACK_NAME) @retry_with_exponential_backoff_and_jitter(ThrottlingError, 5, 360) def get_stack_description(stack_name): try: stack_description = ClientProvider().cfn_client.describe_stacks(StackName=stack_name) return stack_description except botocore.exceptions.ClientError as ex: if "Throttling" in str(ex): raise ThrottlingError(stack_name=stack_name, msg=str(ex)) raise ex def get_stack_outputs(stack_description): if not stack_description: return {} output_list = stack_description["Stacks"][0]["Outputs"] return {output["OutputKey"]: output["OutputValue"] for output in output_list} @pytest.fixture() def get_companion_stack_outputs(get_prefix): companion_stack_description = get_stack_description(get_prefix + COMPANION_STACK_NAME) return get_stack_outputs(companion_stack_description) @pytest.fixture() def get_s3(get_companion_stack_outputs): s3_bucket = get_companion_stack_outputs.get("PreCreatedS3Bucket") return str(s3_bucket) @pytest.fixture() def get_prefix(request): prefix = "" if request.config.getoption("--prefix"): prefix = request.config.getoption("--prefix") + "-" return prefix @pytest.fixture() def get_stage(request): stage = "" if request.config.getoption("--stage"): stage = request.config.getoption("--stage") return stage @pytest.fixture() def check_internal(request): internal = False if request.config.getoption("--internal"): internal = True return internal @pytest.fixture() def parameter_values(request): parameter_values = {} parameter_values_option = request.config.getoption("--parameter-values") if parameter_values_option: parameter_values = load_yaml(parameter_values_option) return parameter_values def pytest_addoption(parser): parser.addoption( "--prefix", default=None, action="store", help="the prefix of the stack", ) parser.addoption( "--stage", default=None, action="store", help="the stage of the stack", ) parser.addoption( "--internal", action="store_true", help="run internal tests", ) parser.addoption( "--parameter-values", default=None, help="YAML file path which will contain parameter values that could be passed during deployment of the stack", ) @retry_with_exponential_backoff_and_jitter(ThrottlingError, 5, 360) def _stack_exists(stack_name): cloudformation = boto3.resource("cloudformation") stack = cloudformation.Stack(stack_name) try: status = stack.stack_status if status == "REVIEW_IN_PROGRESS": return False except ClientError as ex: if "does not exist" in str(ex): return False if "Throttling" in str(ex): raise ThrottlingError(stack_name=stack_name, msg=str(ex)) raise ex return True @retry_with_exponential_backoff_and_jitter(ThrottlingError, 5, 360) def _s3_exists(s3_bucket): s3 = boto3.resource("s3") bucket = s3.Bucket(s3_bucket) try: s3.meta.client.head_bucket(Bucket=bucket.name) except ClientError: return False return True