// /*******************************************************************************
//  *  Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//  *  Licensed under the Apache License, Version 2.0 (the "License"). You may not use
//  *  this file except in compliance with the License. A copy of the License is located at
//  *
//  *  http://aws.amazon.com/apache2.0
//  *
//  *  or in the "license" file accompanying this file.
//  *  This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
//  *  CONDITIONS OF ANY KIND, either express or implied. See the License for the
//  *  specific language governing permissions and limitations under the License.
//  * *****************************************************************************
//  *    __  _    _  ___
//  *   (  )( \/\/ )/ __)
//  *   /__\ \    / \__ \
//  *  (_)(_) \/\/  (___/
//  *
//  *  AWS SDK for .NET
//  *
//  */

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;

namespace Amazon.Runtime.EventStreams.Internal
{
    /// <summary>
    /// The contract for the <see cref="EnumerableEventStream{T,TE}"/>.
    /// </summary>
    /// <typeparam name="T">An implementation of IEventStreamEvent (e.g. IS3Event).</typeparam>
    /// <typeparam name="TE">An implementation of EventStreamException (e.g. S3EventStreamException).</typeparam>
    [SuppressMessage("Microsoft.Naming", "CA1710", Justification = "IEventStreamCollection is not descriptive.")]
    public interface IEnumerableEventStream<T, TE> : IEventStream<T, TE>, IEnumerable<T>
        where T : IEventStreamEvent
        where TE : EventStreamException, new()
    {
    }

    /// <summary>
    /// A subclass of <see cref="EventStream{T,TE}"/> that enables an enumerable interface for interacting with Events.
    /// </summary>
    /// <typeparam name="T">An implementation of IEventStreamEvent (e.g. IS3Event).</typeparam>
    /// <typeparam name="TE">An implementation of EventStreamException (e.g. S3EventStreamException).</typeparam>
    [SuppressMessage("Microsoft.Naming", "CA1710", Justification = "EventStreamCollection is not descriptive.")]
    [SuppressMessage("Microsoft.Design", "CA1063", Justification = "IDisposable is a transient interface from IEventStream. Users need to be able to call Dispose.")]
    public abstract class EnumerableEventStream<T, TE> : EventStream<T, TE>, IEnumerableEventStream<T, TE>
        where T : IEventStreamEvent
        where TE : EventStreamException, new()
    {
        private const string MutuallyExclusiveExceptionMessage =
            "Stream has already begun processing. Event-driven and Enumerable traversals of the stream are mutually exclusive. " +
            "You can either use the event driven or enumerable interface, but not both.";

        /// <summary>
        /// Flag if the stream was chosen to be enumerated.
        /// </summary>
        protected bool IsEnumerated { get; set; }

        /// <summary>
        /// A Stream of Events. Events can be retrieved from this stream by either
        /// <list type="bullet">
        /// <item><description>attaching handlers to listen events, and then call StartProcessing <i>or</i></description></item>
        /// <item><description>enumerating over the events.</description></item>
        /// </list>
        /// <para></para>
        /// These options should be treated as mutually exclusive.
        /// </summary>
        /// <param name="stream">The stream passed through to the two-argument constructor (with a null decoder).</param>
        protected EnumerableEventStream(Stream stream) : this(stream, null)
        {
        }

        /// <summary>
        /// A Stream of Events. Events can be retrieved from this stream by either
        /// <list type="bullet">
        /// <item><description>attaching handlers to listen events, and then call StartProcessing <i>or</i></description></item>
        /// <item><description>enumerating over the events.</description></item>
        /// </list>
        /// <para></para>
        /// These options should be treated as mutually exclusive.
        /// </summary>
        /// <param name="stream">The stream forwarded to the base class constructor.</param>
        /// <param name="eventStreamDecoder">The decoder forwarded to the base class constructor.</param>
        protected EnumerableEventStream(Stream stream, IEventStreamDecoder eventStreamDecoder) : base(stream, eventStreamDecoder)
        {
        }

        /// <summary>
        /// Returns an enumerator that iterates through the collection.
        /// </summary>
        /// <remarks>
        /// This method is a C# iterator, so none of its body (including the
        /// <c>IsProcessing</c> check) runs until the first <c>MoveNext</c> call on the
        /// returned enumerator.
        /// </remarks>
        /// <returns>An enumerator that can be used to iterate through the collection.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown on first advance if <c>IsProcessing</c> is already true.
        /// </exception>
        public IEnumerator<T> GetEnumerator()
        {
            if (IsProcessing)
            {
                // If the queue has already begun processing, refuse to enumerate.
                throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);
            }

            // There could be more than 1 message created per decoder cycle.
            var events = new Queue<T>();

            // Opting out of events - letting the enumeration handle everything.
            IsEnumerated = true;
            IsProcessing = true;

            // Enumeration is just magic over the event driven mechanism.
            EventReceived += (sender, args) => events.Enqueue(args.EventStreamEvent);

            var buffer = new byte[BufferSize];

            while (IsProcessing)
            {
                // If there are already events ready to be served, do not ask for more.
                if (events.Count > 0)
                {
                    var ev = events.Dequeue();

                    // Enumeration handles terminal events on behalf of the user.
                    // The stream is disposed first; the terminal event itself is still
                    // yielded below, after which the loop condition ends the enumeration.
                    if (ev is IEventStreamTerminalEvent)
                    {
                        IsProcessing = false;
                        Dispose();
                    }

                    yield return ev;
                }
                else
                {
                    try
                    {
                        ReadFromStream(buffer);
                    }
                    catch (Exception ex)
                    {
                        IsProcessing = false;
                        Dispose();

                        // Wrap exceptions as needed to match event-driven behavior.
                        throw WrapException(ex);
                    }
                }
            }
        }

        /// <summary>
        /// Returns an enumerator that iterates through a collection.
        /// </summary>
        /// <returns>An <see cref="IEnumerator"/> object that can be used to iterate through the collection.</returns>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        /// <summary>
        /// Starts the background thread to start reading events from the network stream.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown if <see cref="IsEnumerated"/> is true.
        /// </exception>
        public override void StartProcessing()
        {
            // If they are/have enumerated, the event-driven mode should be disabled
            if (IsEnumerated)
            {
                throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);
            }

            base.StartProcessing();
        }
    }
}