using System; using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; using AWS.Lambda.Powertools.Logging; using AWS.Lambda.Powertools.Metrics; using AWS.Lambda.Powertools.Tracing; using SqsEventHandler.Infrastructure; namespace SqsEventHandler.Handlers; /// /// This class abstracts the AWS interaction between Amazon Simple Queue Service (SQS) & AWS Lambda Function. /// /// A generic SQS Message Model Type public abstract class SqsEventHandler where TMessage : class, new() { protected readonly IServiceProvider ServiceProvider; private List _batchItemFailures; private readonly SQSBatchResponse _sqsBatchResponse; protected SqsEventHandler() : this(Startup.ServiceProvider) { _sqsBatchResponse = new SQSBatchResponse(); } private SqsEventHandler(IServiceProvider serviceProvider) { ServiceProvider = serviceProvider; } /// /// This method is completely abstracted from AWS Infrastructure and is called for every message. /// /// SQS Message Object /// Lambda Context /// public abstract Task ProcessSqsMessage(TMessage message, ILambdaContext lambdaContext); /// /// This method is called for every Lambda invocation. This method takes in an SQS event object and creates /// an SQS Event adapter for processing the batch of SQS messages. /// /// SQS Event received by the function handler /// Lambda Context /// [Logging(LogEvent = true, ClearState = true)] [Metrics(Namespace = "SqsEventHandler", CaptureColdStart = true)] [Tracing(Namespace = "SqsEventHandler", SegmentName = "SqsEventHandler")] public async Task Handler(SQSEvent sqsEvent, ILambdaContext lambdaContext) { await ProcessEvent(sqsEvent, lambdaContext); // Set BatchItemFailures if any if (_batchItemFailures != null) { _sqsBatchResponse.BatchItemFailures = _batchItemFailures; } return _sqsBatchResponse; } /// /// This method abstracts the SQS Event for downstream processing. /// /// SQS Event received by the function handler /// Lambda Context [Tracing(SegmentName = "ProcessEvent")] private async Task ProcessEvent(SQSEvent sqsEvent, ILambdaContext lambdaContext) { var sqsMessages = sqsEvent.Records; var batchItemFailures = new List(); foreach (var sqsMessage in sqsMessages) { try { var message = JsonSerializer.Deserialize(sqsMessage.Body); // This abstract method is implemented by the concrete classes i.e. ProcessEmployeeFunction. await ProcessSqsMessage(message, lambdaContext); } catch (Exception ex) { Logger.LogError(ex); batchItemFailures.Add( new SQSBatchResponse.BatchItemFailure { ItemIdentifier = sqsMessage.MessageId } ); } } _batchItemFailures = batchItemFailures; } }