#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab
"""Lambda handler reacting to "Athena Query State Change" events.

When a query has SUCCEEDED, the handler looks up the Athena result location,
creates a presigned S3 download URL for it, e-mails that URL through SES to
the user recorded for the query in DynamoDB, and finally stores the query
state in the DynamoDB table.
"""

import os
import logging
from urllib.parse import urlparse

import boto3
import botocore
from boto3.dynamodb.conditions import (
  Key,
  Attr
)

LOGGER = logging.getLogger()
if LOGGER.handlers:
  # The Lambda environment pre-configures a handler logging to stderr.
  # If a handler is already configured, `.basicConfig` does not execute.
  # Thus we set the level directly.
  LOGGER.setLevel(logging.INFO)
else:
  logging.basicConfig(level=logging.INFO)

AWS_REGION_NAME = os.getenv('AWS_REGION_NAME', 'us-east-1')
DOWNLOAD_URL_TTL = int(os.getenv('DOWNLOAD_URL_TTL', '3600'))
DDB_TABLE_NAME = os.getenv('DDB_TABLE_NAME')
EMAIL_FROM_ADDRESS = os.getenv('EMAIL_FROM_ADDRESS')


def gen_html(elem):
  """Render the e-mail body for one query record.

  Args:
    elem: mapping that must contain the keys 'query_id' and 'link'.

  Returns:
    The template text with both values substituted.

  Raises:
    KeyError: if 'query_id' or 'link' is missing from `elem`.
  """
  # NOTE(review): this text is sent as the *Html* part of the SES message but
  # contains no markup here -- confirm the template was not stripped of tags.
  HTML_FORMAT = '''

Your Query Results can be downlodable

key value
query_id {query_id}
link {link}
'''
  html_doc = HTML_FORMAT.format(query_id=elem['query_id'], link=elem['link'])
  return html_doc


def send_email(from_addr, to_addrs, subject, html_body):
  """Send a UTF-8 HTML e-mail through Amazon SES.

  Args:
    from_addr: address used as the SES `Source`.
    to_addrs: list of recipient addresses.
    subject: subject line.
    html_body: content of the HTML body part.

  Returns:
    The response returned by `ses_client.send_email`.
  """
  ses_client = boto3.client('ses', region_name=AWS_REGION_NAME)
  ret = ses_client.send_email(
    Destination={'ToAddresses': to_addrs},
    Message={
      'Body': {
        'Html': {
          'Charset': 'UTF-8',
          'Data': html_body
        }
      },
      'Subject': {
        'Charset': 'UTF-8',
        'Data': subject
      }
    },
    Source=from_addr
  )
  return ret


def get_user_id_by_query_id(table, query_execution_id):
  """Look up the DynamoDB record stored for an Athena query execution.

  The table is queried through its 'query_id' index.

  Args:
    table: DynamoDB table name.
    query_execution_id: Athena query execution id to look up.

  Returns:
    A dict copy of the matching item when exactly one item is found;
    otherwise a dict holding only {'query_id': query_execution_id}.

  Raises:
    botocore.exceptions.ClientError: re-raised after the error message is
      logged.
  """
  dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION_NAME)
  ddb_table = dynamodb.Table(table)
  try:
    #TODO: should handle ProvisionedThroughputExceededException
    ddb_attributes = ddb_table.query(
      IndexName='query_id',
      KeyConditionExpression=Key('query_id').eq(query_execution_id)
    )
  except botocore.exceptions.ClientError as ex:
    LOGGER.error(ex.response['Error']['Message'])
    #TODO: send alarm by sns
    raise

  items = ddb_attributes.get('Items', [])
  if len(items) == 1:
    return dict(items[0])
  # Zero or several matches: fall back to a record carrying only the id.
  return {'query_id': query_execution_id}


def update_query_status(table, user_id, query_execution_id, query_state):
  """Store `query_state` as 'query_status' on the item keyed by `user_id`.

  The update is conditional on the item's 'query_id' attribute being equal
  to `query_execution_id`.

  Args:
    table: DynamoDB table name.
    user_id: value of the 'user_id' key of the item to update.
    query_execution_id: Athena query execution id the item must carry.
    query_state: value written to 'query_status'.

  Returns:
    The `update_item` response, or None when the condition check failed
    (that failure is logged at INFO level and otherwise ignored).

  Raises:
    botocore.exceptions.ClientError: for any error other than
      ConditionalCheckFailedException.
  """
  dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION_NAME)
  ddb_table = dynamodb.Table(table)
  # Bound up front so the conditional-check-failed path returns None instead
  # of hitting an unbound local.
  response = None
  try:
    response = ddb_table.update_item(
      Key={'user_id': user_id},
      UpdateExpression='SET query_status = :query_status',
      ConditionExpression=Attr('query_id').eq(query_execution_id),
      ExpressionAttributeValues={':query_status': query_state},
      ReturnValues='UPDATED_NEW')
  except botocore.exceptions.ClientError as ex:
    if ex.response['Error']['Code'] != 'ConditionalCheckFailedException':
      raise
    LOGGER.info(ex.response['Error']['Message'])
  return response


