// /******************************************************************************* // * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // * Licensed under the Apache License, Version 2.0 (the "License"). You may not use // * this file except in compliance with the License. A copy of the License is located at // * // * http://aws.amazon.com/apache2.0 // * // * or in the "license" file accompanying this file. // * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // * CONDITIONS OF ANY KIND, either express or implied. See the License for the // * specific language governing permissions and limitations under the License. // * ***************************************************************************** // * __ _ _ ___ // * ( )( \/\/ )/ __) // * /__\ \ / \__ \ // * (_)(_) \/\/ (___/ // * // * AWS SDK for .NET // * // */ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; #if AWS_ASYNC_API using System.Threading.Tasks; #else using System.Threading; #endif namespace Amazon.Runtime.EventStreams.Internal { /// /// The contract for the . /// /// An implementation of IEventStreamEvent (e.g. IS3Event). /// An implementation of EventStreamException (e.g. S3EventStreamException). public interface IEventStream : IDisposable where T : IEventStreamEvent where TE : EventStreamException, new() { /// /// The size of the buffer for reading from the network stream. /// int BufferSize { get; set; } /// /// Fires when an event is received. /// event EventHandler> EventReceived; /// /// Fired when an exception or error is raised. /// event EventHandler> ExceptionReceived; /// /// Starts the background thread to start reading events from the network stream. /// void StartProcessing(); } /// /// The superclass for all EventStreams. It contains the common processing logic needed to retreive events from a network Stream. It /// also contains the mechanisms needed to have a background loop raise events. /// /// An implementation of IEventStreamEvent (e.g. IS3Event). /// An implementation of EventStreamException (e.g. S3EventStreamException). public abstract class EventStream : IEventStream where T : IEventStreamEvent where TE : EventStreamException, new() { /// /// "Unique" key for unknown event lookup. /// protected const string UnknownEventKey = "===UNKNOWN==="; /// /// Header key for message type. /// private const string HeaderMessageType = ":message-type"; /// /// Header key for event type. /// private const string HeaderEventType = ":event-type"; /// /// Header key for exception type. /// private const string HeaderExceptionType = ":exception-type"; /// /// Header key for error code. /// private const string HeaderErrorCode = ":error-code"; /// /// Header key for error message. /// private const string HeaderErrorMessage = ":error-message"; /// /// Value of when the message is an event. /// private const string EventHeaderMessageTypeValue = "event"; /// /// Value of when the message is an exception. /// private const string ExceptionHeaderMessageTypeValue = "exception"; /// /// Value of when the message is an error. /// private const string ErrorHeaderMessageTypeValue = "error"; private const string WrappedErrorMessage = "Error."; /// /// The size of the buffer for reading from the network stream. /// Default is 8192. /// public int BufferSize { get; set; } = 8192; /// /// The underlying stream to read events from. /// protected Stream NetworkStream { get; } /// /// Responsible for decoding events from sequences of bytes. /// protected IEventStreamDecoder Decoder { get; } /// /// Fires when an event is recieved. /// public virtual event EventHandler> EventReceived; /// /// Fired when an exception or error is raised. /// public virtual event EventHandler> ExceptionReceived; /// /// The mapping of event message to a generator function to construct the matching Event Stream event. /// [SuppressMessage("Microsoft.Design", "CA1006", Justification = "Mapping of string to generic generator function is clear to the reader. This property is not exposed to the end user.")] protected abstract IDictionary> EventMapping { get; } /// /// The mapping of event message to a generator function to construct the matching Event Stream exception. /// [SuppressMessage("Microsoft.Design", "CA1006", Justification = "Mapping of string to generic generator function is clear to the reader. This property is not exposed to the end user.")] protected abstract IDictionary> ExceptionMapping { get; } /// /// Whether the Stream is currently being processed. /// // This is true is StartProcessing is called, or if enumeration has started. protected abstract bool IsProcessing { get; set; } /// /// A Stream of Events. Events can be retrieved from this stream by attaching handlers to listen events, and then calling StartProcessing. /// protected EventStream(Stream stream) : this(stream, null) { } /// /// A Stream of Events. Events can be retrieved from this stream by attaching handlers to listen events, and then calling StartProcessing. /// protected EventStream(Stream stream, IEventStreamDecoder eventStreamDecoder) { NetworkStream = stream; Decoder = eventStreamDecoder ?? new EventStreamDecoder(); } /// /// Converts an EventStreamMessage to an event. /// /// The event stream message to be converted. /// The event protected T ConvertMessageToEvent(EventStreamMessage eventStreamMessage) { var eventStreamMessageHeaders = eventStreamMessage.Headers; string eventStreamMessageType; try { // Message type can be an event, an exception, or an error. This information is stored in the :message-type header. eventStreamMessageType = eventStreamMessageHeaders[HeaderMessageType].AsString(); } catch (KeyNotFoundException ex) { throw new EventStreamValidationException("Message type missing from event stream message.", ex); } switch (eventStreamMessageType) { case EventHeaderMessageTypeValue: string eventTypeKey; try { eventTypeKey = eventStreamMessageHeaders[HeaderEventType].AsString(); } catch (KeyNotFoundException ex) { throw new EventStreamValidationException("Event Type not defined for event.", ex); } try { return EventMapping[eventTypeKey](eventStreamMessage); } catch (KeyNotFoundException) { return EventMapping[UnknownEventKey](eventStreamMessage); } case ExceptionHeaderMessageTypeValue: string exceptionTypeKey; try { exceptionTypeKey = eventStreamMessageHeaders[HeaderExceptionType].AsString(); } catch (KeyNotFoundException ex) { throw new EventStreamValidationException("Exception Type not defined for exception.", ex); } try { throw ExceptionMapping[exceptionTypeKey](eventStreamMessage); } catch (KeyNotFoundException) { throw new UnknownEventStreamException(exceptionTypeKey); } case ErrorHeaderMessageTypeValue: int errorCode; try { errorCode = eventStreamMessageHeaders[HeaderErrorCode].AsInt32(); } catch (KeyNotFoundException ex) { throw new EventStreamValidationException("Error Code not defined for error.", ex); } // Error message is not required for errors. Errors do not have payloads. IEventStreamHeader errorMessage = null; var hasErrorMessage = eventStreamMessageHeaders.TryGetValue(HeaderErrorMessage, out errorMessage); throw new EventStreamErrorCodeException(errorCode, hasErrorMessage ? errorMessage.AsString() : string.Empty); default: // Unknown message type. Swallow the message to enable future message types without breaking existing clients. throw new UnknownEventStreamMessageTypeException(); } } /// /// Abstraction for cross-framework initiation of the background thread. /// protected void Process() { #if AWS_ASYNC_API // Task only exists in framework 4.5 and up, and Standard. Task.Run(() => ProcessLoop()); #else // ThreadPool only exists in 3.5 and below. These implementations do not have the Task library. ThreadPool.QueueUserWorkItem(ProcessLoop); #endif } /// /// The background thread main loop. It will constantly read from the network stream until IsProcessing is false, or an error occurs. /// /// This stub exists due to FXCop. /// private void ProcessLoop() { ProcessLoop(null); } /// /// The background thread main loop. It will constantly read from the network stream until IsProcessing is false, or an error occurs. /// /// Needed for 3.5 support. Not used. [SuppressMessage("Microsoft.Usage", "CA1801", Justification = "Needed for .NET 3.5 (ThreadPool.QueueUserWorkItem)")] private void ProcessLoop(object state) { var buffer = new byte[BufferSize]; try { while (IsProcessing) { ReadFromStream(buffer); } } // These exceptions are raised on the background thread. They are fired as events for visibility. catch (Exception ex) { IsProcessing = false; // surfaceException means what is surfaced to the user. For example, in S3Select, that would be a S3EventStreamException. var surfaceException = WrapException(ex); // Raise the exception as an event. ExceptionReceived?.Invoke(this, new EventStreamExceptionReceivedArgs(surfaceException)); } } /// /// Reads from the stream into the buffer. It then passes the buffer to the decoder, which raises an event for /// each message it decodes. /// /// The buffer to store the read bytes from the stream. protected void ReadFromStream(byte[] buffer) { var bytesRead = NetworkStream.Read(buffer, 0, buffer.Length); if (NetworkStream.CanRead) { if (bytesRead > 0) { // Decoder raises MessageReceived for every message it encounters. Decoder.ProcessData(buffer, 0, bytesRead); } } else { IsProcessing = false; } } /// /// Wraps exceptions in an outer exception so they can be passed to event handlers. If the Exception is already of a compatable type, /// the method returns what it was given. /// /// The exception to wrap. /// An exception of type TE protected TE WrapException(Exception ex) { var teEx = ex as TE; if (teEx != null) { return teEx; } // Types of exception that would not already be of type TE would be DecoderExceptions, EventStreamValidationExceptions, // and EventStreamErrorCodeExceptions. // We want to wrap the exception in the generic type so we can give it to the exception event handler. // Only one exception should fire, since the background thread dies on the exception. Therefore, the reflection // used here is not a preformance concern, and lets us abstract this method to the superclass. var exArgs = new object[] {WrappedErrorMessage, ex}; return (TE) Activator.CreateInstance(typeof(TE), exArgs); } /// /// Starts the background thread to start reading events from the network stream. /// public virtual void StartProcessing() { if (IsProcessing) return; IsProcessing = true; Process(); } #region Dispose Pattern private bool _disposed; /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Disposes the resources of this stream. /// /// Should dispose of unmanged resources. protected virtual void Dispose(bool disposing) { if (_disposed) return; if (disposing) { IsProcessing = false; NetworkStream?.Dispose(); Decoder?.Dispose(); } _disposed = true; } #endregion } }