/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*  Licensed to Elasticsearch B.V. under one or more contributor
*  license agreements. See the NOTICE file distributed with
*  this work for additional information regarding copyright
*  ownership. Elasticsearch B.V. licenses this file to you under
*  the Apache License, Version 2.0 (the "License"); you may
*  not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
* 	http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*/

using System;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;

namespace OpenSearch.Net
{
	/// <summary>
	/// Provides an <see cref="HttpContent"/> implementation that exposes an output <see cref="Stream"/>
	/// which can be written to directly. The ability to push data to the output stream differs from the
	/// <see cref="StreamContent"/> where data is pulled and not pushed.
	/// </summary>
	internal class RequestDataContent : HttpContent
	{
		/// <summary>
		/// The request this content was created for. It is handed to <see cref="_onStreamAvailable"/> when
		/// <see cref="SerializeToStreamAsync"/> runs.
		/// </summary>
		private readonly RequestData _requestData;
		/// <summary>
		/// Callback invoked by <see cref="SerializeToStreamAsync"/> with the request data, the wrapped output
		/// stream, this content instance and the transport context. Each constructor assigns a writer that
		/// pushes the request's post data to the stream (synchronously or asynchronously) and then disposes it.
		/// </summary>
		private readonly Func<RequestData, CompleteTaskOnCloseStream, RequestDataContent, TransportContext, Task> _onStreamAvailable;

		/// <summary>
		/// Creates content whose body is written to the output stream synchronously.
		/// </summary>
		/// <param name="requestData">
		/// The request supplying the MIME type, the compression flag and the post data to write.
		/// </param>
		public RequestDataContent(RequestData requestData)
		{
			_requestData = requestData;
			Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
			if (requestData.HttpCompression)
			{
				Headers.ContentEncoding.Add("gzip");
			}

			// The write happens synchronously, so the task handed back is always already completed.
			Task WriteBody(RequestData data, Stream stream, HttpContent content, TransportContext context)
			{
				// leaveOpen is false: disposing the GZipStream also disposes the stream it wraps.
				var output = data.HttpCompression
					? new GZipStream(stream, CompressionMode.Compress, leaveOpen: false)
					: stream;

				using (output)
				{
					data.PostData.Write(output, data.ConnectionSettings);
				}

				return Task.CompletedTask;
			}

			_onStreamAvailable = WriteBody;
		}
		/// <summary>
		/// Creates content whose body is written to the output stream asynchronously.
		/// </summary>
		/// <param name="requestData">
		/// The request supplying the MIME type, the compression flag and the post data to write.
		/// </param>
		/// <param name="token">Cancellation token passed to the asynchronous post data write.</param>
		public RequestDataContent(RequestData requestData, CancellationToken token)
		{
			_requestData = requestData;
			Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
			if (requestData.HttpCompression)
			{
				Headers.ContentEncoding.Add("gzip");
			}

			async Task WriteBodyAsync(RequestData data, Stream stream, HttpContent content, TransportContext context)
			{
				// leaveOpen is false: disposing the GZipStream also disposes the stream it wraps.
				var output = data.HttpCompression
					? new GZipStream(stream, CompressionMode.Compress, leaveOpen: false)
					: stream;

				using (output)
				{
					await data.PostData.WriteAsync(output, data.ConnectionSettings, token).ConfigureAwait(false);
				}
			}

			_onStreamAvailable = WriteBodyAsync;
		}

		/// <summary>
		/// Passes the output stream to the writer callback that was set up in the constructor and completes
		/// once that callback has finished and the stream it was given has been closed or disposed.
		/// </summary>
		/// <remarks>
		/// The callback does not receive <paramref name="stream"/> directly but a
		/// <see cref="CompleteTaskOnCloseStream"/> wrapper around it. Closing or disposing the wrapper only
		/// signals completion; it does not close <paramref name="stream"/>.
		/// </remarks>
		/// <param name="stream">The <see cref="Stream"/> to which to write.</param>
		/// <param name="context">The associated <see cref="TransportContext"/>.</param>
		/// <returns>A <see cref="Task"/> that completes when the content has been written.</returns>
		[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Exception is passed as task result.")]
		protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
		{
			// RunContinuationsAsynchronously prevents continuations of this task from being executed inline
			// by TrySetResult, i.e. on the thread (and call stack) that is closing the wrapped stream.
			var serializeToStreamTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
			var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);

			// Exceptions thrown by the writer surface here, before the completion task is awaited.
			await _onStreamAvailable(_requestData, wrappedStream, this, context).ConfigureAwait(false);
			await serializeToStreamTask.Task.ConfigureAwait(false);
		}

		/// <summary>
		/// Reports that the content length cannot be determined ahead of time.
		/// </summary>
		/// <param name="length">Always set to <c>-1</c>.</param>
		/// <returns>Always <c>false</c>, because the length is never computed.</returns>
		protected override bool TryComputeLength(out long length)
		{
			// The body is pushed to the output stream while it is being produced, so its size is unknown here.
			const long unknownLength = -1;
			length = unknownLength;
			return false;
		}

