""" This function takes data (pushed from the kinesis firehose data stream to the lambda) which contains multiple cloud watch log records e.g. event = { "invocationId": '47d09251-89dd-48aa-b219-de8cbad328d6', "deliveryStreamArn": 'arn:aws:firehose:us-east-1:632514164718:deliverystream/kinesis-firehose-splunk-stream', "region": 'us-east-1', "records": [ { "recordId": '1', "approximateArrivalTimestamp": 1598261038326, "data": 'H4sIAN+/Q18C/12PTQuCQBRF/4q8dVaCg+lO0tzUSmmTIlO9ZGB0ZD6KEP97TiJB23fuuZc3QItK0QaLd48QOZDERVyf0jyPsxRWDohXh9KCMCTE2/lBQELfAi6aTArTW7bnwtwLSRnfJPighuvjQudkriXS9r+m/nm1US5SpV3PGspc1U2yXjPRHRjXKNXkXiBBpVlH7RmquTl9Yqe/dFhesTNDCWjJeVKndAlRCd56S0oYYazGD2lnJqv3AAAA' }, { "recordId": '2', "approximateArrivalTimestamp": 1598261038340, "data": 'H4sIAN+/Q18C/12PTQuCQBRF/4q8dVaCg+lO0tzUSmmTIlO9ZGB0ZD6KEP97TiJB23fuuZc3QItK0QaLd48QOZDERVyf0jyPsxRWDohXh9KCMCTE2/lBQELfAi6aTArTW7bnwtwLSRnfJPighuvjQudkriXS9r+m/nm1US5SpV3PGspc1U2yXjPRHRjXKNXkXiBBpVlH7RmquTl9Yqe/dFhesTNDCWjJeVKndAlRCd56S0oYYazGD2lnJqv3AAAA' } ] } 1) The handler function will transform each record by the processRecords function. 2) The processRecords function will: - unzip the data - parse the json data in the 'data' field of the record 3) Call the transformLogEvent function - set the 'sourcetype' to 'aws:cloudwatchlogs' - fill the 'event' field with the cloudwatch log message data - set the 'source' field (e.g CloudTrail/DefaultLogGroup/xxxxx) - return the transformed record which must contain: (see https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) + recordId (The transformed record must contain the same record ID. Any mismatch between the ID of the original record and the ID of the transformed record is treated as a data transformation failure.) + result (If a record has a status of Ok or Dropped, Kinesis Data Firehose considers it successfully processed.) + data (The transformed data payload, after base64-encoding.) 
4) Due to the 6 MB invocation payload limit of a Lambda function, the handler
   must limit the size of the records it returns. The process follows these
   steps:
   - the handler iterates over all records and sums up their sizes
     (data + recordId)
   - once the limit of 6000000 bytes (6 MB) is reached, the remaining records
     are put into a recordsToReingest list
   - for recordsToReingest records, the 'result' field is set to 'Dropped'
     and the 'data' field is deleted
   - Kinesis uses PutRecords, which supports up to 500 records per call,
     which is why the records are split up into a putRecordBatches list
   - the recordsToReingest list is put back onto the Kinesis Firehose stream
     (these will be picked up again later by a Lambda invocation, which will
     again process up to 6 MB)
5) All transformed records ('Ok' and 'Dropped') are returned by the handler
   to tell Kinesis that we have successfully processed all the records we
   received.
"""
import base64
import gzip
import io
import json

from lambdaLimit import checkLambdaProcessingLimit


def transformLogEvent(log_event, source):
    return_event = {
        'sourcetype': 'aws:cloudwatchlogs',
        'source': source,
    }
    try:
        # prefer structured JSON when the message body parses as JSON
        return_event['event'] = json.loads(log_event['message'])
    except ValueError:
        # otherwise keep the raw message string
        print("Message body does not contain json")
        return_event['event'] = log_event['message']
    return json.dumps(return_event) + '\n'


def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        iodata = io.BytesIO(data)
        # unzip the CloudWatch Logs data
        with gzip.GzipFile(fileobj=iodata, mode='r') as f:
            data = json.loads(f.read())
        recId = r['recordId']
        # Sometimes CloudWatch Logs may emit Kinesis records with a
        # "CONTROL_MESSAGE" type, mainly for checking if the destination
        # is reachable.
        # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            source = data['logGroup'] + ":" + data['logStream']
            # apply the custom transformation to each CloudWatch log event
            data = ''.join([transformLogEvent(e, source) for e in data['logEvents']])
            data = base64.b64encode(data.encode('utf-8')).decode()
            yield {
                'data': data,
                'result': 'Ok',
                'recordId': recId
            }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }


def handler(event, context):
    # process all records
    records = list(processRecords(event['records']))
    # handle the Lambda 6 MB response payload limitation
    records = checkLambdaProcessingLimit(records, event)
    # processing for records is done
    return {"records": records}
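
# The payload format is easy to get wrong, so the self-contained sketch below
# builds a CloudWatch-Logs-style record the same way the subscription delivers
# it (JSON document, gzip-compressed, base64-encoded) and then reverses the
# same base64 + gzip + JSON steps that processRecords performs. The field
# values are illustrative only; it does not depend on this module.

```python
import base64
import gzip
import io
import json

# Simulate what CloudWatch Logs places in a Firehose record: a JSON document,
# gzip-compressed, then base64-encoded.
payload = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "CloudTrail/DefaultLogGroup",
    "logStream": "stream-1",
    "logEvents": [
        {"id": "1", "timestamp": 1598261038326,
         "message": '{"eventName": "AssumeRole"}'},
    ],
}
raw = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8"))).decode()

# Reverse the steps processRecords performs: base64-decode, gunzip, parse JSON.
with gzip.GzipFile(fileobj=io.BytesIO(base64.b64decode(raw))) as f:
    data = json.loads(f.read())

source = data["logGroup"] + ":" + data["logStream"]
# source == "CloudTrail/DefaultLogGroup:stream-1"
```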
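
# checkLambdaProcessingLimit lives in the separate lambdaLimit module and is
# not shown here. As a rough, hypothetical sketch of the size-capping step the
# module docstring describes (the names check_processing_limit and MAX_PAYLOAD
# are invented for illustration, not the real module's API):

```python
MAX_PAYLOAD = 6000000  # the 6 MB Lambda response limit described above


def check_processing_limit(records):
    # Sum record sizes (data + recordId) in order; once the running total
    # would exceed the limit, mark the remaining records 'Dropped' and strip
    # their data so they can be re-ingested into the Firehose stream later.
    total = 0
    records_to_reingest = []
    for rec in records:
        size = len(rec.get('data', '')) + len(rec['recordId'])
        if total + size > MAX_PAYLOAD:
            rec['result'] = 'Dropped'
            rec.pop('data', None)
            records_to_reingest.append(rec)
        else:
            total += size
    # PutRecords accepts at most 500 records per call, so batch the re-ingests.
    put_record_batches = [records_to_reingest[i:i + 500]
                          for i in range(0, len(records_to_reingest), 500)]
    return records, put_record_batches
```

# A real implementation would also put each batch back onto the delivery
# stream (e.g. via boto3's firehose client) so the dropped records are
# processed by a later invocation.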