/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Globalization;
using System.IO;
using System.Text;
using Amazon.Util;
using Amazon.Util.Internal;
using Amazon.Runtime.Internal.Auth;
using System.Security.Cryptography;
using System.Collections.Generic;
using System.Linq;
#if AWS_ASYNC_API
using System.Threading;
using System.Threading.Tasks;
#endif
namespace Amazon.Runtime.Internal.Util
{
/// <summary>
/// Stream wrapper that double-buffers from a wrapped stream and
/// returns the buffered content as a series of signed 'chunks'
/// for the AWS4 ('Signature V4') protocol or the asymmetric SigV4 (SigV4a) protocol.
/// </summary>
public class ChunkedUploadWrapperStream : WrapperStream
{
/// <summary>
/// Size, in bytes, of the input buffer and therefore of every full data chunk.
/// </summary>
public static readonly int DefaultChunkSize = 81920;
// CRLF sequence appended after each chunk header, after chunk data and after each trailing header row.
private const string STREAM_NEWLINE = "\r\n";
// Byte length of STREAM_NEWLINE, used when computing chunk sizes.
private const int NEWLINE_LENGTH = 2;
private const int HEADER_ROW_PADDING_LENGTH = 3; // additional length for each row of a trailing header, 1 for ':' between the key and value, plus 2 for CRLF
// First line of the string-to-sign built for a SigV4 data chunk.
private const string CHUNK_STRING_TO_SIGN_PREFIX = "AWS4-HMAC-SHA256-PAYLOAD";
// Text placed between the hex chunk size and the signature in each chunk header.
private const string CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
/// <summary>
/// Signature length used for size calculations when the header signing result is SigV4.
/// </summary>
public const int V4_SIGNATURE_LENGTH = 64;
/// <summary>
/// Signature length used for size calculations when the header signing result is SigV4a;
/// emitted SigV4a signatures are right-padded with '*' to this width.
/// </summary>
public const int V4A_SIGNATURE_LENGTH = 144;
// Key of the last trailing header row, which carries the signature of the trailing headers.
private const string TRAILING_HEADER_SIGNATURE_KEY = "x-amz-trailer-signature";
// First line of the string-to-sign built for SigV4 trailing headers.
private const string TRAILING_HEADER_STRING_TO_SIGN_PREFIX = "AWS4-HMAC-SHA256-TRAILER";
// Raw data read from the wrapped stream for the chunk being built. Not readonly:
// it is replaced by a smaller array when a chunk does not fill it.
private byte[] _inputBuffer;
// Holds the formatted chunk (header, data and trailing CRLF) that is handed back to the reader.
private readonly byte[] _outputBuffer;
// Next unread position in _outputBuffer; -1 means no chunk is pending and a new one must be built.
private int _outputBufferPos = -1;
// Number of valid bytes in _outputBuffer for the current chunk.
private int _outputBufferDataLen = -1;
// Upper bound on the number of bytes requested from the wrapped stream in a single read call.
private readonly int _wrappedStreamBufferSize;
// Set once a read against the wrapped stream returns 0 bytes.
private bool _wrappedStreamConsumed;
// Checksum algorithm for the trailing checksum header; only assigned by the trailing-header constructor.
private CoreChecksumAlgorithm _trailingChecksum;
// Rolling hash over the chunk data; stays null unless a trailing checksum was requested.
private HashAlgorithm _hashAlgorithm;
// Trailing headers to emit after the final chunk; stays null when constructed without trailers.
private IDictionary _trailingHeaders;
// Serialized trailing-header chunk, built lazily the first time trailers are written out.
private string _trailingHeaderChunk;
// Offset into the trailing-header chunk of the next data to hand to the reader.
private int _trailingHeaderPos = 0;
// True when no trailing headers remain to be emitted. Defaults to true; the
// trailing-header constructor is the only place that clears it.
private bool _trailingHeadersConsumed = true;
// if this is set, we've exhausted the input stream and are now sending
// back to the client the final termination chunk, after which all Read
// operations will return 0 bytes.
private bool _outputBufferIsTerminatingChunk;
// The reading strategy used by FillInputBuffer against the wrapped stream.
// We prefer to read direct into our _inputBuffer but this isn't compatible
// with wrapped encryption streams, where we need to read into an interim
// buffer and then copy the encrypted content to _inputBuffer
private enum ReadStrategy
{
// Reads against the wrapped stream land straight in _inputBuffer.
ReadDirect,
// Reads go to an interim buffer whose content is then copied into _inputBuffer.
ReadAndCopy
}
// Strategy in effect for this instance; the constructor switches it to ReadAndCopy
// when it finds an encryption stream among the wrapped streams.
private readonly ReadStrategy _readStrategy = ReadStrategy.ReadDirect;
/// <summary>
/// Initializes a chunked upload stream without trailing headers.
/// </summary>
/// <param name="stream">Stream to wrap</param>
/// <param name="wrappedStreamBufferSize">Size of buffer used for reading from stream</param>
/// <param name="headerSigningResult">SigV4 or SigV4a signing result for the request's headers</param>
/// <exception cref="AmazonClientException">
/// Thrown when <paramref name="headerSigningResult"/> is neither a SigV4 nor a SigV4a signing result (including null).
/// </exception>
internal ChunkedUploadWrapperStream(Stream stream, int wrappedStreamBufferSize, AWSSigningResultBase headerSigningResult) : base(stream)
{
    var isSigV4a = headerSigningResult is AWS4aSigningResult;
    if (!isSigV4a && !(headerSigningResult is AWS4SigningResult))
    {
        throw new AmazonClientException($"{nameof(ChunkedUploadWrapperStream)} was initialized without a SigV4 or SigV4a signing result.");
    }

    // Only SigV4a needs the dedicated signer wrapper
    if (isSigV4a)
    {
        Sigv4aSigner = new AWS4aSignerCRTWrapper();
    }

    // The signing result cannot be null here: a null reference fails both type tests above.
    HeaderSigningResult = headerSigningResult;
    PreviousChunkSignature = headerSigningResult.Signature;
    _wrappedStreamBufferSize = wrappedStreamBufferSize;

    var signatureLength = isSigV4a ? V4A_SIGNATURE_LENGTH : V4_SIGNATURE_LENGTH;
    _inputBuffer = new byte[DefaultChunkSize];
    // Sized to hold one complete full-size chunk: header + data + trailing CRLF
    _outputBuffer = new byte[CalculateChunkHeaderLength(DefaultChunkSize, signatureLength)];

    // If any wrapped stream implements encryption, switch to a read-and-copy
    // strategy for filling the chunk buffer
    var wrapsEncryptionStream = SearchWrappedStream(s => s is EncryptUploadPartStream || s is EncryptStream) != null;
    if (wrapsEncryptionStream)
    {
        _readStrategy = ReadStrategy.ReadAndCopy;
    }
}
/// <summary>
/// Initializes a chunked upload stream with one or more trailing headers,
/// which may include a trailing checksum header.
/// </summary>
/// <remarks>
/// Trailing headers are only emitted when <paramref name="trailingHeaders"/> contains at least
/// one entry. This matches how the final chunk and the reported length treat a missing or empty
/// dictionary, so such a stream behaves like one constructed without trailing headers.
/// </remarks>
/// <param name="stream">Stream to wrap</param>
/// <param name="wrappedStreamBufferSize">Size of buffer used for reading from stream</param>
/// <param name="headerSigningResult">SigV4 or SigV4a signing result for the request's headers</param>
/// <param name="trailingChecksum">Algorithm to use to calculate the stream's checksum</param>
/// <param name="trailingHeaders">Trailing headers to append after the wrapped stream</param>
public ChunkedUploadWrapperStream(Stream stream,
    int wrappedStreamBufferSize,
    AWSSigningResultBase headerSigningResult,
    CoreChecksumAlgorithm trailingChecksum,
    IDictionary trailingHeaders)
    : this(stream, wrappedStreamBufferSize, headerSigningResult)
{
    if (trailingChecksum != CoreChecksumAlgorithm.NONE)
    {
        _trailingChecksum = trailingChecksum;
        _hashAlgorithm = CryptoUtilFactory.GetChecksumInstance(trailingChecksum);
    }

    _trailingHeaders = trailingHeaders;

    // Only schedule the trailer block when there is something to send. Marking it
    // pending for a null or empty dictionary would append a signed trailer after a
    // final chunk that was already terminated, and that the reported length does not
    // account for (or fail outright on a null dictionary).
    _trailingHeadersConsumed = !(trailingHeaders?.Count > 0);
}
/// <summary>
/// Reads some or all of the processed chunk to the consumer, constructing
/// and streaming a new chunk if more input data is available.
/// </summary>
/// <param name="buffer">Destination for the chunked output</param>
/// <param name="offset">Offset in <paramref name="buffer"/> at which to start writing</param>
/// <param name="count">Maximum number of bytes to write</param>
/// <returns>
/// Number of bytes written to <paramref name="buffer"/>; 0 once the terminating
/// chunk and any trailing headers have been returned in full.
/// </returns>
public override int Read(byte[] buffer, int offset, int count)
{
    // A chunk is still partially unread: keep draining it
    if (_outputBufferPos != -1)
    {
        return AdjustBufferAfterReading(buffer, offset, count, 0);
    }

    // No pending output and the last chunk built was the special termination
    // chunk: emit the trailing headers if any remain, otherwise we're done
    if (_wrappedStreamConsumed && _outputBufferIsTerminatingChunk)
    {
        return _trailingHeadersConsumed
            ? 0
            : WriteTrailingHeaders(buffer, offset, count);
    }

    // Otherwise fill the input buffer with enough data for the next chunk
    // (or with whatever is left) and construct the chunk ready for streaming
    var freshBytes = FillInputBuffer();
    return AdjustBufferAfterReading(buffer, offset, count, freshBytes);
}
/// <summary>
/// Builds a new output chunk when none is pending, then copies as much of the
/// pending chunk as fits into the caller's buffer.
/// </summary>
/// <param name="buffer">Destination for the chunked output</param>
/// <param name="offset">Offset in <paramref name="buffer"/> at which to start writing</param>
/// <param name="count">Maximum number of bytes to write</param>
/// <param name="bytesRead">Number of bytes just read into the input buffer; only used when a new chunk is built</param>
/// <returns>Number of bytes copied into <paramref name="buffer"/></returns>
private int AdjustBufferAfterReading(byte[] buffer, int offset, int count, int bytesRead)
{
    var needsNewChunk = _outputBufferPos == -1;
    if (needsNewChunk)
    {
        ConstructOutputBufferChunk(bytesRead);
        // An empty chunk built after the input ran dry is the terminating chunk
        _outputBufferIsTerminatingChunk = bytesRead == 0 && _wrappedStreamConsumed;
    }

    // Never hand out more than what is left of the current chunk
    var bytesToCopy = Math.Min(count, _outputBufferDataLen - _outputBufferPos);
    Buffer.BlockCopy(_outputBuffer, _outputBufferPos, buffer, offset, bytesToCopy);

    var nextPos = _outputBufferPos + bytesToCopy;
    // -1 flags that the chunk is fully drained and a new one is needed on the next read
    _outputBufferPos = nextPos >= _outputBufferDataLen ? -1 : nextPos;
    return bytesToCopy;
}
#if AWS_ASYNC_API
/// <summary>
/// Asynchronously reads some or all of the processed chunk to the consumer,
/// constructing and streaming a new chunk if more input data is available.
/// </summary>
/// <param name="buffer">Destination for the chunked output</param>
/// <param name="offset">Offset in <paramref name="buffer"/> at which to start writing</param>
/// <param name="count">Maximum number of bytes to write</param>
/// <param name="cancellationToken">Token passed through to reads against the wrapped stream</param>
/// <returns>
/// Number of bytes written to <paramref name="buffer"/>; 0 once the terminating
/// chunk and any trailing headers have been returned in full.
/// </returns>
public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // A chunk is still partially unread: keep draining it
    if (_outputBufferPos != -1)
    {
        return AdjustBufferAfterReading(buffer, offset, count, 0);
    }

