import email
import logging
import os
import re
import uuid
from email import policy

import boto3
from bs4 import BeautifulSoup

workmail_message_flow = boto3.client('workmailmessageflow')
s3 = boto3.client('s3')

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_env_var(name):
    """
    Helper that returns value of the environment variable key if it exists,
    else logs and throws ValueError

    Parameters
    ----------
    name: string, required
        Environment variable key

    Returns
    -------
    string
        A string containing value of the environment variable

    Raises
    ------
    ValueError: When environment variable was not set (or is empty)
    """
    var = os.getenv(name)
    if not var:
        error_msg = f'{name} not set in environment. Please follow https://docs.aws.amazon.com/lambda/latest/dg/env_variables.html to set it.'
        logger.error(error_msg)
        raise ValueError(error_msg)
    return var


def _get_optional_env_var(name, default=''):
    """Return the value of environment variable ``name``, or ``default`` when unset or empty."""
    return os.getenv(name) or default


# The following html templates control the color and structure of the
# disclaimer and footer inserted into the email body.
disclaimer_html_template = """
{}
"""
footer_html_template = """
{}
"""

# These settings are optional. Each one is read independently so that a missing
# variable never leaves the other names undefined; an unset value is the empty
# string, which the functions below treat as "feature disabled".
disclaimer_text = _get_optional_env_var('DISCLAIMER')
footer_text = _get_optional_env_var('FOOTER')
subject_tag = _get_optional_env_var('SUBJECT_TAG')

# Content-Transfer-Encoding values that are passed through to set_content().
_SUPPORTED_CTES = frozenset({'7bit', '8bit', 'quoted-printable', 'base64'})


def extract_domains(email_addresses):
    """
    Returns the set of email domains extracted from a list of email addresses

    Parameters
    ----------
    email_addresses: list, required
        Email addresses are dict of type { "address" : "recipient1@domain.test" }

    Returns
    -------
    set
        Lower-cased email domains. Addresses that contain no "@" (or nothing
        after it) are skipped.
    """
    domains = set()
    for address in email_addresses:
        # The domain is whatever follows the LAST "@"; rpartition never raises
        # on an address that has no "@" at all.
        _, separator, domain = address['address'].lower().rpartition('@')
        if separator and domain:
            domains.add(domain)
    return domains


def update_text_content(part, this_disclaimer_text, this_footer_text):
    """
    Builds the new "text/plain" body with disclaimer and footer added.

    Parameters
    ----------
    part: email.message.EmailMessage, required
        The text/plain message part whose content is read
    this_disclaimer_text: string, required
        Templated disclaimer text to prepend to the body text; skipped when empty
    this_footer_text: string, required
        Templated footer text to append to the body text; skipped when empty

    Returns
    -------
    string
        The updated body text. The part itself is not modified.
    """
    text_content = part.get_content()
    if this_disclaimer_text:
        text_content = this_disclaimer_text + "\n\n" + text_content
    if this_footer_text:
        text_content = text_content + "\n\n" + this_footer_text
    return text_content


def update_html_content(part, this_disclaimer_text, this_footer_text):
    """
    Builds the new "text/html" body with disclaimer and footer added.

    Parameters
    ----------
    part: email.message.EmailMessage, required
        The text/html message part whose content is read
    this_disclaimer_text: string, required
        Templated disclaimer text to prepend to the body html; skipped when empty
    this_footer_text: string, required
        Templated footer text to append to the body html; skipped when empty

    Returns
    -------
    bs4.BeautifulSoup
        Parsed document with the disclaimer/footer inserted. The part itself
        is not modified.
    """
    soup = BeautifulSoup(part.get_content(), "html.parser")

    # Insert inside <body> when there is one; otherwise treat the whole
    # fragment as the body.
    tag_to_update = soup.find('body')
    if tag_to_update is None:
        tag_to_update = soup

    if this_disclaimer_text:
        html_disclaimer = disclaimer_html_template.format(this_disclaimer_text)
        tag_to_update.insert(0, BeautifulSoup(html_disclaimer, "html.parser"))
    if this_footer_text:
        html_footer = footer_html_template.format(this_footer_text)
        tag_to_update.append(BeautifulSoup(html_footer, "html.parser"))
    return soup


def _pick_transfer_encoding(part, encoded_body):
    """
    Returns the part's existing Content-Transfer-Encoding when it can be
    reused for ``encoded_body``, else None so the caller can fall back.

    Must be called before ``part.set_content`` because the new content
    replaces the part's existing Content-* headers.
    """
    header = part['Content-Transfer-Encoding']
    if header is None:
        return None
    cte = str(header).strip().lower()
    if cte not in _SUPPORTED_CTES:
        return None
    if cte == '7bit':
        # The added disclaimer/footer may have introduced non-ASCII bytes,
        # which cannot be sent as 7bit.
        try:
            encoded_body.decode('ascii')
        except UnicodeDecodeError:
            return None
    return cte


def _rewrite_text_part(part, this_disclaimer_text, this_footer_text):
    """Replaces the content of a text/plain part with the disclaimer/footer-wrapped text."""
    charset = part.get_content_charset() or 'utf-8'
    new_text_body = update_text_content(part, this_disclaimer_text, this_footer_text)
    try:
        encoded = new_text_body.encode(charset)
    except (LookupError, UnicodeEncodeError):
        # The added text is not representable in the original charset.
        charset = 'utf-8'
        encoded = new_text_body.encode(charset)
    # cte=None lets the email library choose a suitable encoding.
    transfer_encoding = _pick_transfer_encoding(part, encoded)
    part.set_content(new_text_body, subtype='plain', charset=charset, cte=transfer_encoding)