def get_athena_query_result_location(query_execution_id):
  """Return the `OutputLocation` reported by Athena for a query execution.

  Args:
    query_execution_id: Athena query execution id.

  Returns:
    response['QueryExecution']['ResultConfiguration']['OutputLocation'] of
    the `get_query_execution` response.
  """
  athena_client = boto3.client('athena', region_name=AWS_REGION_NAME)
  response = athena_client.get_query_execution(
    QueryExecutionId=query_execution_id
  )
  output_location = response['QueryExecution']['ResultConfiguration']['OutputLocation']
  return output_location


def create_presigned_url(bucket_name, object_name, expiration=3600):
  """Create a presigned 'get_object' URL for an S3 object.

  Args:
    bucket_name: S3 bucket name.
    object_name: S3 object key.
    expiration: value passed as `ExpiresIn` (default 3600).

  Returns:
    The presigned URL, or None if a ClientError was raised (it is logged).
  """
  s3_client = boto3.client('s3', region_name=AWS_REGION_NAME)
  try:
    presigned_url = s3_client.generate_presigned_url(
      'get_object',
      Params={'Bucket': bucket_name, 'Key': object_name},
      ExpiresIn=expiration)
  except botocore.exceptions.ClientError as ex:
    LOGGER.error(ex)
    return None
  return presigned_url


def lambda_handler(event, context):
  """Handle one Athena query state change event.

  Args:
    event: dict with event['detail']['currentState'] and
      event['detail']['queryExecutionId'].
    context: Lambda context object (unused).

  Returns:
    None.

  Raises:
    RuntimeError: if the query state is 'FAILED', or if no presigned URL
      could be created for the query result.
  """
  LOGGER.debug(event)

  current_query_state = event['detail']['currentState']
  if current_query_state == 'FAILED':
    raise RuntimeError('Athena Query is {}'.format(current_query_state))

  if current_query_state != 'SUCCEEDED':
    #TODO: send alert by sns
    LOGGER.info('athena query state: %s', current_query_state)
    return

  query_execution_id = event['detail']['queryExecutionId']
  output_location = get_athena_query_result_location(query_execution_id)
  LOGGER.info(output_location)

  # output_location is split into bucket (netloc) and key (path without the
  # leading slash).
  url_parse_result = urlparse(output_location, scheme='s3')
  bucket_name = url_parse_result.netloc
  object_name = url_parse_result.path.lstrip('/')
  presigned_url = create_presigned_url(bucket_name, object_name,
    expiration=DOWNLOAD_URL_TTL)
  if presigned_url is None:
    # Do not e-mail the requester a message whose link is "None".
    raise RuntimeError(
      'Failed to create a presigned url for {}'.format(output_location))
  LOGGER.info('presigned_url: %s', presigned_url)

  record = get_user_id_by_query_id(DDB_TABLE_NAME, query_execution_id)

  # send email to requester
  record['link'] = presigned_url
  # Without a stored user_id the mail goes to the sender address itself.
  user_id = record.get('user_id', EMAIL_FROM_ADDRESS)
  html = gen_html(record)
  subject = '''Athena Query Results is ready'''
  send_email(EMAIL_FROM_ADDRESS, [user_id], subject, html)

  # The status update is best-effort: the mail has already been sent, so a
  # failure here is only logged.
  try:
    update_query_status(DDB_TABLE_NAME, user_id, query_execution_id,
      current_query_state)
  except Exception as ex:
    LOGGER.error(ex)
  LOGGER.info("end")


if __name__ == '__main__':
  import argparse
  import copy
  import traceback

  parser = argparse.ArgumentParser()
  parser.add_argument('--region-name', default='us-east-1',
    help='aws region name: default=us-east-1')
  parser.add_argument('--query-execution-id', required=True,
    help='aws athena query execution id. ex: ce8826f3-6949-4405-81e5-392745da2c95')
  parser.add_argument('--work-group-name', default='primary',
    help='aws athena work group name: default=primary')
  parser.add_argument('--dynamodb-table', required=True,
    help='aws dynamodb table')
  parser.add_argument('--sender-email', required=True,
    help='sender email address')

  options = parser.parse_args()

  # Override the environment-derived module settings with the CLI options.
  AWS_REGION_NAME = options.region_name
  DDB_TABLE_NAME = options.dynamodb_table
  EMAIL_FROM_ADDRESS = options.sender_email

  event_template = {
    "account": "111122223333",
    "detail": {
      "currentState": "SUCCEEDED",
      "previousState": "RUNNING",
      "queryExecutionId": options.query_execution_id,
      "sequenceNumber": "3",
      "statementType": "DML",
      "versionId": "0",
      "workgroupName": options.work_group_name
    },
    "detail-type": "Athena Query State Change",
    "id": "d9b0f8f8-1f67-6772-a390-01556bb3c09d",
    "region": options.region_name,
    "resources": [],
    "source": "aws.athena",
    "time": "2020-11-24T05:52:12Z",
    "version": "0"
  }

  for query_state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
    # Deep copy so that editing the nested 'detail' dict never touches the
    # template shared between iterations.
    event = copy.deepcopy(event_template)
    event['detail']['currentState'] = query_state
    try:
      lambda_handler(event, {})
    except Exception:
      traceback.print_exc()