/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenSearch.Net.Diagnostics;
using OpenSearch.Net.Extensions;
using static System.Net.DecompressionMethods;
namespace OpenSearch.Net
{
internal class WebProxy : IWebProxy
{
private readonly Uri _uri;
public WebProxy(Uri uri) => _uri = uri;
public ICredentials Credentials { get; set; }
public Uri GetProxy(Uri destination) => _uri;
public bool IsBypassed(Uri host) => host.IsLoopback;
}
/// The default IConnection implementation. Uses .
public class HttpConnection : IConnection
{
private static DiagnosticSource DiagnosticSource { get; } = new DiagnosticListener(DiagnosticSources.HttpConnection.SourceName);
private static readonly string MissingConnectionLimitMethodError =
$"Your target platform does not support {nameof(ConnectionConfiguration.ConnectionLimit)}"
+ $" please set {nameof(ConnectionConfiguration.ConnectionLimit)} to -1 on your connection configuration/settings."
+ $" this will cause the {nameof(HttpClientHandler.MaxConnectionsPerServer)} not to be set on {nameof(HttpClientHandler)}";
private RequestDataHttpClientFactory HttpClientFactory { get; }
public int InUseHandlers => HttpClientFactory.InUseHandlers;
public int RemovedHandlers => HttpClientFactory.RemovedHandlers;
public HttpConnection() => HttpClientFactory = new RequestDataHttpClientFactory(r => CreateHttpClientHandler(r));
public virtual TResponse Request(RequestData requestData)
where TResponse : class, IOpenSearchResponse, new()
{
var client = GetClient(requestData);
HttpResponseMessage responseMessage;
int? statusCode = null;
IEnumerable warnings = null;
Stream responseStream = null;
Exception ex = null;
string mimeType = null;
IDisposable receive = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary tcpStats = null;
ReadOnlyDictionary threadPoolStats = null;
try
{
var requestMessage = CreateHttpRequestMessage(requestData);
if (requestData.PostData != null)
SetContent(requestMessage, requestData);
using(requestMessage?.Content ?? (IDisposable)Stream.Null)
using (var d = DiagnosticSource.Diagnose(DiagnosticSources.HttpConnection.SendAndReceiveHeaders, requestData))
{
if (requestData.TcpStats)
tcpStats = TcpStats.GetStates();
if (requestData.ThreadPoolStats)
threadPoolStats = ThreadPoolStats.GetStats();
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead).GetAwaiter().GetResult();
statusCode = (int)responseMessage.StatusCode;
d.EndState = statusCode;
}
requestData.MadeItToResponse = true;
responseMessage.Headers.TryGetValues("Warning", out warnings);
mimeType = responseMessage.Content.Headers.ContentType?.MediaType;
if (responseMessage.Content != null)
{
receive = DiagnosticSource.Diagnose(DiagnosticSources.HttpConnection.ReceiveBody, requestData, statusCode);
responseStream = responseMessage.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
}
}
catch (TaskCanceledException e)
{
ex = e;
}
catch (HttpRequestException e)
{
ex = e;
}
using(receive)
using (responseStream ??= Stream.Null)
{
var response = ResponseBuilder.ToResponse(requestData, ex, statusCode, warnings, responseStream, mimeType);
// set TCP and threadpool stats on the response here so that in the event the request fails after the point of
// gathering stats, they are still exposed on the call details. Ideally these would be set inside ResponseBuilder.ToResponse,
// but doing so would be a breaking change
response.ApiCall.TcpStats = tcpStats;
response.ApiCall.ThreadPoolStats = threadPoolStats;
return response;
}
}
public virtual async Task RequestAsync(RequestData requestData, CancellationToken cancellationToken)
where TResponse : class, IOpenSearchResponse, new()
{
var client = GetClient(requestData);
HttpResponseMessage responseMessage;
int? statusCode = null;
IEnumerable warnings = null;
Stream responseStream = null;
Exception ex = null;
string mimeType = null;
IDisposable receive = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary tcpStats = null;
ReadOnlyDictionary threadPoolStats = null;
requestData.IsAsync = true;
try
{
var requestMessage = CreateHttpRequestMessage(requestData);
if (requestData.PostData != null)
await SetContentAsync(requestMessage, requestData, cancellationToken).ConfigureAwait(false);
using(requestMessage?.Content ?? (IDisposable)Stream.Null)
using (var d = DiagnosticSource.Diagnose(DiagnosticSources.HttpConnection.SendAndReceiveHeaders, requestData))
{
if (requestData.TcpStats)
tcpStats = TcpStats.GetStates();
if (requestData.ThreadPoolStats)
threadPoolStats = ThreadPoolStats.GetStats();
responseMessage = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
statusCode = (int)responseMessage.StatusCode;
d.EndState = statusCode;
}
requestData.MadeItToResponse = true;
mimeType = responseMessage.Content.Headers.ContentType?.MediaType;
responseMessage.Headers.TryGetValues("Warning", out warnings);
if (responseMessage.Content != null)
{
receive = DiagnosticSource.Diagnose(DiagnosticSources.HttpConnection.ReceiveBody, requestData, statusCode);
responseStream = await responseMessage.Content.ReadAsStreamAsync().ConfigureAwait(false);
}
}
catch (TaskCanceledException e)
{
ex = e;
}
catch (HttpRequestException e)
{
ex = e;
}
using (receive)
using (responseStream = responseStream ?? Stream.Null)
{
var response = await ResponseBuilder.ToResponseAsync
(requestData, ex, statusCode, warnings, responseStream, mimeType, cancellationToken)
.ConfigureAwait(false);
// set TCP and threadpool stats on the response here so that in the event the request fails after the point of
// gathering stats, they are still exposed on the call details. Ideally these would be set inside ResponseBuilder.ToResponse,
// but doing so would be a breaking change
response.ApiCall.TcpStats = tcpStats;
response.ApiCall.ThreadPoolStats = threadPoolStats;
return response;
}
}
void IDisposable.Dispose() => DisposeManagedResources();
private HttpClient GetClient(RequestData requestData) => HttpClientFactory.CreateClient(requestData);
protected virtual HttpMessageHandler CreateHttpClientHandler(RequestData requestData)
{
var handler = new HttpClientHandler { AutomaticDecompression = requestData.HttpCompression ? GZip | Deflate : None, };
// same limit as desktop clr
if (requestData.ConnectionSettings.ConnectionLimit > 0)
{
try
{
handler.MaxConnectionsPerServer = requestData.ConnectionSettings.ConnectionLimit;
}
catch (MissingMethodException e)
{
throw new Exception(MissingConnectionLimitMethodError, e);
}
catch (PlatformNotSupportedException e)
{
throw new Exception(MissingConnectionLimitMethodError, e);
}
}
if (!requestData.ProxyAddress.IsNullOrEmpty())
{
var uri = new Uri(requestData.ProxyAddress);
var proxy = new WebProxy(uri);
if (!string.IsNullOrEmpty(requestData.ProxyUsername))
{
var credentials = new NetworkCredential(requestData.ProxyUsername, requestData.ProxyPassword);
proxy.Credentials = credentials;
}
handler.Proxy = proxy;
}
else if (requestData.DisableAutomaticProxyDetection) handler.UseProxy = false;
var callback = requestData.ConnectionSettings?.ServerCertificateValidationCallback;
if (callback != null && handler.ServerCertificateCustomValidationCallback == null)
handler.ServerCertificateCustomValidationCallback = callback;
if (requestData.ClientCertificates != null)
{
handler.ClientCertificateOptions = ClientCertificateOption.Manual;
handler.ClientCertificates.AddRange(requestData.ClientCertificates);
}
return handler;
}
protected virtual HttpRequestMessage CreateHttpRequestMessage(RequestData requestData)
{
var request = CreateRequestMessage(requestData);
SetAuthenticationIfNeeded(request, requestData);
return request;
}
protected virtual void SetAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
{
//If user manually specifies an Authorization Header give it preference
if (requestData.Headers.HasKeys() && requestData.Headers.AllKeys.Contains("Authorization"))
{
var header = AuthenticationHeaderValue.Parse(requestData.Headers["Authorization"]);
requestMessage.Headers.Authorization = header;
return;
}
// Api Key authentication takes precedence
var apiKeySet = SetApiKeyAuthenticationIfNeeded(requestMessage, requestData);
if (!apiKeySet)
SetBasicAuthenticationIfNeeded(requestMessage, requestData);
}
protected virtual bool SetApiKeyAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
{
// ApiKey auth credentials take the following precedence (highest -> lowest):
// 1 - Specified on the request (highest precedence)
// 2 - Specified at the global IConnectionSettings level
string apiKey = null;
if (requestData.ApiKeyAuthenticationCredentials != null)
apiKey = requestData.ApiKeyAuthenticationCredentials.Base64EncodedApiKey.CreateString();
if (string.IsNullOrWhiteSpace(apiKey))
return false;
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", apiKey);
return true;
}
protected virtual void SetBasicAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
{
// Basic auth credentials take the following precedence (highest -> lowest):
// 1 - Specified on the request (highest precedence)
// 2 - Specified at the global IConnectionSettings level
// 3 - Specified with the URI (lowest precedence)
string userInfo = null;
if (!requestData.Uri.UserInfo.IsNullOrEmpty())
userInfo = Uri.UnescapeDataString(requestData.Uri.UserInfo);
else if (requestData.BasicAuthorizationCredentials != null)
userInfo =
$"{requestData.BasicAuthorizationCredentials.Username}:{requestData.BasicAuthorizationCredentials.Password.CreateString()}";
if (!userInfo.IsNullOrEmpty())
{
var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes(userInfo));
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Basic", credentials);
}
}
protected virtual HttpRequestMessage CreateRequestMessage(RequestData requestData)
{
var method = ConvertHttpMethod(requestData.Method);
var requestMessage = new HttpRequestMessage(method, requestData.Uri);
if (requestData.Headers != null)
foreach (string key in requestData.Headers)
requestMessage.Headers.TryAddWithoutValidation(key, requestData.Headers.GetValues(key));
requestMessage.Headers.Connection.Clear();
requestMessage.Headers.ConnectionClose = false;
requestMessage.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(requestData.Accept));
if (!string.IsNullOrWhiteSpace(requestData.UserAgent))
{
requestMessage.Headers.UserAgent.Clear();
requestMessage.Headers.UserAgent.TryParseAdd(requestData.UserAgent);
}
if (!requestData.RunAs.IsNullOrEmpty())
requestMessage.Headers.Add(RequestData.RunAsSecurityHeader, requestData.RunAs);
if (requestData.MetaHeaderProvider is object)
{
var value = requestData.MetaHeaderProvider.ProduceHeaderValue(requestData);
if (!string.IsNullOrEmpty(value))
requestMessage.Headers.TryAddWithoutValidation(requestData.MetaHeaderProvider.HeaderName, value);
}
return requestMessage;
}
private static void SetContent(HttpRequestMessage message, RequestData requestData)
{
if (requestData.TransferEncodingChunked)
message.Content = new RequestDataContent(requestData);
else
{
var stream = requestData.MemoryStreamFactory.Create();
if (requestData.HttpCompression)
using (var zipStream = new GZipStream(stream, CompressionMode.Compress, true))
requestData.PostData.Write(zipStream, requestData.ConnectionSettings);
else
requestData.PostData.Write(stream, requestData.ConnectionSettings);
// the written bytes are uncompressed, so can only be used when http compression isn't used
if (requestData.PostData.DisableDirectStreaming.GetValueOrDefault(false) && !requestData.HttpCompression)
{
message.Content = new ByteArrayContent(requestData.PostData.WrittenBytes);
stream.Dispose();
}
else
{
stream.Position = 0;
message.Content = new StreamContent(stream);
}
if (requestData.HttpCompression)
message.Content.Headers.ContentEncoding.Add("gzip");
message.Content.Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
}
}
private static async Task SetContentAsync(HttpRequestMessage message, RequestData requestData, CancellationToken cancellationToken)
{
if (requestData.TransferEncodingChunked)
message.Content = new RequestDataContent(requestData, cancellationToken);
else
{
var stream = requestData.MemoryStreamFactory.Create();
if (requestData.HttpCompression)
using (var zipStream = new GZipStream(stream, CompressionMode.Compress, true))
await requestData.PostData.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
else
await requestData.PostData.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
// the written bytes are uncompressed, so can only be used when http compression isn't used
if (requestData.PostData.DisableDirectStreaming.GetValueOrDefault(false) && !requestData.HttpCompression)
{
message.Content = new ByteArrayContent(requestData.PostData.WrittenBytes);
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
await stream.DisposeAsync().ConfigureAwait(false);
#else
stream.Dispose();
#endif
}
else
{
stream.Position = 0;
message.Content = new StreamContent(stream);
}
if (requestData.HttpCompression)
message.Content.Headers.ContentEncoding.Add("gzip");
message.Content.Headers.ContentType = new MediaTypeHeaderValue(requestData.RequestMimeType);
}
}
private static System.Net.Http.HttpMethod ConvertHttpMethod(HttpMethod httpMethod)
{
switch (httpMethod)
{
case HttpMethod.GET: return System.Net.Http.HttpMethod.Get;
case HttpMethod.POST: return System.Net.Http.HttpMethod.Post;
case HttpMethod.PUT: return System.Net.Http.HttpMethod.Put;
case HttpMethod.DELETE: return System.Net.Http.HttpMethod.Delete;
case HttpMethod.HEAD: return System.Net.Http.HttpMethod.Head;
default:
throw new ArgumentException("Invalid value for HttpMethod", nameof(httpMethod));
}
}
internal static int GetClientKey(RequestData requestData)
{
unchecked
{
var hashCode = requestData.RequestTimeout.GetHashCode();
hashCode = (hashCode * 397) ^ requestData.HttpCompression.GetHashCode();
hashCode = (hashCode * 397) ^ (requestData.ProxyAddress?.GetHashCode() ?? 0);
hashCode = (hashCode * 397) ^ (requestData.ProxyUsername?.GetHashCode() ?? 0);
hashCode = (hashCode * 397) ^ (requestData.ProxyPassword?.GetHashCode() ?? 0);
hashCode = (hashCode * 397) ^ requestData.DisableAutomaticProxyDetection.GetHashCode();
return hashCode;
}
}
protected virtual void DisposeManagedResources() => HttpClientFactory.Dispose();
}
}