// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 using System.Net; using System.Text.Json; namespace Corp.Demo.Extensions.Common; /// /// Lambda Extension API client /// internal class ExtensionClient : IExtensionClient { private readonly ExtensionConfig _config; private readonly HttpClient _httpClient; private readonly IExtensionEventProcessor _eventProcessor; private readonly ISerializationHandler _serializationHandler; public ExtensionClient(string extensionName, IExtensionEventProcessor eventProcessor, ISerializationHandler serializationHandler) { _eventProcessor = eventProcessor ?? throw new ApplicationException("Extension event processor has to be supplied"); _serializationHandler = serializationHandler ?? throw new ApplicationException( "Serialization handler has to be supplied"); _httpClient = new HttpClient(); // Set infinite timeout so that underlying connection is kept alive _httpClient.Timeout = Timeout.InfiniteTimeSpan; // Get Extension API service base URL from the environment variable var apiUri = new UriBuilder(Environment.GetEnvironmentVariable(Constants.LambdaRuntimeApiAddress)!).Uri; // Common path for all Extension API URLs var basePath = "2020-01-01/extension"; _config = new ExtensionConfig( extensionName ?? throw new ArgumentNullException(nameof(extensionName), "Extension name cannot be null"), new Uri(apiUri, $"{basePath}/register"), new Uri(apiUri, $"{basePath}/event/next"), new Uri(apiUri, $"{basePath}/init/error"), new Uri(apiUri, $"{basePath}/exit/error") ); } /// /// Extension registration and event loop handling /// public async Task ProcessEvents() { // Register extension with AWS Lambda Extension API to handle both INVOKE and SHUTDOWN events var registrationId = await RegisterExtensionAsync(); var decorator = new ExtensionEventProcessorDecorator(registrationId, _config, _eventProcessor, _httpClient); await decorator.ProcessInitEvent(registrationId); // loop till SHUTDOWN event is received var hasNext = true; while (hasNext) { // get the next event type and details var (type, payload) = await GetNextAsync(); switch (type) { case ExtensionEvent.INVOKE: await decorator.ProcessInvokeEvent(payload); break; case ExtensionEvent.SHUTDOWN: // terminate the loop, invoke onShutdown function if there is any and report any unhandled exceptions to AWS Extension API hasNext = false; await decorator.ProcessShutdownEvent(payload); break; default: throw new ApplicationException($"Unexpected event type: {type}"); } } } /// /// Register extension with Extension API /// /// Awaitable void /// This method is expected to be called just once when extension is being registered with the Extension API. private async Task RegisterExtensionAsync() { var request = new HttpRequestMessage() { Method = HttpMethod.Post, RequestUri = _config.RegisterUrl, Headers = { { HttpRequestHeader.Accept.ToString(), "application/json" }, { Constants.LambdaExtensionNameHeader, _config.ExtensionName } }, Content = new StringContent(_serializationHandler.SerializeExtensionEvents(), System.Text.Encoding.UTF8, "application/json") }; // POST call to Extension API using var response = await this._httpClient.SendAsync(request); // if POST call didn't succeed if (!response.IsSuccessStatusCode) { // log details Console.WriteLine($"[{_config.ExtensionName}] Error response received for registration request: {await response.Content.ReadAsStringAsync()}"); // throw an unhandled exception, so that extension is terminated by Lambda runtime response.EnsureSuccessStatusCode(); } var registrationId = response.Headers.GetValues(Constants.LambdaExtensionIdHeader).FirstOrDefault(); if (string.IsNullOrEmpty(registrationId)) { throw new ApplicationException("Extension API register call didn't return a valid identifier."); } // configure all HttpClient to send registration id header along with all subsequent requests _httpClient.DefaultRequestHeaders.Add(Constants.LambdaExtensionIdHeader, registrationId); return registrationId; } private async Task<(ExtensionEvent type, string payload)> GetNextAsync() { var contentBody = await _httpClient.GetStringAsync(_config.NextUrl); // use JsonDocument instead of JsonSerializer, since there is no need to construct the entire object using var doc = JsonDocument.Parse(contentBody); // extract eventType from the reply, convert it to ExtensionEvent enum and reply with the typed event type and event content details. return new (Enum.Parse(doc.RootElement.GetProperty("eventType").GetString() ?? string.Empty), contentBody); } // IDisposable implementation as per https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca1063 private bool _isDisposed; public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (_isDisposed) { return; } if (disposing) { // free managed resources _httpClient.Dispose(); } _isDisposed = true; } }