''' Send notification to administrator via Amazon Simple Notification Service (SNS) when pipeline fails Required Permissions: SNS:CreateTopics SNS:Publish etc. ''' import argparse import json import logging import time import boto3 from botocore.exceptions import ClientError logger = logging.getLogger() logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler()) class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_topic(self, name): """ Creates a notification topic. :param name: The name of the topic to create. :return: The newly created topic. """ try: topic = self.sns_resource.create_topic(Name=name) logger.info("Created topic %s with ARN %s.", name, topic.arn) except ClientError: logger.exception("Couldn't create topic %s.", name) raise else: return topic def list_topics(self): """ Lists topics for the current account. :return: An iterator that yields the topics. """ try: topics_iter = self.sns_resource.topics.all() logger.info("Got topics.") except ClientError: logger.exception("Couldn't get topics.") raise else: return topics_iter @staticmethod def delete_topic(topic): """ Deletes a topic. All subscriptions to the topic are also deleted. """ try: topic.delete() logger.info("Deleted topic %s.", topic.arn) except ClientError: logger.exception("Couldn't delete topic %s.", topic.arn) raise @staticmethod def subscribe(topic, protocol, endpoint): """ Subscribes an endpoint to the topic. Some endpoint types, such as email, must be confirmed before their subscriptions are active. When a subscription is not confirmed, its Amazon Resource Number (ARN) is set to 'PendingConfirmation'. :param topic: The topic to subscribe to. :param protocol: The protocol of the endpoint, such as 'sms' or 'email'. :param endpoint: The endpoint that receives messages, such as a phone number (in E.164 format) for SMS messages, or an email address for email messages. :return: The newly added subscription. """ try: subscription = topic.subscribe( Protocol=protocol, Endpoint=endpoint, ReturnSubscriptionArn=True) logger.info("Subscribed %s %s to topic %s.", protocol, endpoint, topic.arn) except ClientError: logger.exception( "Couldn't subscribe %s %s to topic %s.", protocol, endpoint, topic.arn) raise else: return subscription def list_subscriptions(self, topic=None): """ Lists subscriptions for the current account, optionally limited to a specific topic. :param topic: When specified, only subscriptions to this topic are returned. :return: An iterator that yields the subscriptions. """ try: if topic is None: subs_iter = self.sns_resource.subscriptions.all() else: subs_iter = topic.subscriptions.all() logger.info("Got subscriptions.") except ClientError: logger.exception("Couldn't get subscriptions.") raise else: return subs_iter @staticmethod def add_subscription_filter(subscription, attributes): """ Adds a filter policy to a subscription. A filter policy is a key and a list of values that are allowed. When a message is published, it must have an attribute that passes the filter or it will not be sent to the subscription. :param subscription: The subscription the filter policy is attached to. :param attributes: A dictionary of key-value pairs that define the filter. """ try: att_policy = {key: [value] for key, value in attributes.items()} subscription.set_attributes( AttributeName='FilterPolicy', AttributeValue=json.dumps(att_policy)) logger.info("Added filter to subscription %s.", subscription.arn) except ClientError: logger.exception( "Couldn't add filter to subscription %s.", subscription.arn) raise @staticmethod def delete_subscription(subscription): """ Unsubscribes and deletes a subscription. """ try: subscription.delete() logger.info("Deleted subscription %s.", subscription.arn) except ClientError: logger.exception("Couldn't delete subscription %s.", subscription.arn) raise def publish_text_message(self, phone_number, message): """ Publishes a text message directly to a phone number without need for a subscription. :param phone_number: The phone number that receives the message. This must be in E.164 format. For example, a United States phone number might be +12065550101. :param message: The message to send. :return: The ID of the message. """ try: response = self.sns_resource.meta.client.publish( PhoneNumber=phone_number, Message=message) message_id = response['MessageId'] logger.info("Published message to %s.", phone_number) except ClientError: logger.exception("Couldn't publish message to %s.", phone_number) raise else: return message_id @staticmethod def publish_message(topic, message, attributes): """ Publishes a message, with attributes, to a topic. Subscriptions can be filtered based on message attributes so that a subscription receives messages only when specified attributes are present. :param topic: The topic to publish to. :param message: The message to publish. :param attributes: The key-value attributes to attach to the message. Values must be either `str` or `bytes`. :return: The ID of the message. """ try: att_dict = {} for key, value in attributes.items(): if isinstance(value, str): att_dict[key] = {'DataType': 'String', 'StringValue': value} elif isinstance(value, bytes): att_dict[key] = {'DataType': 'Binary', 'BinaryValue': value} response = topic.publish(Message=message, MessageAttributes=att_dict) message_id = response['MessageId'] logger.info( "Published message with attributes %s to topic %s.", attributes, topic.arn) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id @staticmethod def publish_multi_message( topic, subject, default_message, sms_message, email_message): """ Publishes a multi-format message to a topic. A multi-format message takes different forms based on the protocol of the subscriber. For example, an SMS subscriber might receive a short, text-only version of the message while an email subscriber could receive an HTML version of the message. :param topic: The topic to publish to. :param subject: The subject of the message. :param default_message: The default version of the message. This version is sent to subscribers that have protocols that are not otherwise specified in the structured message. :param sms_message: The version of the message sent to SMS subscribers. :param email_message: The version of the message sent to email subscribers. :return: The ID of the message. """ try: message = { 'default': default_message, 'sms': sms_message, 'email': email_message } response = topic.publish( Message=json.dumps(message), Subject=subject, MessageStructure='json') message_id = response['MessageId'] logger.info("Published multi-format message to topic %s.", topic.arn) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--alert-topic', default='KGPipelineAlert', type=str) parser.add_argument('--alert-message', default='demo alert', type=str) parser.add_argument('--alert-emails', type=str, required=False, help='a list of emails separated by comma') parser.add_argument('--alert-phones', type=str, required=False, help='a list of phone numbers in E164 format separated by comma') return parser.parse_known_args() if __name__ == '__main__': args, _ = parse_args() sns_wrapper = SnsWrapper(boto3.resource('sns')) topic = sns_wrapper.create_topic(args.alert_topic) phones, emails = [], [] if args.alert_phones is not None: phones = args.alert_phones.split(sep=',') if args.alert_emails is not None: emails = args.alert_emails.split(sep=',') subscribed = sns_wrapper.list_subscriptions(topic) logger.info("Existing subscribers:") for subscriber in subscribed: logger.info(f"{subscriber.attributes['Protocol']}: {subscriber.attributes['Endpoint']}") email_subscribers = [subscriber.attributes['Endpoint'] for subscriber in subscribed if subscriber.attributes['Protocol']=='email'] phone_subscribers = [subscriber.attributes['Endpoint'] for subscriber in subscribed if subscriber.attributes['Protocol']=='sms'] for phone in phones: if phone not in phone_subscribers: logger.info(f"Subscribing {phone} to {args.alert_topic}.") phone_sub = sns_wrapper.subscribe(topic, 'sms', phone) for email in emails: if email not in phone_subscribers: logger.info(f"Subscribing {email} to {args.alert_topic}.") email_sub = sns_wrapper.subscribe(topic, 'email', email) logger.info( f"Confirmation email sent to {email}. To receive SNS messages, " f"follow the instructions in the email." ) sns_wrapper.publish_multi_message( topic, args.alert_topic, args.alert_message, args.alert_message + ' (SMS)', args.alert_message + ' (Email)')