# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from __future__ import absolute_import from contextlib import contextmanager import logging import signal from time import sleep from awslogs.core import AWSLogs from botocore.exceptions import ClientError LOGGER = logging.getLogger("timeout") class TimeoutError(Exception): pass @contextmanager def timeout(seconds=0, minutes=0, hours=0): """ Add a signal-based timeout to any block of code. If multiple time units are specified, they will be added together to determine time limit. Usage: with timeout(seconds=5): my_slow_function(...) Args: - seconds: The time limit, in seconds. - minutes: The time limit, in minutes. - hours: The time limit, in hours. """ limit = seconds + 60 * minutes + 3600 * hours def handler(signum, frame): raise TimeoutError("timed out after {} seconds".format(limit)) try: signal.signal(signal.SIGALRM, handler) signal.alarm(limit) yield finally: signal.alarm(0) @contextmanager def timeout_and_delete_endpoint_by_name( endpoint_name, sagemaker_session, seconds=0, minutes=45, hours=0 ): with timeout(seconds=seconds, minutes=minutes, hours=hours) as t: no_errors = False try: yield [t] no_errors = True finally: attempts = 3 while attempts > 0: attempts -= 1 try: sagemaker_session.delete_endpoint(endpoint_name) LOGGER.info("deleted endpoint {}".format(endpoint_name)) _show_logs(endpoint_name, "Endpoints", sagemaker_session) if no_errors: _cleanup_logs(endpoint_name, "Endpoints", sagemaker_session) break except ClientError as ce: if ce.response["Error"]["Code"] == "ValidationException": # avoids the inner exception to be overwritten pass # trying to delete the resource again in 10 seconds sleep(10) @contextmanager def timeout_and_delete_model_with_transformer( transformer, sagemaker_session, seconds=0, minutes=0, hours=0 ): with timeout(seconds=seconds, minutes=minutes, hours=hours) as t: no_errors = False try: yield [t] no_errors = True finally: attempts = 3 while attempts > 0: attempts -= 1 try: transformer.delete_model() LOGGER.info("deleted SageMaker model {}".format(transformer.model_name)) _show_logs(transformer.model_name, "Models", sagemaker_session) if no_errors: _cleanup_logs(transformer.model_name, "Models", sagemaker_session) break except ClientError as ce: if ce.response["Error"]["Code"] == "ValidationException": pass sleep(10) def _show_logs(resource_name, resource_type, sagemaker_session): log_group = "/aws/sagemaker/{}/{}".format(resource_type, resource_name) try: # print out logs before deletion for debuggability LOGGER.info("cloudwatch logs for log group {}:".format(log_group)) logs = AWSLogs( log_group_name=log_group, log_stream_name="ALL", start="1d", aws_region=sagemaker_session.boto_session.region_name, ) logs.list_logs() except Exception: LOGGER.exception( "Failure occurred while listing cloudwatch log group %s. Swallowing exception but printing " "stacktrace for debugging.", log_group, ) def _cleanup_logs(resource_name, resource_type, sagemaker_session): log_group = "/aws/sagemaker/{}/{}".format(resource_type, resource_name) try: # print out logs before deletion for debuggability LOGGER.info("deleting cloudwatch log group {}:".format(log_group)) cwl_client = sagemaker_session.boto_session.client("logs") cwl_client.delete_log_group(logGroupName=log_group) LOGGER.info("deleted cloudwatch log group: {}".format(log_group)) except Exception: LOGGER.exception( "Failure occurred while cleaning up cloudwatch log group %s. " "Swallowing exception but printing stacktrace for debugging.", log_group, )