/*******************************************************************************
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* *****************************************************************************
* __ _ _ ___
* ( )( \/\/ )/ __)
* /__\ \ / \__ \
* (_)(_) \/\/ (___/
*
* AWS SDK for .NET
*
*/
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Net;
using Amazon.Runtime.Internal.Util;
using ThirdParty.Ionic.Zlib;
namespace Amazon.Runtime.EventStreams
{
#region exceptions
/// <summary>
/// Exception type for failures encountered while parsing event stream data.
/// </summary>
#if !NETSTANDARD
[Serializable]
#endif
public class EventStreamParseException : Exception
{
    /// <summary>
    /// Constructs a new instance of the EventStreamParseException class with a default message.
    /// </summary>
    public EventStreamParseException()
    {
    }

    /// <summary>
    /// Constructs a new instance of the EventStreamParseException class with the given message.
    /// </summary>
    /// <param name="message">Description of the parse failure.</param>
    public EventStreamParseException(string message) : base(message)
    {
    }

    /// <summary>
    /// Constructs a new instance of the EventStreamParseException class with the given message
    /// and the exception that caused it, so the original failure is not lost when wrapping.
    /// </summary>
    /// <param name="message">Description of the parse failure.</param>
    /// <param name="innerException">The exception that triggered this failure, or null.</param>
    public EventStreamParseException(string message, Exception innerException) : base(message, innerException)
    {
    }

#if !NETSTANDARD
    /// <summary>
    /// Constructs a new instance of the EventStreamParseException class with serialized data.
    /// </summary>
    /// <param name="info">The <see cref="T:System.Runtime.Serialization.SerializationInfo" /> that holds the serialized object data about the exception being thrown.</param>
    /// <param name="context">The <see cref="T:System.Runtime.Serialization.StreamingContext" /> that contains contextual information about the source or destination.</param>
    /// <exception cref="T:System.ArgumentNullException">The <paramref name="info" /> parameter is null.</exception>
    /// <exception cref="T:System.Runtime.Serialization.SerializationException">The class name is null or <see cref="P:System.Exception.HResult" /> is zero (0).</exception>
    protected EventStreamParseException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
        : base(info, context)
    {
    }
#endif
}
/// <summary>
/// Exception type for event stream checksum (CRC) validation failures.
/// </summary>
#if !NETSTANDARD
[Serializable]
#endif
public class EventStreamChecksumFailureException : Exception
{
    /// <summary>
    /// Constructs a new instance of the EventStreamChecksumFailureException class with a default message.
    /// </summary>
    public EventStreamChecksumFailureException()
    {
    }

    /// <summary>
    /// Constructs a new instance of the EventStreamChecksumFailureException class with the given message.
    /// </summary>
    /// <param name="message">Description of the checksum failure.</param>
    public EventStreamChecksumFailureException(string message) : base(message)
    {
    }

    /// <summary>
    /// Constructs a new instance of the EventStreamChecksumFailureException class with the given message
    /// and the exception that caused it, so the original failure is not lost when wrapping.
    /// </summary>
    /// <param name="message">Description of the checksum failure.</param>
    /// <param name="innerException">The exception that triggered this failure, or null.</param>
    public EventStreamChecksumFailureException(string message, Exception innerException) : base(message, innerException)
    {
    }

#if !NETSTANDARD
    /// <summary>
    /// Constructs a new instance of the EventStreamChecksumFailureException class with serialized data.
    /// </summary>
    /// <param name="info">The <see cref="T:System.Runtime.Serialization.SerializationInfo" /> that holds the serialized object data about the exception being thrown.</param>
    /// <param name="context">The <see cref="T:System.Runtime.Serialization.StreamingContext" /> that contains contextual information about the source or destination.</param>
    /// <exception cref="T:System.ArgumentNullException">The <paramref name="info" /> parameter is null.</exception>
    /// <exception cref="T:System.Runtime.Serialization.SerializationException">The class name is null or <see cref="P:System.Exception.HResult" /> is zero (0).</exception>
    protected EventStreamChecksumFailureException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
        : base(info, context)
    {
    }
#endif
}
#endregion
#region messageImplementation
/// <summary>
/// Contract for a single event stream message: a set of headers, a byte payload,
/// and the ability to serialize both into a byte array.
/// </summary>
public interface IEventStreamMessage
{
/// <summary>
/// Headers for the message. Can be null.
/// </summary>
Dictionary Headers { get; set; }
/// <summary>
/// Payload for the message. Can be null.
/// </summary>
[SuppressMessage("Microsoft.Performance", "CA1819", Justification = "This needs to be a byte[], and it makes the most sense as a property.")]
byte[] Payload { get; set; }
/// <summary>
/// Converts a message into a byte buffer (usually for network transmission).
/// </summary>
/// <returns>The serialized message bytes.</returns>
byte[] ToByteArray();
}
/// <summary>
/// Message is a single datagram. The format is as follows:
/// [ total length (4) ] | [ headers_length (4)] | [ prelude crc(4)]
/// [ headers (v)]
/// [ payload (v)]
/// [ trailing crc ]
///
/// CRCs use the CRC32 algorithm.
/// </summary>
public class EventStreamMessage : IEventStreamMessage
{
    internal const int SizeOfInt32 = 4;
    internal const int PreludeLen = SizeOfInt32 * 3;
    internal const int TrailerLen = SizeOfInt32;
    internal const int FramingSize = PreludeLen + TrailerLen;