def _rewrite_html_part(part, this_disclaimer_text, this_footer_text):
    """Replaces the content of a text/html part with the disclaimer/footer-wrapped html."""
    charset = part.get_content_charset() or 'utf-8'
    new_html_body = update_html_content(part, this_disclaimer_text, this_footer_text)
    encoded = new_html_body.encode(charset)
    # A bytes payload needs an explicit transfer encoding; use base64 when the
    # original one is missing or unusable.
    transfer_encoding = _pick_transfer_encoding(part, encoded) or 'base64'
    part.set_content(encoded, 'text', 'html', cte=transfer_encoding)
    # Declare the charset the bytes above were encoded with.
    part.set_param('charset', charset)


def update_email_body(parsed_email, key):
    """
    Finds and updates the "text/html" and "text/plain" email body parts.

    Parameters
    ----------
    parsed_email: email.message.EmailMessage, required
        EmailMessage representation the downloaded email
    key: string, required
        The object key that will be used for storing the message in S3

    Returns
    -------
    email.message.EmailMessage
        The same message object, updated in place
    """
    # Template in the key for purposes of optionally displaying it to the
    # recipient. Plain string replacement: the key is data, not a regex
    # replacement pattern.
    this_disclaimer_text = (disclaimer_text or '').replace('{key}', key)
    this_footer_text = (footer_text or '').replace('{key}', key)

    if not (this_disclaimer_text or this_footer_text):
        # Nothing to insert; leave the body untouched.
        return parsed_email

    # walk() yields the message itself for a single-part email and every
    # sub-part for a multipart one, so both shapes share this loop. Parts that
    # are neither text/plain nor text/html are left untouched.
    for part in parsed_email.walk():
        content_type = part.get_content_type()
        if content_type not in ('text/plain', 'text/html'):
            continue
        if 'attachment' in str(part.get_content_disposition()):
            continue
        if content_type == 'text/plain':
            _rewrite_text_part(part, this_disclaimer_text, this_footer_text)
        else:
            _rewrite_html_part(part, this_disclaimer_text, this_footer_text)

    return parsed_email


def download_email(message_id):
    """
    This method downloads full email MIME content using GetRawMessageContent API and uses email.parser class
    for parsing it into Python email.message.EmailMessage class.

    Reference:
        https://docs.python.org/3.7/library/email.message.html#email.message.EmailMessage
        https://docs.python.org/3/library/email.parser.html

    Parameters
    ----------
    message_id: string, required
        message_id of the email to download

    Returns
    -------
    email.message.EmailMessage
        EmailMessage representation the downloaded email
    """
    response = workmail_message_flow.get_raw_message_content(messageId=message_id)
    email_content = response['messageContent'].read()
    email_generation_policy = policy.SMTP.clone(refold_source='none')
    logger.info("Downloaded email from WorkMail successfully")
    return email.message_from_bytes(email_content, policy=email_generation_policy)


def update_workmail(message_id, bucket, content, key):
    """
    Uploads the updated message to an S3 bucket in your account and then updates it at WorkMail via
    PutRawMessageContent API.

    Reference:
        https://docs.aws.amazon.com/workmail/latest/adminguide/update-with-lambda.html

    Parameters
    ----------
    message_id: string, required
        message_id of the email to update
    bucket: string, required
        bucket name storing the updated email
    content: email.message.EmailMessage, required
        EmailMessage representation the updated email
    key: string, required
        S3 object key under which the updated email is stored

    Returns
    -------
    None
    """
    s3.put_object(Body=content.as_bytes(), Bucket=bucket, Key=key)
    raw_content_reference = {
        's3Reference': {
            'bucket': bucket,
            'key': key
        }
    }
    workmail_message_flow.put_raw_message_content(messageId=message_id, content=raw_content_reference)
    logger.info("Updated email sent to WorkMail successfully")


def save_email(bucket, content, key):
    """
    Uploads the original message and/or email metadata to an S3 bucket in your account

    Parameters
    ----------
    bucket: string, required
        bucket name to store the object in
    content: required
        Object body, passed unchanged as the Body of the S3 put_object call
    key: string, required
        S3 object key to store the object under

    Returns
    -------
    None
    """
    s3.put_object(Body=content, Bucket=bucket, Key=key)
    logger.info(f"Saved to s3://{bucket}/{key} successfully")


def update_email(downloaded_email, email_subject, flow_direction, key):
    """
    Updates the subject and body of the downloaded email.

    Parameters
    ----------
    downloaded_email: email.message.EmailMessage, required
        EmailMessage representation the original downloaded email
    email_subject: string, required
        Subject of the email
    flow_direction: string, required
        Indicates direction of email flow. Value is either "INBOUND" or "OUTBOUND"
    key: string, required
        The object key that will be used for storing the message in S3

    Returns
    -------
    email.message.EmailMessage
        EmailMessage representation the updated email.
    """
    updated_email = update_email_body(downloaded_email, key)

    # Only update subject of an incoming email
    if flow_direction == 'INBOUND' and subject_tag:
        new_subject = f"{subject_tag} {email_subject}"
        if 'Subject' in updated_email:
            updated_email.replace_header('Subject', new_subject)
        else:
            # replace_header raises KeyError when the header does not exist.
            updated_email['Subject'] = new_subject
        logger.info("Message subject modified")

    # add the key to the headers for reference/forensics
    updated_email.add_header('WorkMailMessageKey', key)
    logger.info(f"Email updated successfully: {key}")
    return updated_email