/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using Amazon.Util;
using Amazon.Util.Internal;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
#if AWS_ASYNC_API
using System.Threading;
using System.Threading.Tasks;
#endif
namespace Amazon.Runtime.Internal.Util
{
///
/// Stream wrapper to append trailing headers, including an optional
/// rolling checksum for a request with an unsigned payload.
///
public class TrailingHeadersWrapperStream : WrapperStream
{
private const string STREAM_NEWLINE = "\r\n";
private const string EMPTY_CHUNK = "0\r\n";
private const int NEWLINE_LENGTH = 2; // additional length for any new lines
private const int EMPTY_CHUNK_LENGTH = 3; // additional length for an empty chunk "0CRLF"
private const int HEADER_ROW_PADDING_LENGTH = 3; // additional length for each row of a trailing header, 1 for ':' between the key and value, plus 2 for CRLF
private Stream _baseStream;
private HashAlgorithm _hashAlgorithm;
private IDictionary _trailingHeaders;
private CoreChecksumAlgorithm _checksumAlgorithm;
string _prefix;
string _suffix;
bool _haveFinishedPrefix;
bool _haveFinishedStream;
bool _haveFinishedSuffix;
int _prefixPosition;
int _suffixPosition;
///
/// Initiates a stream wrapper to append trailing headers to an unsigned payload
///
/// Stream to wrap
/// Trailing headers to append after the wrapped stream
public TrailingHeadersWrapperStream(Stream baseStream, IDictionary trailingHeaders) : base(baseStream)
{
if (trailingHeaders == null || trailingHeaders.Count == 0)
{
throw new AmazonClientException($"{nameof(TrailingHeadersWrapperStream)} was initialized without any trailing headers.");
}
_baseStream = baseStream;
_trailingHeaders = trailingHeaders;
_prefix = GenerateContentChunkLength();
}
///
/// Initiates a stream wrapper to append trailing headers to an unsigned payload,
/// with a trailing checksum
///
/// Stream to wrap
/// Header keys and values to append after the stream's conent
/// Algorithm to use to calculate the stream's checksum
public TrailingHeadersWrapperStream(
Stream baseStream,
IDictionary trailingHeaders,
CoreChecksumAlgorithm checksumAlgorithm) : this(baseStream, trailingHeaders)
{
if (checksumAlgorithm != CoreChecksumAlgorithm.NONE)
{
_checksumAlgorithm = checksumAlgorithm;
_hashAlgorithm = CryptoUtilFactory.GetChecksumInstance(checksumAlgorithm);
}
}
///
/// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
///
/// An array of bytes. When this method returns, the buffer contains the specified byte
/// array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source.
/// The zero-based byte offset in buffer at which to begin storing the data read from the current stream.
/// The maximum number of bytes to be read from the current stream.
/// The total number of bytes read into the buffer. This can be less than the number of bytes requested if
/// that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
public override int Read(byte[] buffer, int offset, int count)
{
#if AWS_ASYNC_API
return ReadInternal(buffer, offset, count, false, CancellationToken.None).GetAwaiter().GetResult();
#else
return ReadInternal(buffer, offset, count, false);
#endif
}
#if AWS_ASYNC_API
///
/// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
///
/// An array of bytes. When this method returns, the buffer contains the specified byte
/// array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source.
/// The zero-based byte offset in buffer at which to begin storing the data read from the current stream.
/// The maximum number of bytes to be read from the current stream.
/// The token to monitor for cancellation requests.
/// The total number of bytes read into the buffer. This can be less than the number of bytes requested if
/// that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return await ReadInternal(buffer, offset, count, true, cancellationToken).ConfigureAwait(false);
}
#endif
#if AWS_ASYNC_API
private async Task ReadInternal(byte[] buffer, int offset, int count, bool useAsyncRead, CancellationToken cancellationToken)
#else
private int ReadInternal(byte[] buffer, int offset, int count, bool useAsyncRead)
#endif
{
var countRemainingForThisRead = count;
var countFromPrefix = 0;
var countFromStream = 0;
var countFromSuffix = 0;
if (countRemainingForThisRead > 0 && !_haveFinishedPrefix)
{
countFromPrefix = ReadFromPrefix(buffer, offset, countRemainingForThisRead);
offset += countFromPrefix;
countRemainingForThisRead -= countFromPrefix;
}
if (countRemainingForThisRead > 0 && !_haveFinishedStream)
{
// First read from the stream into a buffer here
byte[] thisBuffer = new byte[countRemainingForThisRead];
if (!useAsyncRead)
{
countFromStream = base.Read(thisBuffer, 0, countRemainingForThisRead);
}
else
{
#if AWS_ASYNC_API
countFromStream = await base.ReadAsync(thisBuffer, 0, countRemainingForThisRead, cancellationToken).ConfigureAwait(false);
#else
throw new AmazonClientException($"Attempted to call {nameof(TrailingHeadersWrapperStream)}.ReadAsync from an unsupported target platform.");
#endif
}
// Update rolling checksum for that content, and copy it to the output buffer
if (countFromStream != 0)
{
if (_hashAlgorithm != null)
{
_hashAlgorithm.TransformBlock(thisBuffer, 0, countFromStream, thisBuffer, 0);
}
Buffer.BlockCopy(thisBuffer, 0, buffer, offset, countFromStream);
}
else // finished the stream, so finalize checksum
{
if (_hashAlgorithm != null)
{
_hashAlgorithm.TransformFinalBlock(ArrayEx.Empty(), 0, 0);
}
_haveFinishedStream = true;
_suffix = GenerateTrailingHeaderChunk();
}
offset += countFromStream;
countRemainingForThisRead -= countFromStream;
}
if (countRemainingForThisRead > 0 && _haveFinishedStream && !_haveFinishedSuffix)
{
countFromSuffix = ReadFromSuffix(buffer, offset, countRemainingForThisRead);
}
return countFromPrefix + countFromStream + countFromSuffix;
}
///
/// Generates the prefix for the content chunk, which is
/// the content's length in hex followed by CRLF
///
/// Length of a stream chunk
private string GenerateContentChunkLength()
{
return $"{_baseStream.Length.ToString("X", CultureInfo.InvariantCulture)}{STREAM_NEWLINE}";
}
///
/// Copies the chunk length prefix to the output buffer
///
/// The buffer to write the data into
/// The byte offset in buffer at which to begin copying data
/// The maximum number of bytes to copy
/// Number of bytes copied
private int ReadFromPrefix(byte[] buffer, int offset, int countRemainingForThisRead)
{
var charsLeftInPrefix = _prefix.Length - _prefixPosition;
if (charsLeftInPrefix <= countRemainingForThisRead)
{
Encoding.Default.GetBytes(_prefix, _prefixPosition, charsLeftInPrefix, buffer, offset);
_haveFinishedPrefix = true;
return charsLeftInPrefix;
}
else // can only read a subset of the prefix (the rest of countRemainingForThisRead)
{
Encoding.Default.GetBytes(_prefix, _prefixPosition, countRemainingForThisRead, buffer, offset);
_prefixPosition += countRemainingForThisRead;
return countRemainingForThisRead;
}
}
///
/// Generates the trailing header content, assuming the rolling checksum is now finalized
///
/// Trailing headers as a single string
private string GenerateTrailingHeaderChunk()
{
var trailer = new StringBuilder();
// End the data chunk
trailer.Append(STREAM_NEWLINE);
// Append a chunk of size 0
trailer.Append(EMPTY_CHUNK);
// Append trailing headers, including special handling for the checksum.
// The order here must match the order of keys sent already in the X-Amz-Trailer header.
foreach (var kvp in _trailingHeaders.OrderBy(kvp => kvp.Key))
{
if (_checksumAlgorithm != CoreChecksumAlgorithm.NONE && ChecksumUtils.GetChecksumHeaderKey(_checksumAlgorithm) == kvp.Key)
{
// Use the calculated checksum, since it likely wasn't set in advance
trailer.Append($"{kvp.Key}:{Convert.ToBase64String(_hashAlgorithm.Hash)}{STREAM_NEWLINE}");
}
else
{
trailer.Append($"{kvp.Key}:{kvp.Value}{STREAM_NEWLINE}");
}
}
// Append a final trailing CRLF
trailer.Append(STREAM_NEWLINE);
return trailer.ToString();
}
///
/// Copies the trailing header suffix to the output buffer
///
/// The buffer to write the data into
/// The byte offset in buffer at which to begin copying data
/// The maximum number of bytes to copy
/// Number of bytes copied
private int ReadFromSuffix(byte[] buffer, int offset, int countRemainingForThisRead)
{
var charsLeftInSuffix = _suffix.Length - _suffixPosition;
// If the current suffix fits entirely within the current read buffer
if (charsLeftInSuffix <= countRemainingForThisRead)
{
Encoding.Default.GetBytes(_suffix, _suffixPosition, charsLeftInSuffix, buffer, offset);
_haveFinishedSuffix = true;
return charsLeftInSuffix;
}
else // else we can only read a subset of the prefix (the rest of countRemainingForThisRead)
{
Encoding.Default.GetBytes(_suffix, _suffixPosition, countRemainingForThisRead, buffer, offset);
_suffixPosition += countRemainingForThisRead;
return countRemainingForThisRead;
}
}
///
/// Gets the length in bytes of the stream
///
public override long Length => CalculateLength(_trailingHeaders, _checksumAlgorithm, _baseStream.Length);
///
/// Calculates the length in bytes of a TrailingChecksumWrapperStream initialized
/// with the given trailing headers and optional checksum
///
/// Dictionary of trailing headers
/// Trailing checksum
/// Length of the base stream in bytes
/// Length of a TrailingChecksumWrapperStream with given parameters, in bytes
public static long CalculateLength(IDictionary trailingHeaders, CoreChecksumAlgorithm checksumAlgorithm, long baseStreamLength)
{
var prefixLength = baseStreamLength.ToString("X", CultureInfo.InvariantCulture).Length;
var trailingHeaderLength = 0;
if (trailingHeaders != null)
{
foreach (var key in trailingHeaders.Keys)
{
if (checksumAlgorithm != CoreChecksumAlgorithm.NONE && ChecksumUtils.GetChecksumHeaderKey(checksumAlgorithm) == key)
{
trailingHeaderLength += key.Length +
CryptoUtilFactory.GetChecksumBase64Length(checksumAlgorithm) + HEADER_ROW_PADDING_LENGTH;
}
else
{
trailingHeaderLength += key.Length + trailingHeaders[key].Length + HEADER_ROW_PADDING_LENGTH;
}
}
}
return prefixLength +
NEWLINE_LENGTH +
baseStreamLength +
NEWLINE_LENGTH +
EMPTY_CHUNK_LENGTH +
trailingHeaderLength +
NEWLINE_LENGTH;
}
///
/// Gets a value indicating whether the current stream supports seeking
///
public override bool CanSeek => false;
///
/// Gets whether this stream has a length
///
internal override bool HasLength => (_baseStream != null) || (_trailingHeaders != null);
}
}