    /// <summary>
    /// Content type for EventStreams.
    /// </summary>
    public const string ContentType = "vnd.amazon.eventstream";

    /// <summary>
    /// Headers for the message. Can be null.
    /// </summary>
    public Dictionary Headers { get; set; }

    /// <summary>
    /// Payload for the message. Can be null.
    /// </summary>
    public byte[] Payload { get; set; }

    private EventStreamMessage() { }

    /// <summary>
    /// Initialize a message with headers and a payload.
    /// </summary>
    /// <param name="headers">list of headers for the message, can be null (treated as no headers).</param>
    /// <param name="payload">payload for the message, can be null.</param>
    public EventStreamMessage(List headers, byte[] payload)
    {
        // The documented contract allows a null list, so size the dictionary
        // defensively instead of dereferencing headers unconditionally.
        Headers = new Dictionary(headers?.Count ?? 0, StringComparer.Ordinal);
        if (headers != null)
        {
            foreach (var header in headers)
            {
                Headers.Add(header.Name, header);
            }
        }

        Payload = payload;
    }

    /// <summary>
    /// Builds a message from buffer.
    /// </summary>
    /// <param name="buffer">buffer to read</param>
    /// <param name="offset">offset to start reading</param>
    /// <param name="length">buffer length; must equal the total length recorded in the message prelude.</param>
    /// <returns>
    /// parsed instance of EventStreamMessage. Doesn't return null,
    /// does throw if CRCs don't match.
    /// </returns>
    /// <exception cref="EventStreamChecksumFailureException">
    /// The prelude CRC or the trailing message CRC does not match the computed value,
    /// or the total length in the prelude differs from <paramref name="length"/>.
    /// </exception>
    /// <exception cref="EventStreamParseException">
    /// The lengths recorded in the prelude are negative or leave a negative payload size.
    /// </exception>
    public static EventStreamMessage FromBuffer(byte[] buffer, int offset, int length)
    {
        var currentOffset = offset;

        //get the total length of the message
        var totalLength = ReadInt32BigEndian(buffer, currentOffset);
        currentOffset += SizeOfInt32;

        //get the length of the headers block.
        var headersLength = ReadInt32BigEndian(buffer, currentOffset);
        currentOffset += SizeOfInt32;

        //get the prelude crc
        var preludeCrc = ReadInt32BigEndian(buffer, currentOffset);

        var message = new EventStreamMessage();
        message.Headers = new Dictionary(StringComparer.Ordinal);

        using (var nullStream = new NullStream())
        using (var runningChecksum = new CrcCalculatorStream(nullStream))
        {
            //write up to the prelude crc to the checksum stream
            runningChecksum.Write(buffer, offset, currentOffset - offset);

            //compare the current crc to the prelude crc and make sure they match.
            if (preludeCrc != runningChecksum.Crc32)
            {
                throw new EventStreamChecksumFailureException(string.Format(CultureInfo.InvariantCulture, "Message Prelude Checksum failure. Expected {0} but was {1}", preludeCrc, runningChecksum.Crc32));
            }

            //if the data length passed isn't enough for the total length, that's an error condition.
            if (totalLength != length)
            {
                throw new EventStreamChecksumFailureException(
                    string.Format(CultureInfo.InvariantCulture, "Message Total Length didn't match the passed in length. Expected {0} but was {1}", length, totalLength));
            }

            //now write the prelude crc to the checksum stream
            runningChecksum.Write(buffer, currentOffset, SizeOfInt32);
            currentOffset += SizeOfInt32;

            //payload length is total message, minus framing and headers size.
            var payloadLength = totalLength - headersLength - FramingSize;

            //reject inconsistent lengths up front rather than failing later with an
            //unrelated array-allocation or copy exception.
            if (headersLength < 0 || payloadLength < 0)
            {
                throw new EventStreamParseException(
                    string.Format(CultureInfo.InvariantCulture, "Message lengths are invalid. Total length {0}, headers length {1}", totalLength, headersLength));
            }

            //if we have headers, then loop over each one and parse them out.
            if (headersLength > 0)
            {
                int headersStart = currentOffset;

                //measure consumed header bytes from where the headers begin, so the
                //loop is correct when the message does not start at index 0 of buffer.
                while (currentOffset - headersStart < headersLength)
                {
                    EventStreamHeader header = EventStreamHeader.FromBuffer(buffer, currentOffset, ref currentOffset);
                    message.Headers.Add(header.Name, header);
                }

                //after parsing the header remember to write that data to the checksum stream
                runningChecksum.Write(buffer, headersStart, currentOffset - headersStart);
            }

            // now we're on the payload
            message.Payload = new byte[payloadLength];
            Buffer.BlockCopy(buffer, currentOffset, message.Payload, 0, message.Payload.Length);
            runningChecksum.Write(buffer, currentOffset, message.Payload.Length);
            currentOffset += message.Payload.Length;

            //after reading the payload, get the message crc and make sure it matches.
            var trailingCrc = ReadInt32BigEndian(buffer, currentOffset);
            if (trailingCrc != runningChecksum.Crc32)
            {
                throw new EventStreamChecksumFailureException(
                    string.Format(CultureInfo.InvariantCulture, "Message Checksum failure. Expected {0} but was {1}", trailingCrc, runningChecksum.Crc32));
            }
        }

        return message;
    }