		/// <summary>
		/// Stream wrapper that completes a <see cref="TaskCompletionSource{TResult}"/> when it is closed or
		/// disposed. Neither <see cref="Close"/> nor <see cref="Dispose(bool)"/> is forwarded to the wrapped
		/// stream, so the wrapped stream stays open.
		/// </summary>
		internal class CompleteTaskOnCloseStream : DelegatingStream
		{
			private readonly TaskCompletionSource<bool> _serializeToStreamTask;

			/// <param name="innerStream">The stream that reads and writes are forwarded to.</param>
			/// <param name="serializeToStreamTask">The completion source to signal on close or dispose.</param>
			/// <exception cref="ArgumentNullException">
			/// <paramref name="innerStream"/> or <paramref name="serializeToStreamTask"/> is <c>null</c>.
			/// </exception>
			public CompleteTaskOnCloseStream(Stream innerStream, TaskCompletionSource<bool> serializeToStreamTask)
				: base(innerStream)
			{
				// A real guard instead of Contract.Assert, which is conditionally compiled and would let a
				// null slip through to fail later as a NullReferenceException in Close/Dispose.
				_serializeToStreamTask = serializeToStreamTask ?? throw new ArgumentNullException(nameof(serializeToStreamTask));
			}

			/// <summary>
			/// Signals completion. The base implementation is intentionally not called because it would
			/// dispose the wrapped stream.
			/// </summary>
			/// <param name="disposing">Unused; completion is signalled in either case.</param>
			protected override void Dispose(bool disposing)
			{
				// NOTE(review): presumably the wrapped stream belongs to the HTTP stack and must outlive this
				// wrapper - confirm. TrySetResult is idempotent, so repeated Close/Dispose calls are harmless.
				_serializeToStreamTask.TrySetResult(true);
			}

			/// <summary>
			/// Signals completion without closing the wrapped stream. <see cref="Stream.Dispose()"/> routes
			/// through this override, so a <c>using</c> block around this stream ends up here.
			/// </summary>
			public override void Close() => _serializeToStreamTask.TrySetResult(true);
		}

		/// <summary>
		/// Base class for streams that forward every operation to a wrapped inner stream.
		/// This is taken from System.Net.Http
		/// </summary>
		internal abstract class DelegatingStream : Stream
		{
			private readonly Stream _inner;

			/// <param name="innerStream">The stream every call is forwarded to.</param>
			/// <exception cref="ArgumentNullException"><paramref name="innerStream"/> is <c>null</c>.</exception>
			protected DelegatingStream(Stream innerStream)
			{
				if (innerStream == null)
				{
					throw new ArgumentNullException(nameof(innerStream));
				}

				_inner = innerStream;
			}

			public override bool CanRead
			{
				get { return _inner.CanRead; }
			}

			public override bool CanSeek
			{
				get { return _inner.CanSeek; }
			}

			public override bool CanWrite
			{
				get { return _inner.CanWrite; }
			}

			public override long Length
			{
				get { return _inner.Length; }
			}

			public override long Position
			{
				get { return _inner.Position; }
				set { _inner.Position = value; }
			}

			public override int ReadTimeout
			{
				get { return _inner.ReadTimeout; }
				set { _inner.ReadTimeout = value; }
			}

			public override bool CanTimeout
			{
				get { return _inner.CanTimeout; }
			}

			public override int WriteTimeout
			{
				get { return _inner.WriteTimeout; }
				set { _inner.WriteTimeout = value; }
			}

			/// <summary>Disposes the inner stream when called from a managed dispose, then the base stream.</summary>
			protected override void Dispose(bool disposing)
			{
				if (disposing)
				{
					_inner.Dispose();
				}

				base.Dispose(disposing);
			}

			public override long Seek(long offset, SeekOrigin origin)
			{
				return _inner.Seek(offset, origin);
			}

			public override int Read(byte[] buffer, int offset, int count)
			{
				return _inner.Read(buffer, offset, count);
			}

			public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
			{
				return _inner.ReadAsync(buffer, offset, count, cancellationToken);
			}

			public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
			{
				return _inner.BeginRead(buffer, offset, count, callback, state);
			}

			public override int EndRead(IAsyncResult asyncResult)
			{
				return _inner.EndRead(asyncResult);
			}

			public override int ReadByte()
			{
				return _inner.ReadByte();
			}

			public override void Flush()
			{
				_inner.Flush();
			}

			public override Task FlushAsync(CancellationToken cancellationToken)
			{
				return _inner.FlushAsync(cancellationToken);
			}

			public override void SetLength(long value)
			{
				_inner.SetLength(value);
			}

			public override void Write(byte[] buffer, int offset, int count)
			{
				_inner.Write(buffer, offset, count);
			}

			public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
			{
				return _inner.WriteAsync(buffer, offset, count, cancellationToken);
			}

			public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
			{
				return _inner.BeginWrite(buffer, offset, count, callback, state);
			}

			public override void EndWrite(IAsyncResult asyncResult)
			{
				_inner.EndWrite(asyncResult);
			}

			public override void WriteByte(byte value)
			{
				_inner.WriteByte(value);
			}

			/// <summary>Closes the inner stream only; the base <see cref="Stream.Close"/> is not invoked.</summary>
			public override void Close()
			{
				_inner.Close();
			}
		}
	}
}