""" Cloudformation deploy class which also streams events and changeset information This was ported over from the sam-cli repo """ # Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. # This is a modified version of the Deployer class from aws-sam-cli # (and its dependencies) to work with python 2 # Modifications: # - Imports now reference local classes # - Alternative imports for python2 # - py3 -> py2 migrations (ex: "".format() instead of f"", no "from" for raise) # - Moved UserException to exceptions.py # - Moved DeployColor to colors.py # - Removed unnecessary functions from artifact_exporter import logging import sys import time from collections import OrderedDict from datetime import datetime import botocore from integration.helpers.deployer.exceptions import exceptions as deploy_exceptions from integration.helpers.deployer.utils.artifact_exporter import mktempfile from integration.helpers.deployer.utils.colors import DeployColor from integration.helpers.deployer.utils.retry import retry_with_exponential_backoff_and_jitter from integration.helpers.deployer.utils.table_print import ( MIN_OFFSET, newline_per_item, pprint_column_names, pprint_columns, ) from integration.helpers.deployer.utils.time_util import utc_to_timestamp from integration.helpers.resource import generate_suffix LOG = logging.getLogger(__name__) DESCRIBE_STACK_EVENTS_FORMAT_STRING = ( "{ResourceStatus:<{0}} {ResourceType:<{1}} {LogicalResourceId:<{2}} {ResourceStatusReason:<{3}}" ) DESCRIBE_STACK_EVENTS_DEFAULT_ARGS = OrderedDict( { "ResourceStatus": "ResourceStatus", "ResourceType": "ResourceType", "LogicalResourceId": "LogicalResourceId", "ResourceStatusReason": "ResourceStatusReason", } ) DESCRIBE_STACK_EVENTS_TABLE_HEADER_NAME = "CloudFormation events from changeset" DESCRIBE_CHANGESET_FORMAT_STRING = "{Operation:<{0}} {LogicalResourceId:<{1}} {ResourceType:<{2}} {Replacement:<{3}}" DESCRIBE_CHANGESET_DEFAULT_ARGS = OrderedDict( { "Operation": "Operation", "LogicalResourceId": "LogicalResourceId", "ResourceType": "ResourceType", "Replacement": "Replacement", } ) DESCRIBE_CHANGESET_TABLE_HEADER_NAME = "CloudFormation stack changeset" OUTPUTS_FORMAT_STRING = "{Outputs:<{0}}" OUTPUTS_DEFAULTS_ARGS = OrderedDict({"Outputs": "Outputs"}) OUTPUTS_TABLE_HEADER_NAME = "CloudFormation outputs from deployed stack" class Deployer: def __init__(self, cloudformation_client, changeset_prefix="sam-integ-"): self._client = cloudformation_client self.changeset_prefix = changeset_prefix # 500ms of sleep time between stack checks and describe stack events. self.client_sleep = 0.5 # 2000ms of backoff time which is exponentially used, when there are exceptions during describe stack events self.backoff = 2 # Maximum number of attempts before raising exception back up the chain. self.max_attempts = 3 self.deploy_color = DeployColor() def create_changeset( self, stack_name, cfn_template, parameter_values, capabilities, role_arn, notification_arns, s3_uploader, tags, changeset_type, ): """ Call Cloudformation to create a changeset and wait for it to complete :param stack_name: Name or ID of stack :param cfn_template: CloudFormation template string :param parameter_values: Template parameters object :param capabilities: Array of capabilities passed to CloudFormation :param tags: Array of tags passed to CloudFormation :param tags: the type of the changeset :return: """ if type == "UPDATE": # UsePreviousValue not valid if parameter is new summary = self._client.get_template_summary(StackName=stack_name) existing_parameters = [parameter["ParameterKey"] for parameter in summary["Parameters"]] parameter_values = [ x for x in parameter_values if not (x.get("UsePreviousValue", False) and x["ParameterKey"] not in existing_parameters) ] else: # When creating a new stack, UsePreviousValue=True is invalid. # For such parameters, users should either override with new value, # or set a Default value in template to successfully create a stack. parameter_values = [x for x in parameter_values if not x.get("UsePreviousValue", False)] # Each changeset will get a unique name based on time. # Description is also setup based on current date and that SAM CLI is used. kwargs = { "ChangeSetName": self.changeset_prefix + str(int(time.time())), "StackName": stack_name, "TemplateBody": cfn_template, "ChangeSetType": changeset_type, "Parameters": parameter_values, "Capabilities": capabilities, "Description": f"Created by SAM CLI at {datetime.utcnow().isoformat()} UTC", "Tags": tags, } # If an S3 uploader is available, use TemplateURL to deploy rather than # TemplateBody. This is required for large templates. if s3_uploader: with mktempfile() as temporary_file: temporary_file.write(kwargs.pop("TemplateBody")) temporary_file.flush() # add random suffix to file_name to enable multiple tests run in the same time template_file_name = "template" + generate_suffix() # TemplateUrl property requires S3 URL to be in path-style format s3_uploader.upload_file(template_file_name, temporary_file.name) kwargs["TemplateURL"] = s3_uploader.get_s3_uri(template_file_name) # don't set these arguments if not specified to use existing values if role_arn is not None: kwargs["RoleARN"] = role_arn if notification_arns is not None: kwargs["NotificationARNs"] = notification_arns return self._create_change_set(stack_name=stack_name, changeset_type=changeset_type, **kwargs) def _create_change_set(self, stack_name, changeset_type, **kwargs): try: resp = self._client.create_change_set(**kwargs) return resp, changeset_type except botocore.exceptions.ClientError as ex: if "The bucket you are attempting to access must be addressed using the specified endpoint" in str(ex): raise deploy_exceptions.DeployBucketInDifferentRegionError( f"Failed to create/update stack {stack_name}" ) raise deploy_exceptions.ChangeSetError(stack_name=stack_name, msg=str(ex)) except Exception as ex: LOG.debug("Unable to create changeset", exc_info=ex) raise deploy_exceptions.ChangeSetError(stack_name=stack_name, msg=str(ex)) @pprint_column_names( format_string=DESCRIBE_CHANGESET_FORMAT_STRING, format_kwargs=DESCRIBE_CHANGESET_DEFAULT_ARGS, table_header=DESCRIBE_CHANGESET_TABLE_HEADER_NAME, ) def describe_changeset(self, change_set_id, stack_name, **kwargs): """ Call Cloudformation to describe a changeset :param change_set_id: ID of the changeset :param stack_name: Name of the CloudFormation stack :return: dictionary of changes described in the changeset. """ paginator = self._client.get_paginator("describe_change_set") response_iterator = paginator.paginate(ChangeSetName=change_set_id, StackName=stack_name) changes = {"Add": [], "Modify": [], "Remove": []} changes_showcase = {"Add": "+ Add", "Modify": "* Modify", "Remove": "- Delete"} changeset = False for item in response_iterator: cf_changes = item.get("Changes") for change in cf_changes: changeset = True resource_props = change.get("ResourceChange") action = resource_props.get("Action") changes[action].append( { "LogicalResourceId": resource_props.get("LogicalResourceId"), "ResourceType": resource_props.get("ResourceType"), "Replacement": "N/A" if resource_props.get("Replacement") is None else resource_props.get("Replacement"), } ) for k, v in changes.items(): for value in v: row_color = self.deploy_color.get_changeset_action_color(action=k) pprint_columns( columns=[ changes_showcase.get(k, k), value["LogicalResourceId"], value["ResourceType"], value["Replacement"], ], width=kwargs["width"], margin=kwargs["margin"], format_string=DESCRIBE_CHANGESET_FORMAT_STRING, format_args=kwargs["format_args"], columns_dict=DESCRIBE_CHANGESET_DEFAULT_ARGS.copy(), color=row_color, ) if not changeset: # There can be cases where there are no changes, # but could be an an addition of a SNS notification topic. pprint_columns( columns=["-", "-", "-", "-"], width=kwargs["width"], margin=kwargs["margin"], format_string=DESCRIBE_CHANGESET_FORMAT_STRING, format_args=kwargs["format_args"], columns_dict=DESCRIBE_CHANGESET_DEFAULT_ARGS.copy(), ) return changes def wait_for_changeset(self, changeset_id, stack_name): """ Waits until the changeset creation completes :param changeset_id: ID or name of the changeset :param stack_name: Stack name :return: Latest status of the create-change-set operation """ sys.stdout.write("\nWaiting for changeset to be created...\n") sys.stdout.flush() # Wait for changeset to be created waiter = self._client.get_waiter("change_set_create_complete") # Poll every 5 seconds. Changeset creation should be fast waiter_config = {"Delay": 5} try: waiter.wait(ChangeSetName=changeset_id, StackName=stack_name, WaiterConfig=waiter_config) except botocore.exceptions.WaiterError as ex: LOG.error("Waiter exception waiting for changeset", exc_info=ex) resp = ex.last_response status = resp.get("Status", "") reason = resp.get("StatusReason", "") if ( status == "FAILED" and "The submitted information didn't contain changes." in reason or "No updates are to be performed" in reason ): raise deploy_exceptions.ChangeEmptyError(stack_name=stack_name) raise deploy_exceptions.ChangeSetError( stack_name=stack_name, msg=f"ex: {ex} Status: {status}. Reason: {reason}" ) def execute_changeset(self, changeset_id, stack_name): """ Calls CloudFormation to execute changeset :param changeset_id: ID of the changeset :param stack_name: Name or ID of the stack :return: Response from execute-change-set call """ try: return self._client.execute_change_set(ChangeSetName=changeset_id, StackName=stack_name) except botocore.exceptions.ClientError as ex: raise deploy_exceptions.DeployFailedError(stack_name=stack_name, msg=str(ex)) def get_last_event_time(self, stack_name): """ Finds the last event time stamp thats present for the stack, if not get the current time :param stack_name: Name or ID of the stack :return: unix epoch """ try: return utc_to_timestamp( self._client.describe_stack_events(StackName=stack_name)["StackEvents"][0]["Timestamp"] ) except KeyError: return time.time() def _check_stack_complete(self, status): return "COMPLETE" in status and "CLEANUP" not in status def wait_for_execute(self, stack_name, changeset_type): sys.stdout.write( "\n{} - Waiting for stack create/update " "to complete\n".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) ) sys.stdout.flush() # Pick the right waiter if changeset_type == "CREATE": waiter = self._client.get_waiter("stack_create_complete") elif changeset_type == "UPDATE": waiter = self._client.get_waiter("stack_update_complete") else: raise RuntimeError(f"Invalid changeset type {changeset_type}") # Poll every 30 seconds. Polling too frequently risks hitting rate limits # on CloudFormation's DescribeStacks API waiter_config = {"Delay": 30, "MaxAttempts": 120} self._wait(stack_name, waiter, waiter_config) outputs = self.get_stack_outputs(stack_name=stack_name, echo=False) if outputs: self._display_stack_outputs(outputs) @retry_with_exponential_backoff_and_jitter(deploy_exceptions.ThrottlingError, 5, 360) def _wait(self, stack_name, waiter, waiter_config): try: waiter.wait(StackName=stack_name, WaiterConfig=waiter_config) except botocore.exceptions.WaiterError as ex: LOG.debug("Execute changeset waiter exception", exc_info=ex) if "Throttling" in str(ex): raise deploy_exceptions.ThrottlingError(stack_name=stack_name, msg=str(ex)) else: raise deploy_exceptions.DeployFailedError(stack_name=stack_name, msg=str(ex)) def create_and_wait_for_changeset( self, stack_name, cfn_template, parameter_values, capabilities, role_arn, notification_arns, s3_uploader, tags, changeset_type, ): try: result, changeset_type = self.create_changeset( stack_name, cfn_template, parameter_values, capabilities, role_arn, notification_arns, s3_uploader, tags, changeset_type, ) self.wait_for_changeset(result["Id"], stack_name) self.describe_changeset(result["Id"], stack_name) return result except deploy_exceptions.ChangeEmptyError: return {} except botocore.exceptions.ClientError as ex: raise deploy_exceptions.DeployFailedError(stack_name=stack_name, msg=str(ex)) @pprint_column_names( format_string=OUTPUTS_FORMAT_STRING, format_kwargs=OUTPUTS_DEFAULTS_ARGS, table_header=OUTPUTS_TABLE_HEADER_NAME ) def _display_stack_outputs(self, stack_outputs, **kwargs): for counter, output in enumerate(stack_outputs): for k, v in [ ("Key", output.get("OutputKey")), ("Description", output.get("Description", "-")), ("Value", output.get("OutputValue")), ]: pprint_columns( columns=["{k:<{0}}{v:<{0}}".format(MIN_OFFSET, k=k, v=v)], width=kwargs["width"], margin=kwargs["margin"], format_string=OUTPUTS_FORMAT_STRING, format_args=kwargs["format_args"], columns_dict=OUTPUTS_DEFAULTS_ARGS.copy(), color="green", replace_whitespace=False, break_long_words=False, drop_whitespace=False, ) newline_per_item(stack_outputs, counter) @retry_with_exponential_backoff_and_jitter(deploy_exceptions.ThrottlingError, 5, 360) def get_stack_outputs(self, stack_name, echo=True): try: stacks_description = self._client.describe_stacks(StackName=stack_name) try: outputs = stacks_description["Stacks"][0]["Outputs"] if echo: sys.stdout.write(f"\nStack {stack_name} outputs:\n") sys.stdout.flush() self._display_stack_outputs(stack_outputs=outputs) return outputs except KeyError: return None except botocore.exceptions.ClientError as ex: if "Throttling" in str(ex): raise deploy_exceptions.ThrottlingError(stack_name=stack_name, msg=str(ex)) else: raise deploy_exceptions.DeployStackOutPutFailedError(stack_name=stack_name, msg=str(ex))