    /// <summary>
    /// Converts a message into a byte buffer (usually for network transmission).
    /// </summary>
    /// <returns>
    /// A new buffer containing the prelude, headers (if any), payload (if any) and trailing CRC.
    /// </returns>
    public byte[] ToByteArray()
    {
        int headersWireLength = 0;

        //first we need to figure out how much space the headers will take up.
        if (Headers != null)
        {
            foreach (var header in Headers)
            {
                headersWireLength += header.Value.GetWireSize();
            }
        }

        var payloadLength = Payload?.Length ?? 0;

        //total message length is the framing size + the payload size + the headers wire size.
        var totalLength = headersWireLength + payloadLength + FramingSize;
        var messageBuffer = new byte[totalLength];

        //now write the total length and the headers length to the message.
        var offset = 0;
        WriteInt32BigEndian(totalLength, messageBuffer, offset);
        offset += SizeOfInt32;
        WriteInt32BigEndian(headersWireLength, messageBuffer, offset);
        offset += SizeOfInt32;

        using (var nullStream = new NullStream())
        using (var runningChecksum = new CrcCalculatorStream(nullStream))
        {
            //write the total length and headers length to the checksum stream.
            runningChecksum.Write(messageBuffer, 0, offset);

            //take the current checksum and write it to the message.
            WriteInt32BigEndian(runningChecksum.Crc32, messageBuffer, offset);

            //the prelude crc bytes themselves are part of the data covered by the message crc.
            runningChecksum.Write(messageBuffer, offset, SizeOfInt32);
            offset += SizeOfInt32;

            //loop over the headers and write them out to the message.
            if (Headers != null)
            {
                foreach (var header in Headers)
                {
                    offset = header.Value.WriteToBuffer(messageBuffer, offset);
                }

                //make sure to add the header bytes to the checksum stream.
                runningChecksum.Write(messageBuffer, PreludeLen, offset - PreludeLen);
            }

            //write the payload to the message.
            if (Payload != null)
            {
                Buffer.BlockCopy(Payload, 0, messageBuffer, offset, Payload.Length);

                //update the checksum
                runningChecksum.Write(messageBuffer, offset, Payload.Length);
                offset += Payload.Length;
            }

            //take the final checksum and add it to the end of the message.
            WriteInt32BigEndian(runningChecksum.Crc32, messageBuffer, offset);
        }

        return messageBuffer;
    }

    /// <summary>
    /// Reads a 32-bit integer stored in network byte order at the given position.
    /// </summary>
    private static int ReadInt32BigEndian(byte[] buffer, int offset)
    {
        return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buffer, offset));
    }

    /// <summary>
    /// Writes a 32-bit integer in network byte order at the given position.
    /// </summary>
    private static void WriteInt32BigEndian(int value, byte[] destination, int offset)
    {
        Buffer.BlockCopy(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value)), 0,
            destination, offset, SizeOfInt32);
    }
}
#endregion
}