using Amazon.SQS.Model;
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Amazon.Lambda.TestTool.Runtime
{
/// <summary>
/// This class continually polls an SQS dead letter queue for messages. When a message is read, the Lambda function
/// is invoked with that message within the test tool.
/// </summary>
public class DlqMonitor
{
    /// <summary>
    /// Pause applied when the queue returned no message and after a failed iteration.
    /// </summary>
    private static readonly TimeSpan PollDelay = TimeSpan.FromMilliseconds(1000);

    // Guards every read and replacement of _records.
    private readonly object LOG_LOCK = new object();
    private CancellationTokenSource _cancelSource;
    private IList<LogRecord> _records = new List<LogRecord>();
    private readonly ILocalLambdaRuntime _runtime;
    private readonly LambdaFunction _function;
    private readonly string _profile;
    private readonly string _region;
    private readonly string _queueUrl;

    /// <summary>
    /// Creates a monitor for a single queue. Polling does not begin until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="runtime">Runtime used to read messages (via its AWSService) and to execute the function.</param>
    /// <param name="function">The Lambda function to execute for each message read.</param>
    /// <param name="profile">AWS profile passed to the queue read and to the execution request.</param>
    /// <param name="region">AWS region passed to the queue read and to the execution request.</param>
    /// <param name="queueUrl">URL of the queue to poll.</param>
    public DlqMonitor(ILocalLambdaRuntime runtime, LambdaFunction function, string profile, string region, string queueUrl)
    {
        this._runtime = runtime;
        this._function = function;
        this._profile = profile;
        this._region = region;
        this._queueUrl = queueUrl;
    }

    /// <summary>
    /// Starts the polling loop in the background with a fresh cancellation source.
    /// </summary>
    public void Start()
    {
        this._cancelSource = new CancellationTokenSource();
        // Fire and forget: the loop records its own failures as LogRecord entries.
        _ = Loop(this._cancelSource.Token);
    }

    /// <summary>
    /// Requests the polling loop to stop. Does nothing if <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        // _cancelSource is only assigned in Start(), so it may still be null here.
        this._cancelSource?.Cancel();
    }

    /// <summary>
    /// Polls the queue until cancellation is requested, executing the Lambda function for each
    /// message read and recording the outcome of every iteration that read a message or failed.
    /// </summary>
    /// <param name="token">Token signalled by <see cref="Stop"/>.</param>
    private async Task Loop(CancellationToken token)
    {
        var aws = this._runtime.AWSService;
        while (!token.IsCancellationRequested)
        {
            Message message = null;
            LogRecord logRecord = null;
            try
            {
                // Read a message from the queue using the ExternalCommands console application.
                message = await aws.ReadMessageAsync(this._profile, this._region, this._queueUrl);
                if (token.IsCancellationRequested)
                {
                    return;
                }

                if (message == null)
                {
                    // Since there are no messages, wait a bit for messages to come.
                    // Awaited rather than Thread.Sleep so no thread is blocked while idle.
                    await DelayAsync(PollDelay, token);
                    continue;
                }

                // If a message was received execute the Lambda function within the test tool.
                var request = new ExecutionRequest
                {
                    AWSProfile = this._profile,
                    AWSRegion = this._region,
                    Function = this._function,
                    Payload = JsonSerializer.Serialize(new
                    {
                        Records = new List<Message>
                        {
                            message
                        }
                    })
                };
                var response = await this._runtime.ExecuteLambdaFunctionAsync(request);

                // Capture the results to send back to the client application.
                logRecord = new LogRecord
                {
                    ProcessTime = DateTime.Now,
                    ReceiptHandle = message.ReceiptHandle,
                    Logs = response.Logs,
                    Error = response.Error
                };
            }
            catch (Exception e)
            {
                logRecord = new LogRecord
                {
                    ProcessTime = DateTime.Now,
                    Error = e.Message
                };

                // Back off before retrying so a persistent failure does not spin the loop.
                await DelayAsync(PollDelay, token);
            }

            // message is non-null when the failure happened after a successful read,
            // in which case the error record still carries the message body.
            if (logRecord != null && message != null)
            {
                logRecord.Event = message.Body;
            }

            lock (LOG_LOCK)
            {
                this._records.Add(logRecord);
            }
        }
    }

    /// <summary>
    /// Waits for <paramref name="delay"/> without blocking a thread, returning early (without throwing)
    /// when <paramref name="token"/> is cancelled so the caller's loop condition can end the loop.
    /// </summary>
    private static async Task DelayAsync(TimeSpan delay, CancellationToken token)
    {
        try
        {
            await Task.Delay(delay, token);
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the wait is the expected result of Stop(); nothing to report.
        }
    }

    /// <summary>
    /// Grabs the log records collected since the last call and resets the records collection.
    /// </summary>
    /// <returns>The records collected since the previous call; empty if there were none.</returns>
    public IList<LogRecord> FetchNewLogs()
    {
        lock (LOG_LOCK)
        {
            // Hand the current list to the caller and start a new one, so the loop never
            // mutates a list the caller is holding.
            var logsToSend = this._records;
            this._records = new List<LogRecord>();
            return logsToSend;
        }
    }

    /// <summary>
    /// Outcome of one loop iteration that either processed a message or failed.
    /// </summary>
    public class LogRecord
    {
        /// <summary>Local time at which the record was created.</summary>
        public DateTime ProcessTime { get; set; }

        /// <summary>Body of the message, when a message had been read.</summary>
        public string Event { get; set; }

        /// <summary>Logs taken from the function execution response.</summary>
        public string Logs { get; set; }

        /// <summary>Error from the execution response, or the exception message when the iteration threw.</summary>
        public string Error { get; set; }

        /// <summary>Receipt handle of the processed message; not set on records created from an exception.</summary>
        public string ReceiptHandle { get; set; }
    }
}
}