    // No pending output and the last chunk built was the special termination
    // chunk: emit the trailing headers if any remain, otherwise we're done
    if (_wrappedStreamConsumed && _outputBufferIsTerminatingChunk)
    {
        return _trailingHeadersConsumed
            ? 0
            : WriteTrailingHeaders(buffer, offset, count);
    }

    // Otherwise fill the input buffer with enough data for the next chunk
    // (or with whatever is left) and construct the chunk ready for streaming
    int freshBytes = await FillInputBufferAsync(cancellationToken).ConfigureAwait(false);
    return AdjustBufferAfterReading(buffer, offset, count, freshBytes);
}
/// <summary>
/// Attempt to read sufficient data for a whole chunk from the wrapped stream,
/// returning the number of bytes successfully read to be processed into a chunk.
/// Marks the wrapped stream as consumed when a read returns 0 bytes.
/// </summary>
/// <param name="cancellationToken">Token passed through to reads against the wrapped stream</param>
private async Task FillInputBufferAsync(CancellationToken cancellationToken)
{
    if (_wrappedStreamConsumed)
    {
        return 0;
    }

    var filled = 0;
    // Only the read-and-copy strategy needs an interim buffer; null selects direct reads
    var interimBuffer = _readStrategy == ReadStrategy.ReadDirect ? null : new byte[_wrappedStreamBufferSize];

    while (filled < _inputBuffer.Length && !_wrappedStreamConsumed)
    {
        int received;
        if (interimBuffer == null)
        {
            // chunk buffer size may not align exactly with underlying buffer size
            var request = Math.Min(_inputBuffer.Length - filled, _wrappedStreamBufferSize);
            received = await BaseStream.ReadAsync(_inputBuffer, filled, request, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            received = await BaseStream.ReadAsync(interimBuffer, 0, _wrappedStreamBufferSize, cancellationToken).ConfigureAwait(false);
        }

        if (received == 0)
        {
            _wrappedStreamConsumed = true;
            continue;
        }

        if (interimBuffer != null)
        {
            // NOTE(review): the interim read is not capped to the space left in _inputBuffer;
            // presumably callers keep the buffer size a divisor of the chunk size - confirm.
            Buffer.BlockCopy(interimBuffer, 0, _inputBuffer, filled, received);
        }
        filled += received;
    }

    return filled;
}
#endif
/// <summary>
/// Results of the header-signing portion of the request; the constructor only
/// accepts a SigV4 or SigV4a signing result here.
/// </summary>
private AWSSigningResultBase HeaderSigningResult { get; set; }
/// <summary>
/// SigV4a signer; only created by the constructor when the header signing result is SigV4a.
/// </summary>
private AWS4aSignerCRTWrapper Sigv4aSigner { get; set; }
/// <summary>
/// Computed signature of the chunk prior to the one in-flight in hex,
/// for either SigV4 or SigV4a. Starts out as the signature of the header signing result.
/// </summary>
private string PreviousChunkSignature { get; set; }
/// <summary>
/// Computes the derived signature for a chunk of data of given length in the input buffer,
/// placing a formatted chunk with headers, signature and data into the output buffer
/// ready for streaming back to the consumer. Also records the new signature as the
/// previous-chunk signature and feeds the chunk data to the rolling checksum, if any.
/// </summary>
/// <param name="dataLen">Number of bytes of chunk data in the input buffer; 0 produces the final, empty chunk</param>
/// <exception cref="AmazonClientException">Thrown when the chunk cannot be encoded or copied into the output buffer</exception>
private void ConstructOutputBufferChunk(int dataLen)
{
    // The subsequent hashing/signing helpers take no length argument, so a
    // partially filled input buffer is trimmed to exactly the data that was read
    if (dataLen > 0 && dataLen < _inputBuffer.Length)
    {
        var trimmed = new byte[dataLen];
        Buffer.BlockCopy(_inputBuffer, 0, trimmed, 0, dataLen);
        _inputBuffer = trimmed;
    }

    var isFinalDataChunk = dataLen == 0;
    var isSigV4a = HeaderSigningResult is AWS4aSigningResult;

    string chunkSignature = "";
    if (HeaderSigningResult is AWS4aSigningResult v4aResult)
    {
        if (!isFinalDataChunk)
        {
            chunkSignature = Sigv4aSigner.SignChunk(new MemoryStream(_inputBuffer), PreviousChunkSignature, v4aResult);
        }
        else
        {
            // _inputBuffer still contains the previous chunk, but this is the final 0 content chunk so sign null
            chunkSignature = Sigv4aSigner.SignChunk(null, PreviousChunkSignature, v4aResult);
        }
    }
    else if (HeaderSigningResult is AWS4SigningResult v4Result) // SigV4
    {
        var stringToSign = BuildChunkedStringToSign(
            CHUNK_STRING_TO_SIGN_PREFIX,
            v4Result.ISO8601DateTime,
            v4Result.Scope,
            PreviousChunkSignature,
            dataLen,
            _inputBuffer);
        chunkSignature = AWSSDKUtils.ToHex(AWS4Signer.SignBlob(v4Result.GetSigningKey(), stringToSign), true);
    }

    // For SigV4a the chunk signature must be padded when being appended to the chunk metadata,
    // but not when being used as the input for the next chunk
    PreviousChunkSignature = chunkSignature;
    var headerSignature = isSigV4a
        ? chunkSignature.PadRight(V4A_SIGNATURE_LENGTH, '*')
        : chunkSignature;

    // Chunk header: data size in hex, the signature marker, the signature, CRLF
    var chunkHeader = dataLen.ToString("X", CultureInfo.InvariantCulture)
        + CHUNK_SIGNATURE_HEADER
        + headerSignature
        + STREAM_NEWLINE;

    // If we're sending a trailing checksum, update the rolling checksum with this chunk's raw data
    _hashAlgorithm?.TransformBlock(_inputBuffer, 0, dataLen, _inputBuffer, 0);

    try
    {
        var header = Encoding.UTF8.GetBytes(chunkHeader);

        // A CRLF follows the chunk data, except on the final data chunk when
        // trailing headers will be written after it
        var trailer = ArrayEx.Empty();
        if (!isFinalDataChunk || !(_trailingHeaders?.Count > 0))
        {
            trailer = Encoding.UTF8.GetBytes(STREAM_NEWLINE);
        }

        Buffer.BlockCopy(header, 0, _outputBuffer, 0, header.Length);
        var writePos = header.Length;

        if (dataLen > 0)
        {
            Buffer.BlockCopy(_inputBuffer, 0, _outputBuffer, writePos, dataLen);
            writePos += dataLen;
        }

        Buffer.BlockCopy(trailer, 0, _outputBuffer, writePos, trailer.Length);

        _outputBufferPos = 0;
        _outputBufferDataLen = header.Length + dataLen + trailer.Length;
    }
    catch (Exception e)
    {
        throw new AmazonClientException("Unable to sign the chunked data. " + e.Message, e);
    }
}
/// <summary>
/// Constructs the signed trailing headers, optionally including
/// the selected checksum for this stream's data. For example:
///   trailing-header-A:value CRLF
///   trailing-header-B:value CRLF
///   x-amz-trailer-signature:signature_value CRLF
///   CRLF
/// When a trailing checksum is in use, this finalizes the rolling hash and stores
/// its base64 value in the trailing headers before signing.
/// </summary>
/// <returns>Stream chunk containing the trailing headers and their signature</returns>
private string ConstructSignedTrailersChunk()
{
    // If the trailing headers included a trailing checksum, set the hash value
    if (_hashAlgorithm != null)
    {
        _hashAlgorithm.TransformFinalBlock(ArrayEx.Empty(), 0, 0);
        var checksumKey = ChecksumUtils.GetChecksumHeaderKey(_trailingChecksum);
        _trailingHeaders[checksumKey] = Convert.ToBase64String(_hashAlgorithm.Hash);
    }

    string trailerSignature;
    if (HeaderSigningResult is AWS4SigningResult v4Result)
    {
        var canonicalTrailers = AWS4Signer.CanonicalizeHeaders(AWS4Signer.SortAndPruneHeaders(_trailingHeaders));
        var trailersHash = AWSSDKUtils.ToHex(AWS4Signer.ComputeHash(canonicalTrailers), true);

        // One component per line: prefix, timestamp, scope, previous signature, hash of the trailers
        var stringToSign = string.Join("\n",
            TRAILING_HEADER_STRING_TO_SIGN_PREFIX,
            HeaderSigningResult.ISO8601DateTime,
            HeaderSigningResult.Scope,
            PreviousChunkSignature,
            trailersHash);

        trailerSignature = AWSSDKUtils.ToHex(AWS4Signer.SignBlob(v4Result.GetSigningKey(), stringToSign), true);
    }
    else // SigV4a
    {
        var unpaddedSignature = Sigv4aSigner.SignTrailingHeaderChunk(_trailingHeaders, PreviousChunkSignature, (AWS4aSigningResult)HeaderSigningResult);
        trailerSignature = unpaddedSignature.PadRight(V4A_SIGNATURE_LENGTH, '*');
    }

    var trailerChunk = new StringBuilder();

    // The order here must match the order of keys sent already in the X-Amz-Trailer header.
    foreach (var header in _trailingHeaders.OrderBy(h => h.Key))
    {
        trailerChunk.Append($"{header.Key}:{header.Value}{STREAM_NEWLINE}");
    }

    // Signature row, then the blank line that ends the trailer block
    trailerChunk.Append(TRAILING_HEADER_SIGNATURE_KEY)
        .Append(':')
        .Append(trailerSignature)
        .Append(STREAM_NEWLINE)
        .Append(STREAM_NEWLINE);

    return trailerChunk.ToString();
}
/// <summary>
/// Copies the signed trailing headers to the output buffer, building the
/// trailing-header chunk on first use. May be called repeatedly until the
/// whole chunk has been handed out.
/// </summary>
/// <remarks>
/// The chunk is encoded as UTF-8, the same encoding used for the chunk headers,
/// and all position/length bookkeeping is done in encoded bytes so that the
/// amount copied always matches the amount reported.
/// </remarks>
/// <param name="buffer">The buffer to write the data into</param>
/// <param name="offset">The byte offset in buffer at which to begin copying data</param>
/// <param name="count">The maximum number of bytes to copy</param>
/// <returns>Number of bytes copied</returns>
private int WriteTrailingHeaders(byte[] buffer, int offset, int count)
{
    if (string.IsNullOrEmpty(_trailingHeaderChunk))
    {
        _trailingHeaderChunk = ConstructSignedTrailersChunk();
    }

    // Encoding.Default varies by platform and runtime; UTF-8 keeps the trailer
    // bytes consistent with the rest of the chunked output.
    var trailerBytes = Encoding.UTF8.GetBytes(_trailingHeaderChunk);

    // _trailingHeaderPos is a byte offset, so measure what is left in bytes too
    var lengthRemaining = trailerBytes.Length - _trailingHeaderPos;
    if (lengthRemaining <= 0)
    {
        _trailingHeadersConsumed = true;
        return 0;
    }

    // The trailing headers may not entirely fit in the buffer for the current
    // read, in which case only the requested count is copied this time
    var bytesToCopy = Math.Min(lengthRemaining, count);
    Buffer.BlockCopy(trailerBytes, _trailingHeaderPos, buffer, offset, bytesToCopy);
    _trailingHeaderPos += bytesToCopy;

    if (bytesToCopy == lengthRemaining)
    {
        _trailingHeadersConsumed = true;
    }

    return bytesToCopy;
}
/// <summary>
/// Length override to return the true length of the payload plus the metainfo
/// supplied with each chunk (and any trailing headers). Returns 0 when there
/// is no wrapped stream.
/// </summary>
public override long Length
{
    get
    {
        return BaseStream == null
            ? 0
            : ComputeChunkedContentLength(
                BaseStream.Length,
                HeaderSigningResult is AWS4aSigningResult ? V4A_SIGNATURE_LENGTH : V4_SIGNATURE_LENGTH,
                _trailingHeaders,
                _trailingChecksum);
    }
}
/// <summary>
/// Always false: this stream reports that it does not support seeking.
/// </summary>
public override bool CanSeek => false;
/// <summary>
/// Computes the total size of the data payload, including the chunk metadata,
/// for a stream without trailing headers. Called externally so as to be able
/// to set the correct Content-Length header value.
/// </summary>
/// <param name="originalLength">Length of the wrapped stream</param>
/// <param name="signatureLength">Length of the signature for each chunk, in bytes</param>
/// <returns>Total size of the wrapped payload, in bytes</returns>
public static long ComputeChunkedContentLength(long originalLength, int signatureLength)
    => ComputeChunkedContentLength(originalLength, signatureLength, null, CoreChecksumAlgorithm.NONE);
/// <summary>
/// Computes the total size of the data payload, including the chunk metadata
/// and optional trailing headers. Called externally so as to be able to set
/// the correct Content-Length header value.
/// </summary>
/// <param name="originalLength">Length of the wrapped stream</param>
/// <param name="signatureLength">Length of the signature for each chunk, in bytes</param>
/// <param name="trailingHeaders">Optional trailing headers</param>
/// <param name="trailingChecksum">Optional checksum algorithm for a trailing header</param>
/// <returns>Total size of the wrapped payload, in bytes</returns>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="originalLength"/> is negative</exception>
public static long ComputeChunkedContentLength(long originalLength, int signatureLength, IDictionary trailingHeaders, CoreChecksumAlgorithm trailingChecksum)
{
    if (originalLength < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(originalLength), "Expected 0 or greater value for originalLength.");
    }

    // Size of the chunked content before trailing headers/checksum. Every
    // payload, even an empty one, ends with a zero-length chunk.
    long chunkedContentLength = CalculateChunkHeaderLength(0, signatureLength);
    if (originalLength > 0)
    {
        var fullChunkCount = originalLength / DefaultChunkSize;
        var partialChunkSize = originalLength % DefaultChunkSize;

        chunkedContentLength += fullChunkCount * CalculateChunkHeaderLength(DefaultChunkSize, signatureLength);
        if (partialChunkSize > 0)
        {
            chunkedContentLength += CalculateChunkHeaderLength(partialChunkSize, signatureLength);
        }
    }

    // Nothing more to add when there are no trailing headers
    if (!(trailingHeaders?.Count > 0))
    {
        return chunkedContentLength;
    }

    var trailingHeaderLength = 0;
    foreach (var key in trailingHeaders.Keys)
    {
        // If the trailing checksum key is already in dictionary, use the
        // expected length since the checksum value may not be set yet.
        var isChecksumRow = trailingChecksum != CoreChecksumAlgorithm.NONE
            && ChecksumUtils.GetChecksumHeaderKey(trailingChecksum) == key;
        var valueLength = isChecksumRow
            ? CryptoUtilFactory.GetChecksumBase64Length(trailingChecksum)
            : trailingHeaders[key].Length;

        trailingHeaderLength += key.Length + valueLength + HEADER_ROW_PADDING_LENGTH;
    }

    // Final row carrying the signature of the trailing headers
    trailingHeaderLength += TRAILING_HEADER_SIGNATURE_KEY.Length + signatureLength + HEADER_ROW_PADDING_LENGTH;

    return chunkedContentLength + trailingHeaderLength;
}
/// <summary>
/// Builds the string to sign for a single V4/V4a chunk: six newline-separated
/// components ending with the hash of the empty string and the hash of the chunk content.
/// </summary>
/// <param name="prefix">Algorithm being used</param>
/// <param name="dateTime">ISO8601DateTime that we're signing the request for</param>
/// <param name="scope">Signing scope (date/region/service/aws4_request)</param>
/// <param name="previousSignature">Previous chunk's unpadded signature</param>
/// <param name="dataLength">Length of the content for this chunk</param>
/// <param name="inputBuffer">Content of this chunk; hashed in full when <paramref name="dataLength"/> is greater than 0</param>
/// <returns>The string to sign for this chunk</returns>
public static string BuildChunkedStringToSign(string prefix, string dateTime, string scope, string previousSignature, int dataLength, byte[] inputBuffer)
{
    var emptyStringHash = AWSSDKUtils.ToHex(AWS4Signer.ComputeHash(""), true);

    // An empty chunk uses the precomputed empty-body hash rather than hashing the buffer
    var payloadHash = dataLength > 0
        ? AWSSDKUtils.ToHex(AWS4Signer.ComputeHash(inputBuffer), true)
        : AWS4Signer.EmptyBodySha256;

    return string.Join("\n", prefix, dateTime, scope, previousSignature, emptyStringHash, payloadHash);
}
/// <summary>
/// Computes the total size of one chunk: its header (hex data size, signature
/// marker, signature, CRLF), followed by the chunk data and a closing CRLF.
/// </summary>
/// <param name="chunkDataSize">Payload size of each chunk, in bytes</param>
/// <param name="signatureLength">Length of the signature for each chunk, in bytes</param>
/// <returns>Size of the whole chunk, in bytes</returns>
private static long CalculateChunkHeaderLength(long chunkDataSize, int signatureLength)
{
    // Number of hex digits needed to express the data size
    var hexSizeDigits = chunkDataSize.ToString("X", CultureInfo.InvariantCulture).Length;

    long headerLength = hexSizeDigits
        + CHUNK_SIGNATURE_HEADER.Length
        + signatureLength
        + NEWLINE_LENGTH;

    return headerLength + chunkDataSize + NEWLINE_LENGTH;
}
/// <summary>
/// Attempt to read sufficient data for a whole chunk from the wrapped stream,
/// returning the number of bytes successfully read to be processed into a chunk.
/// Marks the wrapped stream as consumed when a read returns 0 bytes.
/// </summary>
private int FillInputBuffer()
{
    if (_wrappedStreamConsumed)
    {
        return 0;
    }

    var filled = 0;
    // Only the read-and-copy strategy needs an interim buffer; null selects direct reads
    var interimBuffer = _readStrategy == ReadStrategy.ReadDirect ? null : new byte[_wrappedStreamBufferSize];

    while (filled < _inputBuffer.Length && !_wrappedStreamConsumed)
    {
        int received;
        if (interimBuffer == null)
        {
            // chunk buffer size may not align exactly with underlying buffer size
            var request = Math.Min(_inputBuffer.Length - filled, _wrappedStreamBufferSize);
            received = BaseStream.Read(_inputBuffer, filled, request);
        }
        else
        {
            received = BaseStream.Read(interimBuffer, 0, _wrappedStreamBufferSize);
        }

        if (received == 0)
        {
            _wrappedStreamConsumed = true;
            continue;
        }

        if (interimBuffer != null)
        {
            // NOTE(review): the interim read is not capped to the space left in _inputBuffer;
            // presumably callers keep the buffer size a divisor of the chunk size - confirm.
            Buffer.BlockCopy(interimBuffer, 0, _inputBuffer, filled, received);
        }
        filled += received;
    }

    return filled;
}
/// <summary>
/// True when a header signing result is present.
/// </summary>
internal override bool HasLength => HeaderSigningResult != null;
}
}