// /*******************************************************************************
// * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
// * this file except in compliance with the License. A copy of the License is located at
// *
// * http://aws.amazon.com/apache2.0
// *
// * or in the "license" file accompanying this file.
// * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// * CONDITIONS OF ANY KIND, either express or implied. See the License for the
// * specific language governing permissions and limitations under the License.
// * *****************************************************************************
// * __ _ _ ___
// * ( )( \/\/ )/ __)
// * /__\ \ / \__ \
// * (_)(_) \/\/ (___/
// *
// * AWS SDK for .NET
// *
// */
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
#if AWS_ASYNC_API
using System.Threading.Tasks;
#else
using System.Threading;
#endif
namespace Amazon.Runtime.EventStreams.Internal
{
///
/// The contract for the .
///
/// An implementation of IEventStreamEvent (e.g. IS3Event).
/// An implementation of EventStreamException (e.g. S3EventStreamException).
public interface IEventStream : IDisposable where T : IEventStreamEvent where TE : EventStreamException, new()
{
///
/// The size of the buffer for reading from the network stream.
///
int BufferSize { get; set; }
///
/// Fires when an event is received.
///
event EventHandler> EventReceived;
///
/// Fired when an exception or error is raised.
///
event EventHandler> ExceptionReceived;
///
/// Starts the background thread to start reading events from the network stream.
///
void StartProcessing();
}
///
/// The superclass for all EventStreams. It contains the common processing logic needed to retreive events from a network Stream. It
/// also contains the mechanisms needed to have a background loop raise events.
///
/// An implementation of IEventStreamEvent (e.g. IS3Event).
/// An implementation of EventStreamException (e.g. S3EventStreamException).
public abstract class EventStream : IEventStream where T : IEventStreamEvent where TE : EventStreamException, new()
{
///
/// "Unique" key for unknown event lookup.
///
protected const string UnknownEventKey = "===UNKNOWN===";
///
/// Header key for message type.
///
private const string HeaderMessageType = ":message-type";
///
/// Header key for event type.
///
private const string HeaderEventType = ":event-type";
///
/// Header key for exception type.
///
private const string HeaderExceptionType = ":exception-type";
///
/// Header key for error code.
///
private const string HeaderErrorCode = ":error-code";
///
/// Header key for error message.
///
private const string HeaderErrorMessage = ":error-message";
///
/// Value of when the message is an event.
///
private const string EventHeaderMessageTypeValue = "event";
///
/// Value of when the message is an exception.
///
private const string ExceptionHeaderMessageTypeValue = "exception";
///
/// Value of when the message is an error.
///
private const string ErrorHeaderMessageTypeValue = "error";
private const string WrappedErrorMessage = "Error.";
///
/// The size of the buffer for reading from the network stream.
/// Default is 8192.
///
public int BufferSize { get; set; } = 8192;
///
/// The underlying stream to read events from.
///
protected Stream NetworkStream { get; }
///
/// Responsible for decoding events from sequences of bytes.
///
protected IEventStreamDecoder Decoder { get; }
///
/// Fires when an event is recieved.
///
public virtual event EventHandler> EventReceived;
///
/// Fired when an exception or error is raised.
///
public virtual event EventHandler> ExceptionReceived;
///
/// The mapping of event message to a generator function to construct the matching Event Stream event.
///
[SuppressMessage("Microsoft.Design", "CA1006",
Justification = "Mapping of string to generic generator function is clear to the reader. This property is not exposed to the end user.")]
protected abstract IDictionary> EventMapping { get; }
///
/// The mapping of event message to a generator function to construct the matching Event Stream exception.
///
[SuppressMessage("Microsoft.Design", "CA1006",
Justification = "Mapping of string to generic generator function is clear to the reader. This property is not exposed to the end user.")]
protected abstract IDictionary> ExceptionMapping { get; }
///
/// Whether the Stream is currently being processed.
///
// This is true is StartProcessing is called, or if enumeration has started.
protected abstract bool IsProcessing { get; set; }
///
/// A Stream of Events. Events can be retrieved from this stream by attaching handlers to listen events, and then calling StartProcessing.
///
protected EventStream(Stream stream) : this(stream, null)
{
}
///
/// A Stream of Events. Events can be retrieved from this stream by attaching handlers to listen events, and then calling StartProcessing.
///
protected EventStream(Stream stream, IEventStreamDecoder eventStreamDecoder)
{
NetworkStream = stream;
Decoder = eventStreamDecoder ?? new EventStreamDecoder();
}
///
/// Converts an EventStreamMessage to an event.
///
/// The event stream message to be converted.
/// The event
protected T ConvertMessageToEvent(EventStreamMessage eventStreamMessage)
{
var eventStreamMessageHeaders = eventStreamMessage.Headers;
string eventStreamMessageType;
try
{
// Message type can be an event, an exception, or an error. This information is stored in the :message-type header.
eventStreamMessageType = eventStreamMessageHeaders[HeaderMessageType].AsString();
}
catch (KeyNotFoundException ex)
{
throw new EventStreamValidationException("Message type missing from event stream message.", ex);
}
switch (eventStreamMessageType)
{
case EventHeaderMessageTypeValue:
string eventTypeKey;
try
{
eventTypeKey = eventStreamMessageHeaders[HeaderEventType].AsString();
}
catch (KeyNotFoundException ex)
{
throw new EventStreamValidationException("Event Type not defined for event.", ex);
}
try
{
return EventMapping[eventTypeKey](eventStreamMessage);
}
catch (KeyNotFoundException)
{
return EventMapping[UnknownEventKey](eventStreamMessage);
}
case ExceptionHeaderMessageTypeValue:
string exceptionTypeKey;
try
{
exceptionTypeKey = eventStreamMessageHeaders[HeaderExceptionType].AsString();
}
catch (KeyNotFoundException ex)
{
throw new EventStreamValidationException("Exception Type not defined for exception.", ex);
}
try
{
throw ExceptionMapping[exceptionTypeKey](eventStreamMessage);
}
catch (KeyNotFoundException)
{
throw new UnknownEventStreamException(exceptionTypeKey);
}
case ErrorHeaderMessageTypeValue:
int errorCode;
try
{
errorCode = eventStreamMessageHeaders[HeaderErrorCode].AsInt32();
}
catch (KeyNotFoundException ex)
{
throw new EventStreamValidationException("Error Code not defined for error.", ex);
}
// Error message is not required for errors. Errors do not have payloads.
IEventStreamHeader errorMessage = null;
var hasErrorMessage = eventStreamMessageHeaders.TryGetValue(HeaderErrorMessage, out errorMessage);
throw new EventStreamErrorCodeException(errorCode, hasErrorMessage ? errorMessage.AsString() : string.Empty);
default:
// Unknown message type. Swallow the message to enable future message types without breaking existing clients.
throw new UnknownEventStreamMessageTypeException();
}
}
///
/// Abstraction for cross-framework initiation of the background thread.
///
protected void Process()
{
#if AWS_ASYNC_API
// Task only exists in framework 4.5 and up, and Standard.
Task.Run(() => ProcessLoop());
#else
// ThreadPool only exists in 3.5 and below. These implementations do not have the Task library.
ThreadPool.QueueUserWorkItem(ProcessLoop);
#endif
}
///
/// The background thread main loop. It will constantly read from the network stream until IsProcessing is false, or an error occurs.
///
/// This stub exists due to FXCop.
///
private void ProcessLoop()
{
ProcessLoop(null);
}
///
/// The background thread main loop. It will constantly read from the network stream until IsProcessing is false, or an error occurs.
///
/// Needed for 3.5 support. Not used.
[SuppressMessage("Microsoft.Usage", "CA1801", Justification = "Needed for .NET 3.5 (ThreadPool.QueueUserWorkItem)")]
private void ProcessLoop(object state)
{
var buffer = new byte[BufferSize];
try
{
while (IsProcessing)
{
ReadFromStream(buffer);
}
}
// These exceptions are raised on the background thread. They are fired as events for visibility.
catch (Exception ex)
{
IsProcessing = false;
// surfaceException means what is surfaced to the user. For example, in S3Select, that would be a S3EventStreamException.
var surfaceException = WrapException(ex);
// Raise the exception as an event.
ExceptionReceived?.Invoke(this,
new EventStreamExceptionReceivedArgs(surfaceException));
}
}
///
/// Reads from the stream into the buffer. It then passes the buffer to the decoder, which raises an event for
/// each message it decodes.
///
/// The buffer to store the read bytes from the stream.
protected void ReadFromStream(byte[] buffer)
{
var bytesRead = NetworkStream.Read(buffer, 0, buffer.Length);
if (NetworkStream.CanRead)
{
if (bytesRead > 0)
{
// Decoder raises MessageReceived for every message it encounters.
Decoder.ProcessData(buffer, 0, bytesRead);
}
}
else
{
IsProcessing = false;
}
}
///
/// Wraps exceptions in an outer exception so they can be passed to event handlers. If the Exception is already of a compatable type,
/// the method returns what it was given.
///
/// The exception to wrap.
/// An exception of type TE
protected TE WrapException(Exception ex)
{
var teEx = ex as TE;
if (teEx != null)
{
return teEx;
}
// Types of exception that would not already be of type TE would be DecoderExceptions, EventStreamValidationExceptions,
// and EventStreamErrorCodeExceptions.
// We want to wrap the exception in the generic type so we can give it to the exception event handler.
// Only one exception should fire, since the background thread dies on the exception. Therefore, the reflection
// used here is not a preformance concern, and lets us abstract this method to the superclass.
var exArgs = new object[] {WrappedErrorMessage, ex};
return (TE) Activator.CreateInstance(typeof(TE), exArgs);
}
///
/// Starts the background thread to start reading events from the network stream.
///
public virtual void StartProcessing()
{
if (IsProcessing) return;
IsProcessing = true;
Process();
}
#region Dispose Pattern
private bool _disposed;
///
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
///
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// Disposes the resources of this stream.
///
/// Should dispose of unmanged resources.
protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
IsProcessing = false;
NetworkStream?.Dispose();
Decoder?.Dispose();
}
_disposed = true;
}
#endregion
}
}