using System; using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; using AWS.Lambda.Powertools.Metrics; using AWS.Lambda.Powertools.Tracing; using KinesisEventHandler.Infrastructure; using KinesisEventHandler.Models; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisEventHandler.Handlers; /// /// This class abstracts the AWS interaction between Amazon Kinesis Data Streams (Kinesis) & AWS Lambda Function. /// /// A generic Kinesis Record Model Type public abstract class KinesisEventHandler where TRecord : class, new() { protected readonly IServiceProvider ServiceProvider; private List _batchItemFailures; private readonly KinesisEventResponse _kinesisEventResponse; protected KinesisEventHandler() : this(Startup.ServiceProvider) { _kinesisEventResponse = new KinesisEventResponse(); } private KinesisEventHandler(IServiceProvider serviceProvider) { ServiceProvider = serviceProvider; } /// /// This method is completely abstracted from AWS Infrastructure and is called for every record. /// /// Kinesis Record Object /// Lambda Context /// public abstract Task ProcessKinesisRecord(TRecord record, ILambdaContext lambdaContext); /// /// This method is used to perform any validation on the incoming records. /// /// /// public abstract Task ValidateKinesisRecord(TRecord record); /// /// This method is called for every Lambda invocation. This method takes in a Kinesis event object and creates /// an Kinesis Event adapter for processing the shard of Kinesis records. /// /// Kinesis Event received by the function handler /// Lambda Context /// [Logging(LogEvent = true, ClearState = true)] [Metrics(Namespace = "KinesisEventHandler", CaptureColdStart = true)] [Tracing(Namespace = "KinesisEventHandler", SegmentName = "KinesisEventHandler")] public async Task Handler(KinesisEvent kinesisEvent, ILambdaContext lambdaContext) { Metrics.AddMetric("Employees", kinesisEvent.Records.Count, MetricUnit.Count); await ProcessEvent(kinesisEvent, lambdaContext); // Set BatchItemFailures if any if (_batchItemFailures != null) { _kinesisEventResponse.BatchItemFailures = _batchItemFailures; } return _kinesisEventResponse; } /// /// This method abstracts the Kinesis Event for downstream processing. /// /// Kinesis Event received by the function handler /// Lambda Context [Tracing(SegmentName = "ProcessEvent")] private async Task ProcessEvent(KinesisEvent kinesisEvent, ILambdaContext lambdaContext) { var kinesisEventRecords = kinesisEvent.Records; var batchItemFailures = new List(); foreach (var kinesisRecord in kinesisEventRecords) { try { var record = JsonSerializer.Deserialize(kinesisRecord.Kinesis.Data); // These abstract methods is implemented by the concrete classes i.e. ProcessEmployeeFunction. await ValidateKinesisRecord(record); await ProcessKinesisRecord(record, lambdaContext); } catch (Exception ex) { Logger.LogError(ex); batchItemFailures.Add( new KinesisEventResponse.BatchItemFailure { ItemIdentifier = kinesisRecord.Kinesis.SequenceNumber } ); } } _batchItemFailures = batchItemFailures; } }