""" This file contains the functions to handle the 6MB lambda processing limit. The function checkLambdaProcessingLimit must be called after all records are transformed in order to check if the 6MB processing limit of a lambda function gets exceeded. For more information visit: https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html """ import base64 import boto3 def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts): failedRecords = [] codes = [] errMsg = '' # if put_record_batch throws an error, response['xx'] will error out, adding a check for a valid # response will prevent this response = None try: response = client.put_record_batch(DeliveryStreamName=streamName, Records=records) except Exception as e: failedRecords = records errMsg = str(e) # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results if not failedRecords and response and response['FailedPutCount'] > 0: for idx, res in enumerate(response['RequestResponses']): # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest if 'ErrorCode' not in res or not res['ErrorCode']: continue codes.append(res['ErrorCode']) failedRecords.append(records[idx]) errMsg = 'Individual error codes: ' + ','.join(codes) # if records failed to reingest try again. If max max attempts made reached raise a runtime error if len(failedRecords) > 0: if attemptsMade + 1 < maxAttempts: print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg)) putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts) else: raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg)) def createReingestionRecord(originalRecord): return {'data': base64.b64decode(originalRecord['data'])} def getReingestionRecord(reIngestionRecord): return {'Data': reIngestionRecord['data']} def checkLambdaProcessingLimit(records, event): #get meta data from the event streamARN = event['deliveryStreamArn'] region = streamARN.split(':')[3] streamName = streamARN.split('/')[1] projectedSize = 0 dataByRecordId = {rec['recordId']: createReingestionRecord(rec) for rec in event['records']} putRecordBatches = [] recordsToReingest = [] totalRecordsToBeReingested = 0 #Iterrate over all records for idx, rec in enumerate(records): if rec['result'] != 'Ok': continue projectedSize += len(rec['data']) + len(rec['recordId']) # 6000000 instead of 6291456 to leave ample headroom # https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html if projectedSize > 6000000: totalRecordsToBeReingested += 1 recordsToReingest.append( getReingestionRecord(dataByRecordId[rec['recordId']]) ) # If a record has a status of Ok or Dropped, Kinesis Data Firehose considers it successfully processed # This will tell kinesis that this records can be delete from the stream since we reingest the records in the recordsToReingest again records[idx]['result'] = 'Dropped' del(records[idx]['data']) # split out the record batches into multiple groups, 500 records at max per group # Each PutRecords request can support up to 500 records https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html if len(recordsToReingest) == 500: putRecordBatches.append(recordsToReingest) recordsToReingest = [] if len(recordsToReingest) > 0: # add the last batch putRecordBatches.append(recordsToReingest) # iterate and call putRecordBatch for each group recordsReingestedSoFar = 0 
    if len(putRecordBatches) > 0:
        client = boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
    else:
        print('All files processed - no log records needed to be reingested')

    # processing for records is done, no reingestion
    return records
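

# Usage sketch (illustrative assumption, not part of the original module): a Firehose
# data-transformation Lambda handler would call checkLambdaProcessingLimit after all
# records have been transformed. The handler and the passthrough transform below are
# hypothetical placeholders for the caller's own transformation logic.
def _examplePassthroughTransform(rec):
    # hypothetical transform: keep the payload unchanged and mark the record as Ok
    return {'recordId': rec['recordId'], 'result': 'Ok', 'data': rec['data']}


def _exampleHandler(event, context):
    # transform every incoming record, then let checkLambdaProcessingLimit re-ingest
    # whatever would push the Lambda response past the ~6MB limit
    records = [_examplePassthroughTransform(rec) for rec in event['records']]
    return {'records': checkLambdaProcessingLimit(records, event)}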