using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
using AWS.Lambda.Powertools.Metrics;
using AWS.Lambda.Powertools.Tracing;
using KinesisEventHandler.Infrastructure;
using KinesisEventHandler.Models;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisEventHandler.Handlers;
/// <summary>
/// This class abstracts the AWS interaction between Amazon Kinesis Data Streams (Kinesis) and an AWS Lambda function.
/// Concrete handlers derive from it and implement <see cref="ValidateKinesisRecord"/> and
/// <see cref="ProcessKinesisRecord"/>; the base class deserializes each record, invokes those methods and
/// collects the sequence numbers of the records that failed.
/// </summary>
/// <typeparam name="TRecord">A generic Kinesis Record Model Type</typeparam>
public abstract class KinesisEventHandler<TRecord> where TRecord : class, new()
{
    /// <summary>
    /// Service provider available to derived handlers; populated from <c>Startup.ServiceProvider</c>.
    /// </summary>
    protected readonly IServiceProvider ServiceProvider;

    // Failures collected by the most recent ProcessEvent call; replaced on every invocation.
    private List<KinesisEventResponse.BatchItemFailure> _batchItemFailures;

    // Response instance returned by Handler; created once per handler instance.
    private readonly KinesisEventResponse _kinesisEventResponse;

    /// <summary>
    /// Creates the handler using <c>Startup.ServiceProvider</c> and an empty response object.
    /// </summary>
    protected KinesisEventHandler() : this(Startup.ServiceProvider)
    {
        _kinesisEventResponse = new KinesisEventResponse();
    }

    private KinesisEventHandler(IServiceProvider serviceProvider)
    {
        ServiceProvider = serviceProvider;
    }

    /// <summary>
    /// This method is completely abstracted from AWS Infrastructure and is called for every record
    /// that was deserialized and validated without an exception.
    /// </summary>
    /// <param name="record">Kinesis Record Object</param>
    /// <param name="lambdaContext">Lambda Context</param>
    /// <returns>A task that completes when the record has been processed.</returns>
    public abstract Task ProcessKinesisRecord(TRecord record, ILambdaContext lambdaContext);

    /// <summary>
    /// This method is used to perform any validation on the incoming records.
    /// The returned task's result is not inspected; an exception thrown here marks the record as failed.
    /// </summary>
    /// <param name="record">Kinesis Record Object</param>
    /// <returns>A task that completes when validation has finished.</returns>
    public abstract Task ValidateKinesisRecord(TRecord record);

    /// <summary>
    /// This method is called for every Lambda invocation. It takes in a Kinesis event object, processes
    /// every record in it and returns the response carrying the batch item failures, if any.
    /// </summary>
    /// <param name="kinesisEvent">Kinesis Event received by the function handler</param>
    /// <param name="lambdaContext">Lambda Context</param>
    /// <returns>The response whose <c>BatchItemFailures</c> lists the records that could not be processed.</returns>
    [Logging(LogEvent = true, ClearState = true)]
    [Metrics(Namespace = "KinesisEventHandler", CaptureColdStart = true)]
    [Tracing(Namespace = "KinesisEventHandler", SegmentName = "KinesisEventHandler")]
    public async Task<KinesisEventResponse> Handler(KinesisEvent kinesisEvent, ILambdaContext lambdaContext)
    {
        // A missing event or record list counts as an empty batch instead of a NullReferenceException.
        var recordCount = kinesisEvent?.Records?.Count ?? 0;
        Metrics.AddMetric("Employees", recordCount, MetricUnit.Count);

        await ProcessEvent(kinesisEvent, lambdaContext);

        // Set BatchItemFailures if any
        if (_batchItemFailures is not null)
        {
            _kinesisEventResponse.BatchItemFailures = _batchItemFailures;
        }

        return _kinesisEventResponse;
    }

    /// <summary>
    /// This method abstracts the Kinesis Event for downstream processing: each record is deserialized,
    /// validated and processed, and any exception is logged and recorded as a batch item failure.
    /// </summary>
    /// <param name="kinesisEvent">Kinesis Event received by the function handler</param>
    /// <param name="lambdaContext">Lambda Context</param>
    [Tracing(SegmentName = "ProcessEvent")]
    private async Task ProcessEvent(KinesisEvent kinesisEvent, ILambdaContext lambdaContext)
    {
        // Always start from a fresh list so failures from a previous invocation are never reported again.
        var batchItemFailures = new List<KinesisEventResponse.BatchItemFailure>();

        var kinesisEventRecords = kinesisEvent?.Records;
        if (kinesisEventRecords is not null)
        {
            foreach (var kinesisRecord in kinesisEventRecords)
            {
                try
                {
                    var record = JsonSerializer.Deserialize<TRecord>(kinesisRecord.Kinesis.Data);

                    // Deserialize returns null for a JSON "null" payload; report it as a failed record
                    // rather than handing null to the validator and processor.
                    if (record is null)
                    {
                        throw new JsonException("Kinesis record payload deserialized to null.");
                    }

                    // These abstract methods are implemented by the concrete classes, e.g. ProcessEmployeeFunction.
                    await ValidateKinesisRecord(record);
                    await ProcessKinesisRecord(record, lambdaContext);
                }
                catch (Exception ex)
                {
                    // NOTE(review): processing continues after a failure, so later records in the batch may be
                    // handled again on retry - confirm ProcessKinesisRecord implementations are idempotent.
                    Logger.LogError(ex);
                    batchItemFailures.Add(
                        new KinesisEventResponse.BatchItemFailure
                        {
                            ItemIdentifier = kinesisRecord.Kinesis.SequenceNumber
                        }
                    );
                }
            }
        }

        _batchItemFailures = batchItemFailures;
    }
}