# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: MIT-0 from __future__ import print_function import zlib import boto3 import base64 import json import aws_encryption_sdk from Crypto.Cipher import AES from aws_encryption_sdk import DefaultCryptoMaterialsManager from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import os import time print('Decrypting and loading to S3 via Firehose') key_id = os.environ['key_id'] stream_name = os.environ['stream_name'] region_name = os.environ['region_name'] cluster_id = os.environ['cluster_id'] firehose_stream = os.environ['firehose_stream'] #snstopic_arn = os.environ['snstopic_arn'] send_to_firehose = boto3.client('firehose') kms = boto3.client('kms', region_name=region_name) client = boto3.client('kinesis', region_name=region_name) sns = boto3.client('sns') def lambda_handler(event, context): for record in event['Records']: # Kinesis data is base64 encoded so decode here try: payload = base64.b64decode(record['kinesis']['data']) payload = json.loads(payload) decrypt_result = kms.decrypt(CiphertextBlob=base64.b64decode(payload['key']),EncryptionContext={"aws:rds:dbc-id":cluster_id}) decrypt(base64.b64decode(payload['databaseActivityEvents']), decrypt_result[u'Plaintext']) except Exception as e: if "JSONDecodeError" in str(e): print("JSON Decode Error while decrypting") continue else: print("Exception occured while decrypting: {}".format(str(e))) continue return 'Successfully processed {} records.'.format(len(event['Records'])) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, wrapping_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = wrapping_key def _get_raw_key(self, key_id): return self.wrapping_key def decrypt(decoded, plaintext): wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plaintext, wrapping_key_type=EncryptionKeyType.SYMMETRIC) my_key_provider = MyRawMasterKeyProvider(wrapping_key) my_key_provider.add_master_key("DataKey") with aws_encryption_sdk.stream( mode='d', source=decoded, materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider) ) as decryptor: for chunk in decryptor: d = zlib.decompressobj(16 + zlib.MAX_WBITS) decompressed_database_stream = d.decompress(chunk).decode() print(decompressed_database_stream) firehosedata = str.encode(decompressed_database_stream) + b'\n' #filtered_decompressed_database_stream=str.encode(decompressed_database_stream) + b'\n' send_to_firehose.put_record( DeliveryStreamName= firehose_stream, Record= { 'Data': firehosedata } ) ''' def filter_records(json_str): #json_txt = '{"type":"DatabaseActivityMonitoringRecord","clusterId":"cluster-6AMTVXN3EFRETMW4S4HX5CYMLA","instanceId":"db-U424M2YDPPXVCNXLRDQZDYGRV4","databaseActivityEventList":[{"type":"heartbeat"}]}' new_databaseActivityEventList =[] json_new=json.loads(json_str) for record in json_new['databaseActivityEventList']: if(record['type']!='heartbeat'): # if(record['class']=='READ' and record['objectName']!='pg_catalog.pg_stat_database'): #if(record['class']=='ROLE' and record['command']=='GRANT'): #time.sleep(.200) #a=record['commandText'] #print('Entering SNS') #response = sns.publish(TopicArn=snstopic_arn, Message='str(a)') #response = sns.publish(TopicArn=snstopic_arn, Message=record['dbuserusername']+ ' user executed '+record['commandtext']+' at' +record['logtime']) #new_databaseActivityEventList.append(record) #else: new_databaseActivityEventList.append(record) json_new['databaseActivityEventList']=new_databaseActivityEventList return json.dumps(json_new), len(json_new['databaseActivityEventList']) '''