using System; using System.Collections.Generic; using System.Collections.Concurrent; using System.IO; using System.Linq; using System.Reflection; #if CORECLR using System.Runtime.Loader; #endif using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Amazon.CloudWatchLogs; using Amazon.CloudWatchLogs.Model; using Amazon.Runtime; using Amazon.Runtime.CredentialManagement; namespace AWS.Logger.Core { /// /// Sends LogEvent messages to CloudWatch Logs /// public class AWSLoggerCore : IAWSLoggerCore { const int MAX_MESSAGE_SIZE_IN_BYTES = 256000; #region Private Members const string EMPTY_MESSAGE = "\t"; private ConcurrentQueue _pendingMessageQueue = new ConcurrentQueue(); private string _currentStreamName = null; private LogEventBatch _repo = new LogEventBatch(); private CancellationTokenSource _cancelStartSource; private SemaphoreSlim _flushTriggerEvent; private ManualResetEventSlim _flushCompletedEvent; private AWSLoggerConfig _config; private IAmazonCloudWatchLogs _client; private DateTime _maxBufferTimeStamp = new DateTime(); private string _logType; private int _requestCount = 5; /// /// Minimum interval in minutes between two error messages on in-memory buffer overflow. /// const double MAX_BUFFER_TIMEDIFF = 5; private readonly static Regex invalid_sequence_token_regex = new Regex(@"The given sequenceToken is invalid. The next expected sequenceToken is: (\d+)"); #endregion /// /// Alert details from CloudWatch Log Engine /// public sealed class LogLibraryEventArgs : EventArgs { internal LogLibraryEventArgs(Exception ex) { Exception = ex; } /// /// Exception Details returned /// public Exception Exception { get; } /// /// Service EndPoint Url involved /// public string ServiceUrl { get; internal set; } } /// /// Event Notification on alerts from the CloudWatch Log Engine /// public event EventHandler LogLibraryAlert; /// /// Construct an instance of AWSLoggerCore /// /// Configuration options for logging messages to AWS /// Logging Provider Name to include in UserAgentHeader public AWSLoggerCore(AWSLoggerConfig config, string logType) { _config = config; _logType = logType; var awsConfig = new AmazonCloudWatchLogsConfig(); if (!string.IsNullOrWhiteSpace(_config.ServiceUrl)) { var serviceUrl = _config.ServiceUrl.Trim(); awsConfig.ServiceURL = serviceUrl; if (serviceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase)) { awsConfig.UseHttp = true; } } else { if (!string.IsNullOrEmpty(_config.Region)) { awsConfig.RegionEndpoint = Amazon.RegionEndpoint.GetBySystemName(_config.Region); } } var credentials = DetermineCredentials(config); _client = new AmazonCloudWatchLogsClient(credentials, awsConfig); ((AmazonCloudWatchLogsClient)this._client).BeforeRequestEvent += ServiceClientBeforeRequestEvent; ((AmazonCloudWatchLogsClient)this._client).ExceptionEvent += ServiceClienExceptionEvent; StartMonitor(); RegisterShutdownHook(); } #if CORECLR private void RegisterShutdownHook() { var currentAssembly = typeof(AWSLoggerCore).GetTypeInfo().Assembly; AssemblyLoadContext.GetLoadContext(currentAssembly).Unloading += this.OnAssemblyLoadContextUnloading; } internal void OnAssemblyLoadContextUnloading(AssemblyLoadContext obj) { this.Close(); } #else private void RegisterShutdownHook() { AppDomain.CurrentDomain.DomainUnload += ProcessExit; AppDomain.CurrentDomain.ProcessExit += ProcessExit; } private void ProcessExit(object sender, EventArgs e) { Close(); } #endif private static AWSCredentials DetermineCredentials(AWSLoggerConfig config) { if (config.Credentials != null) { return config.Credentials; } if (!string.IsNullOrEmpty(config.Profile)) { var credentials = LookupCredentialsFromProfileStore(config); if (credentials != null) return credentials; } return FallbackCredentialsFactory.GetCredentials(); } private static AWSCredentials LookupCredentialsFromProfileStore(AWSLoggerConfig config) { var credentialProfileStore = string.IsNullOrEmpty(config.ProfilesLocation) ? new CredentialProfileStoreChain() : new CredentialProfileStoreChain(config.ProfilesLocation); if (credentialProfileStore.TryGetAWSCredentials(config.Profile, out var credentials)) return credentials; else return null; } /// public void Close() { try { Flush(); _cancelStartSource.Cancel(); } catch (Exception ex) { LogLibraryServiceError(ex); } finally { LogLibraryAlert = null; } } /// public void Flush() { if (_cancelStartSource.IsCancellationRequested) return; if (!_pendingMessageQueue.IsEmpty || !_repo.IsEmpty) { bool lockTaken = false; try { // Ensure only one thread executes the flush operation System.Threading.Monitor.TryEnter(_flushTriggerEvent, ref lockTaken); if (lockTaken) { _flushCompletedEvent.Reset(); if (_flushTriggerEvent.CurrentCount == 0) { _flushTriggerEvent.Release(); // Signal Monitor-Task to start premature flush } else { // Means that the Background Task is busy, and not yet claimed the previous release (Maybe busy with credentials) var serviceUrl = GetServiceUrl(); LogLibraryServiceError(new TimeoutException($"Flush Pending - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); } } // Waiting for Monitor-Task to complete flush if (!_flushCompletedEvent.Wait(_config.FlushTimeout, _cancelStartSource.Token)) { var serviceUrl = GetServiceUrl(); LogLibraryServiceError(new TimeoutException($"Flush Timeout - ServiceURL={serviceUrl}, StreamName={_currentStreamName}, PendingMessages={_pendingMessageQueue.Count}, CurrentBatch={_repo.CurrentBatchMessageCount}"), serviceUrl); } } finally { if (lockTaken) System.Threading.Monitor.Exit(_flushTriggerEvent); } } } private string GetServiceUrl() { try { _client.Config.Validate(); return _client.Config.DetermineServiceURL() ?? "Undetermined ServiceURL"; } catch (Exception ex) { LogLibraryServiceError(ex, string.Empty); return "Unknown ServiceURL"; } } private void AddSingleMessage(string message) { if (_pendingMessageQueue.Count > _config.MaxQueuedMessages) { if (_maxBufferTimeStamp.AddMinutes(MAX_BUFFER_TIMEDIFF) < DateTime.UtcNow) { message = "The AWS Logger in-memory buffer has reached maximum capacity"; if (_maxBufferTimeStamp == DateTime.MinValue) { LogLibraryServiceError(new System.InvalidOperationException(message)); } _maxBufferTimeStamp = DateTime.UtcNow; _pendingMessageQueue.Enqueue(new InputLogEvent { Timestamp = DateTime.UtcNow, Message = message, }); } } else { _pendingMessageQueue.Enqueue(new InputLogEvent { Timestamp = DateTime.UtcNow, Message = message, }); } } /// /// A Concurrent Queue is used to store the messages from /// the logger /// /// Message to log. public void AddMessage(string rawMessage) { if (string.IsNullOrEmpty(rawMessage)) { rawMessage = EMPTY_MESSAGE; } // Only do the extra work of breaking up the message if the max unicode bytes exceeds the possible size. This is not // an exact measurement since the string is UTF8 but it gives us a chance to skip the extra computation for // typically small messages. if (Encoding.Unicode.GetMaxByteCount(rawMessage.Length) < MAX_MESSAGE_SIZE_IN_BYTES) { AddSingleMessage(rawMessage); } else { var messageParts = BreakupMessage(rawMessage); foreach (var message in messageParts) { AddSingleMessage(message); } } } /// /// Finalizer to ensure shutdown when forgetting to dispose /// ~AWSLoggerCore() { if (_cancelStartSource != null) { _cancelStartSource.Dispose(); } } /// /// Kicks off the Poller Thread to keep tabs on the PutLogEvent request and the /// Concurrent Queue /// public void StartMonitor() { _flushTriggerEvent = new SemaphoreSlim(0, 1); _flushCompletedEvent = new ManualResetEventSlim(false); _cancelStartSource = new CancellationTokenSource(); Task.Run(async () => { await Monitor(_cancelStartSource.Token); }); } /// /// Patrolling thread. keeps tab on the PutLogEvent request and the /// Concurrent Queue /// private async Task Monitor(CancellationToken token) { bool executeFlush = false; while (_currentStreamName == null && !token.IsCancellationRequested) { try { _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); } catch (OperationCanceledException ex) { if (!_pendingMessageQueue.IsEmpty) LogLibraryServiceError(ex); if (token.IsCancellationRequested) { _client.Dispose(); return; } } catch (Exception ex) { // We don't want to kill the main monitor loop. We will simply log the error, then continue. // If it is an OperationCancelledException, die LogLibraryServiceError(ex); await Task.Delay(Math.Max(100, DateTime.UtcNow.Second * 10), token); } } while (!token.IsCancellationRequested) { try { while (_pendingMessageQueue.TryDequeue(out var inputLogEvent)) { // See if new message will cause the current batch to violote the size constraint. // If so send the current batch now before adding more to the batch of messages to send. if (_repo.CurrentBatchMessageCount > 0 && _repo.IsSizeConstraintViolated(inputLogEvent.Message)) { await SendMessages(token).ConfigureAwait(false); } _repo.AddMessage(inputLogEvent); } if (_repo.ShouldSendRequest(_config.MaxQueuedMessages) || (executeFlush && !_repo.IsEmpty)) { await SendMessages(token).ConfigureAwait(false); } if (executeFlush) _flushCompletedEvent.Set(); executeFlush = await _flushTriggerEvent.WaitAsync(TimeSpan.FromMilliseconds(_config.MonitorSleepTime.TotalMilliseconds), token); } catch (OperationCanceledException ex) when (!token.IsCancellationRequested) { // Workaround to handle timeouts of .net httpclient // https://github.com/dotnet/corefx/issues/20296 LogLibraryServiceError(ex); } catch (OperationCanceledException ex) { if (!token.IsCancellationRequested || !_repo.IsEmpty || !_pendingMessageQueue.IsEmpty) LogLibraryServiceError(ex); _client.Dispose(); return; } catch (Exception ex) { // We don't want to kill the main monitor loop. We will simply log the error, then continue. // If it is an OperationCancelledException, die LogLibraryServiceError(ex); } } } /// /// Method to transmit the PutLogEvent Request /// /// /// private async Task SendMessages(CancellationToken token) { try { //Make sure the log events are in the right order. _repo._request.LogEvents.Sort((ev1, ev2) => ev1.Timestamp.CompareTo(ev2.Timestamp)); var response = await _client.PutLogEventsAsync(_repo._request, token).ConfigureAwait(false); _repo.Reset(response.NextSequenceToken); _requestCount = 5; } catch (InvalidSequenceTokenException ex) { //In case the NextSequenceToken is invalid for the last sent message, a new stream would be //created for the said application. LogLibraryServiceError(ex); if (_requestCount > 0) { _requestCount--; var regexResult = invalid_sequence_token_regex.Match(ex.Message); if (regexResult.Success) { _repo._request.SequenceToken = regexResult.Groups[1].Value; await SendMessages(token).ConfigureAwait(false); } } else { _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); } } catch (ResourceNotFoundException ex) { // The specified log stream does not exist. Refresh or create new stream. LogLibraryServiceError(ex); _currentStreamName = await LogEventTransmissionSetup(token).ConfigureAwait(false); } } /// /// Creates and Allocates resources for message trasnmission /// /// private async Task LogEventTransmissionSetup(CancellationToken token) { string serviceURL = GetServiceUrl(); if (!_config.DisableLogGroupCreation) { var logGroupResponse = await _client.DescribeLogGroupsAsync(new DescribeLogGroupsRequest { LogGroupNamePrefix = _config.LogGroup }, token).ConfigureAwait(false); if (!IsSuccessStatusCode(logGroupResponse)) { LogLibraryServiceError(new System.Net.WebException($"Lookup LogGroup {_config.LogGroup} returned status: {logGroupResponse.HttpStatusCode}"), serviceURL); } if (logGroupResponse.LogGroups.FirstOrDefault(x => string.Equals(x.LogGroupName, _config.LogGroup, StringComparison.Ordinal)) == null) { var createGroupResponse = await _client.CreateLogGroupAsync(new CreateLogGroupRequest { LogGroupName = _config.LogGroup }, token).ConfigureAwait(false); if (!IsSuccessStatusCode(createGroupResponse)) { LogLibraryServiceError(new System.Net.WebException($"Create LogGroup {_config.LogGroup} returned status: {createGroupResponse.HttpStatusCode}"), serviceURL); } } } var currentStreamName = GenerateStreamName(_config); var streamResponse = await _client.CreateLogStreamAsync(new CreateLogStreamRequest { LogGroupName = _config.LogGroup, LogStreamName = currentStreamName }, token).ConfigureAwait(false); if (!IsSuccessStatusCode(streamResponse)) { LogLibraryServiceError(new System.Net.WebException($"Create LogStream {currentStreamName} for LogGroup {_config.LogGroup} returned status: {streamResponse.HttpStatusCode}"), serviceURL); } _repo = new LogEventBatch(_config.LogGroup, currentStreamName, Convert.ToInt32(_config.BatchPushInterval.TotalSeconds), _config.BatchSizeInBytes); return currentStreamName; } /// /// Generate a logstream name /// /// logstream name that ALWAYS includes a unique date-based segment public static string GenerateStreamName(IAWSLoggerConfig config) { var streamName = new StringBuilder(); var prefix = config.LogStreamNamePrefix; if (!string.IsNullOrEmpty(prefix)) { streamName.Append(prefix); streamName.Append(" - "); } streamName.Append(DateTime.Now.ToString("yyyy/MM/ddTHH.mm.ss")); var suffix = config.LogStreamNameSuffix; if (!string.IsNullOrEmpty(suffix)) { streamName.Append(" - "); streamName.Append(suffix); } return streamName.ToString(); } private static bool IsSuccessStatusCode(AmazonWebServiceResponse serviceResponse) { return (int)serviceResponse.HttpStatusCode >= 200 && (int)serviceResponse.HttpStatusCode <= 299; } /// /// Break up the message into max parts of 256K. /// /// /// public static IList BreakupMessage(string message) { var parts = new List(); var singleCharArray = new char[1]; var encoding = Encoding.UTF8; int byteCount = 0; var sb = new StringBuilder(MAX_MESSAGE_SIZE_IN_BYTES); foreach (var c in message) { singleCharArray[0] = c; byteCount += encoding.GetByteCount(singleCharArray); sb.Append(c); // This could go a couple bytes if (byteCount > MAX_MESSAGE_SIZE_IN_BYTES) { parts.Add(sb.ToString()); sb.Clear(); byteCount = 0; } } if (sb.Length > 0) { parts.Add(sb.ToString()); } return parts; } /// /// Class to handle PutLogEvent request and associated parameters. /// Also has the requisite checks to determine when the object is ready for Transmission. /// private class LogEventBatch { public TimeSpan TimeIntervalBetweenPushes { get; private set; } public int MaxBatchSize { get; private set; } public bool ShouldSendRequest(int maxQueuedEvents) { if (_request.LogEvents.Count == 0) return false; if (_nextPushTime < DateTime.UtcNow) return true; if (maxQueuedEvents <= _request.LogEvents.Count) return true; return false; } int _totalMessageSize { get; set; } DateTime _nextPushTime; public PutLogEventsRequest _request = new PutLogEventsRequest(); public LogEventBatch(string logGroupName, string streamName, int timeIntervalBetweenPushes, int maxBatchSize) { _request.LogGroupName = logGroupName; _request.LogStreamName = streamName; TimeIntervalBetweenPushes = TimeSpan.FromSeconds(timeIntervalBetweenPushes); MaxBatchSize = maxBatchSize; Reset(null); } public LogEventBatch() { } public int CurrentBatchMessageCount { get { return this._request.LogEvents.Count; } } public bool IsEmpty => _request.LogEvents.Count == 0; public bool IsSizeConstraintViolated(string message) { Encoding unicode = Encoding.Unicode; int prospectiveLength = _totalMessageSize + unicode.GetMaxByteCount(message.Length); if (MaxBatchSize < prospectiveLength) return true; return false; } public void AddMessage(InputLogEvent ev) { Encoding unicode = Encoding.Unicode; _totalMessageSize += unicode.GetMaxByteCount(ev.Message.Length); _request.LogEvents.Add(ev); } public void Reset(string SeqToken) { _request.LogEvents.Clear(); _totalMessageSize = 0; _request.SequenceToken = SeqToken; _nextPushTime = DateTime.UtcNow.Add(TimeIntervalBetweenPushes); } } const string UserAgentHeader = "User-Agent"; void ServiceClientBeforeRequestEvent(object sender, RequestEventArgs e) { var args = e as Amazon.Runtime.WebServiceRequestEventArgs; if (args == null || !args.Headers.ContainsKey(UserAgentHeader)) return; args.Headers[UserAgentHeader] = args.Headers[UserAgentHeader] + " AWSLogger/" + _logType; } void ServiceClienExceptionEvent(object sender, ExceptionEventArgs e) { var eventArgs = e as WebServiceExceptionEventArgs; if (eventArgs?.Exception != null) LogLibraryServiceError(eventArgs?.Exception, eventArgs.Endpoint?.ToString()); else LogLibraryServiceError(new System.Net.WebException(e.GetType().ToString())); } private void LogLibraryServiceError(Exception ex, string serviceUrl = null) { LogLibraryAlert?.Invoke(this, new LogLibraryEventArgs(ex) { ServiceUrl = serviceUrl ?? GetServiceUrl() }); if (!string.IsNullOrEmpty(_config.LibraryLogFileName) && _config.LibraryLogErrors) { LogLibraryError(ex, _config.LibraryLogFileName); } } /// /// Write Exception details to the file specified with the filename /// public static void LogLibraryError(Exception ex, string LibraryLogFileName) { try { using (StreamWriter w = File.AppendText(LibraryLogFileName)) { w.WriteLine("Log Entry : "); w.WriteLine("{0}", DateTime.Now.ToString()); w.WriteLine(" :"); w.WriteLine(" :{0}", ex.ToString()); w.WriteLine("-------------------------------"); } } catch (Exception e) { Console.WriteLine("Exception caught when writing error log to file" + e.ToString()); } } } }