// /*******************************************************************************
// * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// * Licensed under the Apache License, Version 2.0 (the "License"). You may not use
// * this file except in compliance with the License. A copy of the License is located at
// *
// * http://aws.amazon.com/apache2.0
// *
// * or in the "license" file accompanying this file.
// * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// * CONDITIONS OF ANY KIND, either express or implied. See the License for the
// * specific language governing permissions and limitations under the License.
// * *****************************************************************************
// * __ _ _ ___
// * ( )( \/\/ )/ __)
// * /__\ \ / \__ \
// * (_)(_) \/\/ (___/
// *
// * AWS SDK for .NET
// *
// */
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
namespace Amazon.Runtime.EventStreams.Internal
{
///
/// The contract for the .
///
/// An implementation of IEventStreamEvent (e.g. IS3Event).
/// An implementation of EventStreamException (e.g. S3EventStreamException).
[SuppressMessage("Microsoft.Naming", "CA1710", Justification = "IEventStreamCollection is not descriptive.")]
public interface IEnumerableEventStream : IEventStream, IEnumerable where T : IEventStreamEvent where TE : EventStreamException, new()
{
}
///
/// A subclass of that enables an enumerable interface for interacting with Events.
///
/// An implementation of IEventStreamEvent (e.g. IS3Event).
/// An implementation of EventStreamException (e.g. S3EventStreamException).
[SuppressMessage("Microsoft.Naming", "CA1710", Justification = "EventStreamCollection is not descriptive.")]
[SuppressMessage("Microsoft.Design", "CA1063", Justification = "IDisposable is a transient interface from IEventStream. Users need to be able to call Dispose.")]
public abstract class EnumerableEventStream : EventStream, IEnumerableEventStream where T : IEventStreamEvent where TE : EventStreamException, new()
{
private const string MutuallyExclusiveExceptionMessage = "Stream has already begun processing. Event-driven and Enumerable traversals of the stream are mutually exclusive. " +
"You can either use the event driven or enumerable interface, but not both.";
///
/// Flag if the stream was chosen to be enumerated.
///
protected bool IsEnumerated { get; set; }
///
/// A Stream of Events. Events can be retrieved from this stream by either
///
/// - attaching handlers to listen events, and then call StartProcessing or
/// - enumerating over the events.
///
///
/// These options should be treated as mutually exclusive.
///
protected EnumerableEventStream(Stream stream) : this(stream, null)
{
}
///
/// A Stream of Events. Events can be retrieved from this stream by either
///
/// - attaching handlers to listen events, and then call StartProcessing or
/// - enumerating over the events.
///
///
/// These options should be treated as mutually exclusive.
///
protected EnumerableEventStream(Stream stream, IEventStreamDecoder eventStreamDecoder) : base(stream, eventStreamDecoder)
{
}
///
/// Returns an enumerator that iterates through the collection.
///
/// An enumerator that can be used to iterate through the collection.
public IEnumerator GetEnumerator()
{
if (IsProcessing)
{
// If the queue has already begun processing, refuse to enumerate.
throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);
}
// There could be more than 1 message created per decoder cycle.
var events = new Queue();
// Opting out of events - letting the enumeration handle everything.
IsEnumerated = true;
IsProcessing = true;
// Enumeration is just magic over the event driven mechanism.
EventReceived += (sender, args) => events.Enqueue(args.EventStreamEvent);
var buffer = new byte[BufferSize];
while (IsProcessing)
{
// If there are already events ready to be served, do not ask for more.
if (events.Count > 0)
{
var ev = events.Dequeue();
// Enumeration handles terminal events on behalf of the user.
if (ev is IEventStreamTerminalEvent)
{
IsProcessing = false;
Dispose();
}
yield return ev;
}
else
{
try
{
ReadFromStream(buffer);
}
catch (Exception ex)
{
IsProcessing = false;
Dispose();
// Wrap exceptions as needed to match event-driven behavior.
throw WrapException(ex);
}
}
}
}
///
/// Returns an enumerator that iterates through a collection.
///
/// An object that can be used to iterate through the collection.
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
///
/// Starts the background thread to start reading events from the network stream.
///
public override void StartProcessing()
{
// If they are/have enumerated, the event-driven mode should be disabled
if (IsEnumerated) throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);
base.StartProcessing();
}
}
}