/******************************************************************************* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * Licensed under the Apache License, Version 2.0 (the "License"). You may not use * this file except in compliance with the License. A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. * ***************************************************************************** * __ _ _ ___ * ( )( \/\/ )/ __) * /__\ \ / \__ \ * (_)(_) \/\/ (___/ * * AWS SDK for .NET * */ using System; using System.Globalization; using System.Net; using Amazon.Runtime.Internal.Util; using ThirdParty.Ionic.Zlib; namespace Amazon.Runtime.EventStreams.Internal { #region streamingDecoder /// /// Event Arguments for EventStreamDecoder.MessageReceived. /// public class EventStreamMessageReceivedEventArgs : EventArgs { /// /// Received message. /// public EventStreamMessage Message { get; private set; } /// /// Additional object context /// public Object Context { get; private set; } /// /// Initialize this with message /// public EventStreamMessageReceivedEventArgs(EventStreamMessage message) { Message = message; } /// /// Initialize this with message and object conetext /// public EventStreamMessageReceivedEventArgs(EventStreamMessage message, Object context) { Message = message; Context = context; } } /// /// Exception thrown when a request is made of the Decoder, but the internal state /// machine is an invalid state. This is usually the result of an interanl exception /// being thrown during parsing of the network stream. /// #if !NETSTANDARD [Serializable] #endif public class EventStreamDecoderIllegalStateException : Exception { public EventStreamDecoderIllegalStateException(string message) : base(message) { } #if !NETSTANDARD /// /// Constructs a new instance of the EventStreamDecoderIllegalStateException class with serialized data. /// /// The that holds the serialized object data about the exception being thrown. /// The that contains contextual information about the source or destination. /// The parameter is null. /// The class name is null or is zero (0). protected EventStreamDecoderIllegalStateException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) : base(info, context) { } #endif } /// /// Contract for EventStreamDecoder /// public interface IEventStreamDecoder : IDisposable { /// /// Invoked anytime an EventStreamMessage is found. /// event EventHandler MessageReceived; /// /// Processes data starting at offset up to length. /// Invokes MessageRecieved for each message encountered. /// If an exception is thrown, this object is not safe for reuse. /// /// buffer to read /// offset in buffer to start reading. /// length of data. void ProcessData(byte[] data, int offset, int length); } /// /// Streaming decoder for listening for incoming EventStreamMessage datagrams. /// public class EventStreamDecoder : IEventStreamDecoder { /// /// Invoked anytime an EventStreamMessage is found. /// public event EventHandler MessageReceived; /// /// Object data (optional) that can be passed to the event handler for MessageReceived. /// public object MessageReceivedContext { get; set; } private delegate int ProcessRead(byte[] data, int offset, int length); private enum DecoderState { Start = 0, ReadPrelude, ProcessPrelude, ReadMessage, Error } private ProcessRead[] _stateFns = null; private DecoderState _state; private int _currentMessageLength; private int _amountBytesRead; private byte[] _workingMessage; private byte[] _workingBuffer; private CrcCalculatorStream _runningChecksumStream; /// /// Default constructor. Initializes internal _state machine. /// public EventStreamDecoder() { _workingBuffer = new byte[EventStreamMessage.PreludeLen]; _stateFns = new ProcessRead[] { Start, ReadPrelude, ProcessPrelude, ReadMessage, Error }; _state = DecoderState.Start; } #region StreamingDecoderStateMachine private int Start(byte[] data, int offset, int length) { _workingMessage = null; _amountBytesRead = 0; if (_runningChecksumStream != null) { _runningChecksumStream.Dispose(); } _runningChecksumStream = new CrcCalculatorStream(new NullStream()); _currentMessageLength = 0; _state = DecoderState.ReadPrelude; return 0; } private int ReadPrelude(byte[] data, int offset, int length) { var read = 0; if (_amountBytesRead < EventStreamMessage.PreludeLen) { read = Math.Min(length - offset, EventStreamMessage.PreludeLen - _amountBytesRead); Buffer.BlockCopy(data, offset, _workingBuffer, _amountBytesRead, read); _amountBytesRead += read; } if (_amountBytesRead == EventStreamMessage.PreludeLen) { _state = DecoderState.ProcessPrelude; } return read; } private int ProcessPrelude(byte[] data, int offset, int length) { /* this is absolutely redundant, but since the totalLength field will result in a potentially huge allocation, we want to fail fast before even attempting to continue if the totalLength field has been corrupted. */ _runningChecksumStream.Write(_workingBuffer, 0, EventStreamMessage.PreludeLen - EventStreamMessage.SizeOfInt32); var preludeChecksum = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(_workingBuffer, EventStreamMessage.PreludeLen - EventStreamMessage.SizeOfInt32)); if (preludeChecksum != _runningChecksumStream.Crc32) { _state = DecoderState.Error; throw new EventStreamChecksumFailureException( string.Format(CultureInfo.InvariantCulture, "Message Prelude Checksum failure. Expected {0} but was {1}", preludeChecksum, _runningChecksumStream.Crc32)); } _runningChecksumStream.Write(_workingBuffer, EventStreamMessage.PreludeLen - 4, EventStreamMessage.SizeOfInt32); _currentMessageLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(_workingBuffer, 0)); /* It's entirely possible to change this to not do this potentially large allocation but it complicates the API a bit and is most likely unnecessary. For now, just allocate the entire message buffer. It will be released after each message is processed. */ _workingMessage = new byte[_currentMessageLength]; Buffer.BlockCopy(_workingBuffer, 0, _workingMessage, 0, EventStreamMessage.PreludeLen); _state = DecoderState.ReadMessage; return 0; } private int ReadMessage(byte[] data, int offset, int length) { var read = 0; if (_amountBytesRead < _currentMessageLength) { read = Math.Min(length - offset, _currentMessageLength - _amountBytesRead); Buffer.BlockCopy(data, offset, _workingMessage, _amountBytesRead, read); _amountBytesRead += read; } if (_amountBytesRead == _currentMessageLength) { ProcessMessage(); } return read; } private void ProcessMessage() { try { var message = EventStreamMessage.FromBuffer(_workingMessage, 0, _currentMessageLength); MessageReceived?.Invoke(this, new EventStreamMessageReceivedEventArgs(message, MessageReceivedContext)); _state = DecoderState.Start; } catch(Exception) { _state = DecoderState.Error; throw; } } private int Error(byte[] data, int offset, int length) { throw new EventStreamDecoderIllegalStateException("Event stream decoder is in an error state. Create a new instance, and use a new stream to continue"); } #endregion /// /// Processes data starting at offset up to length. /// Invokes MessageRecieved for each message encountered. /// If an exception is thrown, this object is not safe for reuse. /// /// buffer to read /// offset in buffer to start reading. /// length of data. public void ProcessData(byte[] data, int offset, int length) { var available = length - offset; while (offset < available) { offset += _stateFns[(int)_state](data, offset, length); } } #region IDisposable Support private bool disposedValue = false; // To detect redundant calls protected virtual void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { _runningChecksumStream.Dispose(); _workingMessage = null; } disposedValue = true; } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } #endregion } #endregion }