/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ using Amazon.Util; using Amazon.Util.Internal; using System; using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; using System.Security.Cryptography; using System.Text; #if AWS_ASYNC_API using System.Threading; using System.Threading.Tasks; #endif namespace Amazon.Runtime.Internal.Util { /// /// Stream wrapper to append trailing headers, including an optional /// rolling checksum for a request with an unsigned payload. /// public class TrailingHeadersWrapperStream : WrapperStream { private const string STREAM_NEWLINE = "\r\n"; private const string EMPTY_CHUNK = "0\r\n"; private const int NEWLINE_LENGTH = 2; // additional length for any new lines private const int EMPTY_CHUNK_LENGTH = 3; // additional length for an empty chunk "0CRLF" private const int HEADER_ROW_PADDING_LENGTH = 3; // additional length for each row of a trailing header, 1 for ':' between the key and value, plus 2 for CRLF private Stream _baseStream; private HashAlgorithm _hashAlgorithm; private IDictionary _trailingHeaders; private CoreChecksumAlgorithm _checksumAlgorithm; string _prefix; string _suffix; bool _haveFinishedPrefix; bool _haveFinishedStream; bool _haveFinishedSuffix; int _prefixPosition; int _suffixPosition; /// /// Initiates a stream wrapper to append trailing headers to an unsigned payload /// /// Stream to wrap /// Trailing headers to append after the wrapped stream public TrailingHeadersWrapperStream(Stream baseStream, IDictionary trailingHeaders) : base(baseStream) { if (trailingHeaders == null || trailingHeaders.Count == 0) { throw new AmazonClientException($"{nameof(TrailingHeadersWrapperStream)} was initialized without any trailing headers."); } _baseStream = baseStream; _trailingHeaders = trailingHeaders; _prefix = GenerateContentChunkLength(); } /// /// Initiates a stream wrapper to append trailing headers to an unsigned payload, /// with a trailing checksum /// /// Stream to wrap /// Header keys and values to append after the stream's conent /// Algorithm to use to calculate the stream's checksum public TrailingHeadersWrapperStream( Stream baseStream, IDictionary trailingHeaders, CoreChecksumAlgorithm checksumAlgorithm) : this(baseStream, trailingHeaders) { if (checksumAlgorithm != CoreChecksumAlgorithm.NONE) { _checksumAlgorithm = checksumAlgorithm; _hashAlgorithm = CryptoUtilFactory.GetChecksumInstance(checksumAlgorithm); } } /// /// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read. /// /// An array of bytes. When this method returns, the buffer contains the specified byte /// array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source. /// The zero-based byte offset in buffer at which to begin storing the data read from the current stream. /// The maximum number of bytes to be read from the current stream. /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if /// that many bytes are not currently available, or zero (0) if the end of the stream has been reached. public override int Read(byte[] buffer, int offset, int count) { #if AWS_ASYNC_API return ReadInternal(buffer, offset, count, false, CancellationToken.None).GetAwaiter().GetResult(); #else return ReadInternal(buffer, offset, count, false); #endif } #if AWS_ASYNC_API /// /// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read. /// /// An array of bytes. When this method returns, the buffer contains the specified byte /// array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source. /// The zero-based byte offset in buffer at which to begin storing the data read from the current stream. /// The maximum number of bytes to be read from the current stream. /// The token to monitor for cancellation requests. /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if /// that many bytes are not currently available, or zero (0) if the end of the stream has been reached. public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return await ReadInternal(buffer, offset, count, true, cancellationToken).ConfigureAwait(false); } #endif #if AWS_ASYNC_API private async Task ReadInternal(byte[] buffer, int offset, int count, bool useAsyncRead, CancellationToken cancellationToken) #else private int ReadInternal(byte[] buffer, int offset, int count, bool useAsyncRead) #endif { var countRemainingForThisRead = count; var countFromPrefix = 0; var countFromStream = 0; var countFromSuffix = 0; if (countRemainingForThisRead > 0 && !_haveFinishedPrefix) { countFromPrefix = ReadFromPrefix(buffer, offset, countRemainingForThisRead); offset += countFromPrefix; countRemainingForThisRead -= countFromPrefix; } if (countRemainingForThisRead > 0 && !_haveFinishedStream) { // First read from the stream into a buffer here byte[] thisBuffer = new byte[countRemainingForThisRead]; if (!useAsyncRead) { countFromStream = base.Read(thisBuffer, 0, countRemainingForThisRead); } else { #if AWS_ASYNC_API countFromStream = await base.ReadAsync(thisBuffer, 0, countRemainingForThisRead, cancellationToken).ConfigureAwait(false); #else throw new AmazonClientException($"Attempted to call {nameof(TrailingHeadersWrapperStream)}.ReadAsync from an unsupported target platform."); #endif } // Update rolling checksum for that content, and copy it to the output buffer if (countFromStream != 0) { if (_hashAlgorithm != null) { _hashAlgorithm.TransformBlock(thisBuffer, 0, countFromStream, thisBuffer, 0); } Buffer.BlockCopy(thisBuffer, 0, buffer, offset, countFromStream); } else // finished the stream, so finalize checksum { if (_hashAlgorithm != null) { _hashAlgorithm.TransformFinalBlock(ArrayEx.Empty(), 0, 0); } _haveFinishedStream = true; _suffix = GenerateTrailingHeaderChunk(); } offset += countFromStream; countRemainingForThisRead -= countFromStream; } if (countRemainingForThisRead > 0 && _haveFinishedStream && !_haveFinishedSuffix) { countFromSuffix = ReadFromSuffix(buffer, offset, countRemainingForThisRead); } return countFromPrefix + countFromStream + countFromSuffix; } /// /// Generates the prefix for the content chunk, which is /// the content's length in hex followed by CRLF /// /// Length of a stream chunk private string GenerateContentChunkLength() { return $"{_baseStream.Length.ToString("X", CultureInfo.InvariantCulture)}{STREAM_NEWLINE}"; } /// /// Copies the chunk length prefix to the output buffer /// /// The buffer to write the data into /// The byte offset in buffer at which to begin copying data /// The maximum number of bytes to copy /// Number of bytes copied private int ReadFromPrefix(byte[] buffer, int offset, int countRemainingForThisRead) { var charsLeftInPrefix = _prefix.Length - _prefixPosition; if (charsLeftInPrefix <= countRemainingForThisRead) { Encoding.Default.GetBytes(_prefix, _prefixPosition, charsLeftInPrefix, buffer, offset); _haveFinishedPrefix = true; return charsLeftInPrefix; } else // can only read a subset of the prefix (the rest of countRemainingForThisRead) { Encoding.Default.GetBytes(_prefix, _prefixPosition, countRemainingForThisRead, buffer, offset); _prefixPosition += countRemainingForThisRead; return countRemainingForThisRead; } } /// /// Generates the trailing header content, assuming the rolling checksum is now finalized /// /// Trailing headers as a single string private string GenerateTrailingHeaderChunk() { var trailer = new StringBuilder(); // End the data chunk trailer.Append(STREAM_NEWLINE); // Append a chunk of size 0 trailer.Append(EMPTY_CHUNK); // Append trailing headers, including special handling for the checksum. // The order here must match the order of keys sent already in the X-Amz-Trailer header. foreach (var kvp in _trailingHeaders.OrderBy(kvp => kvp.Key)) { if (_checksumAlgorithm != CoreChecksumAlgorithm.NONE && ChecksumUtils.GetChecksumHeaderKey(_checksumAlgorithm) == kvp.Key) { // Use the calculated checksum, since it likely wasn't set in advance trailer.Append($"{kvp.Key}:{Convert.ToBase64String(_hashAlgorithm.Hash)}{STREAM_NEWLINE}"); } else { trailer.Append($"{kvp.Key}:{kvp.Value}{STREAM_NEWLINE}"); } } // Append a final trailing CRLF trailer.Append(STREAM_NEWLINE); return trailer.ToString(); } /// /// Copies the trailing header suffix to the output buffer /// /// The buffer to write the data into /// The byte offset in buffer at which to begin copying data /// The maximum number of bytes to copy /// Number of bytes copied private int ReadFromSuffix(byte[] buffer, int offset, int countRemainingForThisRead) { var charsLeftInSuffix = _suffix.Length - _suffixPosition; // If the current suffix fits entirely within the current read buffer if (charsLeftInSuffix <= countRemainingForThisRead) { Encoding.Default.GetBytes(_suffix, _suffixPosition, charsLeftInSuffix, buffer, offset); _haveFinishedSuffix = true; return charsLeftInSuffix; } else // else we can only read a subset of the prefix (the rest of countRemainingForThisRead) { Encoding.Default.GetBytes(_suffix, _suffixPosition, countRemainingForThisRead, buffer, offset); _suffixPosition += countRemainingForThisRead; return countRemainingForThisRead; } } /// /// Gets the length in bytes of the stream /// public override long Length => CalculateLength(_trailingHeaders, _checksumAlgorithm, _baseStream.Length); /// /// Calculates the length in bytes of a TrailingChecksumWrapperStream initialized /// with the given trailing headers and optional checksum /// /// Dictionary of trailing headers /// Trailing checksum /// Length of the base stream in bytes /// Length of a TrailingChecksumWrapperStream with given parameters, in bytes public static long CalculateLength(IDictionary trailingHeaders, CoreChecksumAlgorithm checksumAlgorithm, long baseStreamLength) { var prefixLength = baseStreamLength.ToString("X", CultureInfo.InvariantCulture).Length; var trailingHeaderLength = 0; if (trailingHeaders != null) { foreach (var key in trailingHeaders.Keys) { if (checksumAlgorithm != CoreChecksumAlgorithm.NONE && ChecksumUtils.GetChecksumHeaderKey(checksumAlgorithm) == key) { trailingHeaderLength += key.Length + CryptoUtilFactory.GetChecksumBase64Length(checksumAlgorithm) + HEADER_ROW_PADDING_LENGTH; } else { trailingHeaderLength += key.Length + trailingHeaders[key].Length + HEADER_ROW_PADDING_LENGTH; } } } return prefixLength + NEWLINE_LENGTH + baseStreamLength + NEWLINE_LENGTH + EMPTY_CHUNK_LENGTH + trailingHeaderLength + NEWLINE_LENGTH; } /// /// Gets a value indicating whether the current stream supports seeking /// public override bool CanSeek => false; /// /// Gets whether this stream has a length /// internal override bool HasLength => (_baseStream != null) || (_trailingHeaders != null); } }