/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
namespace OpenSearch.Net
{
///
/// Provides an implementation that exposes an output
/// which can be written to directly. The ability to push data to the output stream differs from the
/// where data is pulled and not pushed.
///
internal class RequestDataContent : HttpContent
{
private readonly RequestData _requestData;
private readonly Func _onStreamAvailable;
public RequestDataContent(RequestData requestData)
{
_requestData = requestData;
Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
if (requestData.HttpCompression)
Headers.ContentEncoding.Add("gzip");
Task OnStreamAvailable(RequestData data, Stream stream, HttpContent content, TransportContext context)
{
if (data.HttpCompression)
stream = new GZipStream(stream, CompressionMode.Compress, false);
using(stream)
data.PostData.Write(stream, data.ConnectionSettings);
return Task.CompletedTask;
}
_onStreamAvailable = OnStreamAvailable;
}
public RequestDataContent(RequestData requestData, CancellationToken token)
{
_requestData = requestData;
Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
if (requestData.HttpCompression)
Headers.ContentEncoding.Add("gzip");
async Task OnStreamAvailable(RequestData data, Stream stream, HttpContent content, TransportContext context)
{
if (data.HttpCompression)
stream = new GZipStream(stream, CompressionMode.Compress, false);
using (stream)
await data.PostData.WriteAsync(stream, data.ConnectionSettings, token).ConfigureAwait(false);
}
_onStreamAvailable = OnStreamAvailable;
}
///
/// When this method is called, it calls the action provided in the constructor with the output
/// stream to write to. Once the action has completed its work it closes the stream which will
/// close this content instance and complete the HTTP request or response.
///
/// The to which to write.
/// The associated .
/// A instance that is asynchronously serializing the object's content.
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Exception is passed as task result.")]
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
var serializeToStreamTask = new TaskCompletionSource();
var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
await _onStreamAvailable(_requestData, wrappedStream, this, context).ConfigureAwait(false);
await serializeToStreamTask.Task.ConfigureAwait(false);
}
///
/// Computes the length of the stream if possible.
///
/// The computed length of the stream.
/// true if the length has been computed; otherwise false.
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}
internal class CompleteTaskOnCloseStream : DelegatingStream
{
private readonly TaskCompletionSource _serializeToStreamTask;
public CompleteTaskOnCloseStream(Stream innerStream, TaskCompletionSource serializeToStreamTask)
: base(innerStream)
{
Contract.Assert(serializeToStreamTask != null);
_serializeToStreamTask = serializeToStreamTask;
}
protected override void Dispose(bool disposing)
{
_serializeToStreamTask.TrySetResult(true);
base.Dispose();
}
public override void Close() => _serializeToStreamTask.TrySetResult(true);
}
///
/// Stream that delegates to inner stream.
/// This is taken from System.Net.Http
///
internal abstract class DelegatingStream : Stream
{
private readonly Stream _innerStream;
protected DelegatingStream(Stream innerStream) => _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
public override bool CanRead => _innerStream.CanRead;
public override bool CanSeek => _innerStream.CanSeek;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;
public override long Position
{
get => _innerStream.Position;
set => _innerStream.Position = value;
}
public override int ReadTimeout
{
get => _innerStream.ReadTimeout;
set => _innerStream.ReadTimeout = value;
}
public override bool CanTimeout => _innerStream.CanTimeout;
public override int WriteTimeout
{
get => _innerStream.WriteTimeout;
set => _innerStream.WriteTimeout = value;
}
protected override void Dispose(bool disposing)
{
if (disposing) _innerStream.Dispose();
base.Dispose(disposing);
}
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
public override int Read(byte[] buffer, int offset, int count) => _innerStream.Read(buffer, offset, count);
public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_innerStream.ReadAsync(buffer, offset, count, cancellationToken);
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
_innerStream.BeginRead(buffer, offset, count, callback, state);
public override int EndRead(IAsyncResult asyncResult) => _innerStream.EndRead(asyncResult);
public override int ReadByte() => _innerStream.ReadByte();
public override void Flush() => _innerStream.Flush();
public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken);
public override void SetLength(long value) => _innerStream.SetLength(value);
public override void Write(byte[] buffer, int offset, int count) => _innerStream.Write(buffer, offset, count);
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_innerStream.WriteAsync(buffer, offset, count, cancellationToken);
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
_innerStream.BeginWrite(buffer, offset, count, callback, state);
public override void EndWrite(IAsyncResult asyncResult) => _innerStream.EndWrite(asyncResult);
public override void WriteByte(byte value) => _innerStream.WriteByte(value);
public override void Close() => _innerStream.Close();
}
}
}