// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package confighttp // import "go.opentelemetry.io/collector/config/confighttp" import ( "crypto/tls" "errors" "io" "net" "net/http" "time" "github.com/rs/cors" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "golang.org/x/net/http2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/config/internal" "go.opentelemetry.io/collector/extension/auth" ) const headerContentEncoding = "Content-Encoding" // HTTPClientSettings defines settings for creating an HTTP client. type HTTPClientSettings struct { // The target URL to send data to (e.g.: http://some.url:9411/v1/traces). Endpoint string `mapstructure:"endpoint"` // TLSSetting struct exposes TLS client configuration. TLSSetting configtls.TLSClientSetting `mapstructure:"tls"` // ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize. ReadBufferSize int `mapstructure:"read_buffer_size"` // WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize. WriteBufferSize int `mapstructure:"write_buffer_size"` // Timeout parameter configures `http.Client.Timeout`. Timeout time.Duration `mapstructure:"timeout"` // Additional headers attached to each HTTP request sent by the client. // Existing header values are overwritten if collision happens. // Header values are opaque since they may be sensitive. Headers map[string]configopaque.String `mapstructure:"headers"` // Custom Round Tripper to allow for individual components to intercept HTTP requests CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) // Auth configuration for outgoing HTTP calls. Auth *configauth.Authentication `mapstructure:"auth"` // The compression key for supported compression types within collector. Compression configcompression.CompressionType `mapstructure:"compression"` // MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open. // There's an already set value, and we want to override it only if an explicit value provided MaxIdleConns *int `mapstructure:"max_idle_conns"` // MaxIdleConnsPerHost is used to set a limit to the maximum idle HTTP connections the host can keep open. // There's an already set value, and we want to override it only if an explicit value provided MaxIdleConnsPerHost *int `mapstructure:"max_idle_conns_per_host"` // MaxConnsPerHost limits the total number of connections per host, including connections in the dialing, // active, and idle states. // There's an already set value, and we want to override it only if an explicit value provided MaxConnsPerHost *int `mapstructure:"max_conns_per_host"` // IdleConnTimeout is the maximum amount of time a connection will remain open before closing itself. // There's an already set value, and we want to override it only if an explicit value provided IdleConnTimeout *time.Duration `mapstructure:"idle_conn_timeout"` } // NewDefaultHTTPClientSettings returns HTTPClientSettings type object with // the default values of 'MaxIdleConns' and 'IdleConnTimeout'. // Other config options are not added as they are initialized with 'zero value' by GoLang as default. // We encourage to use this function to create an object of HTTPClientSettings. func NewDefaultHTTPClientSettings() HTTPClientSettings { // The default values are taken from the values of 'DefaultTransport' of 'http' package. maxIdleConns := 100 idleConnTimeout := 90 * time.Second return HTTPClientSettings{ MaxIdleConns: &maxIdleConns, IdleConnTimeout: &idleConnTimeout, } } // ToClient creates an HTTP client. func (hcs *HTTPClientSettings) ToClient(host component.Host, settings component.TelemetrySettings) (*http.Client, error) { tlsCfg, err := hcs.TLSSetting.LoadTLSConfig() if err != nil { return nil, err } transport := http.DefaultTransport.(*http.Transport).Clone() if tlsCfg != nil { transport.TLSClientConfig = tlsCfg } if hcs.ReadBufferSize > 0 { transport.ReadBufferSize = hcs.ReadBufferSize } if hcs.WriteBufferSize > 0 { transport.WriteBufferSize = hcs.WriteBufferSize } if hcs.MaxIdleConns != nil { transport.MaxIdleConns = *hcs.MaxIdleConns } if hcs.MaxIdleConnsPerHost != nil { transport.MaxIdleConnsPerHost = *hcs.MaxIdleConnsPerHost } if hcs.MaxConnsPerHost != nil { transport.MaxConnsPerHost = *hcs.MaxConnsPerHost } if hcs.IdleConnTimeout != nil { transport.IdleConnTimeout = *hcs.IdleConnTimeout } clientTransport := (http.RoundTripper)(transport) // The Auth RoundTripper should always be the innermost to ensure that // request signing-based auth mechanisms operate after compression // and header middleware modifies the request if hcs.Auth != nil { ext := host.GetExtensions() if ext == nil { return nil, errors.New("extensions configuration not found") } httpCustomAuthRoundTripper, aerr := hcs.Auth.GetClientAuthenticator(ext) if aerr != nil { return nil, aerr } clientTransport, err = httpCustomAuthRoundTripper.RoundTripper(clientTransport) if err != nil { return nil, err } } if len(hcs.Headers) > 0 { clientTransport = &headerRoundTripper{ transport: clientTransport, headers: hcs.Headers, } } // Compress the body using specified compression methods if non-empty string is provided. // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. if configcompression.IsCompressed(hcs.Compression) { clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) if err != nil { return nil, err } } // wrapping http transport with otelhttp transport to enable otel instrumenetation if settings.TracerProvider != nil && settings.MeterProvider != nil { clientTransport = otelhttp.NewTransport( clientTransport, otelhttp.WithTracerProvider(settings.TracerProvider), otelhttp.WithMeterProvider(settings.MeterProvider), otelhttp.WithPropagators(otel.GetTextMapPropagator()), ) } if hcs.CustomRoundTripper != nil { clientTransport, err = hcs.CustomRoundTripper(clientTransport) if err != nil { return nil, err } } return &http.Client{ Transport: clientTransport, Timeout: hcs.Timeout, }, nil } // Custom RoundTripper that adds headers. type headerRoundTripper struct { transport http.RoundTripper headers map[string]configopaque.String } // RoundTrip is a custom RoundTripper that adds headers to the request. func (interceptor *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { for k, v := range interceptor.headers { req.Header.Set(k, string(v)) } // Send the request to next transport. return interceptor.transport.RoundTrip(req) } // HTTPServerSettings defines settings for creating an HTTP server. type HTTPServerSettings struct { // Endpoint configures the listening address for the server. Endpoint string `mapstructure:"endpoint"` // TLSSetting struct exposes TLS client configuration. TLSSetting *configtls.TLSServerSetting `mapstructure:"tls"` // CORS configures the server for HTTP cross-origin resource sharing (CORS). CORS *CORSSettings `mapstructure:"cors"` // Auth for this receiver Auth *configauth.Authentication `mapstructure:"auth"` // MaxRequestBodySize sets the maximum request body size in bytes MaxRequestBodySize int64 `mapstructure:"max_request_body_size"` // IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers // Experimental: *NOTE* this option is subject to change or removal in the future. IncludeMetadata bool `mapstructure:"include_metadata"` // Additional headers attached to each HTTP response sent to the client. // Header values are opaque since they may be sensitive. ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"` } // ToListener creates a net.Listener. func (hss *HTTPServerSettings) ToListener() (net.Listener, error) { listener, err := net.Listen("tcp", hss.Endpoint) if err != nil { return nil, err } if hss.TLSSetting != nil { var tlsCfg *tls.Config tlsCfg, err = hss.TLSSetting.LoadTLSConfig() if err != nil { return nil, err } tlsCfg.NextProtos = []string{http2.NextProtoTLS, "http/1.1"} listener = tls.NewListener(listener, tlsCfg) } return listener, nil } // toServerOptions has options that change the behavior of the HTTP server // returned by HTTPServerSettings.ToServer(). type toServerOptions struct { errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) } // ToServerOption is an option to change the behavior of the HTTP server // returned by HTTPServerSettings.ToServer(). type ToServerOption func(opts *toServerOptions) // WithErrorHandler overrides the HTTP error handler that gets invoked // when there is a failure inside httpContentDecompressor. func WithErrorHandler(e func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)) ToServerOption { return func(opts *toServerOptions) { opts.errHandler = e } } // WithDecoder provides support for additional decoders to be configured // by the caller. func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)) ToServerOption { return func(opts *toServerOptions) { if opts.decoders == nil { opts.decoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){} } opts.decoders[key] = dec } } // ToServer creates an http.Server from settings object. func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) { internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint) serverOpts := &toServerOptions{} for _, o := range opts { o(serverOpts) } handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders) if hss.MaxRequestBodySize > 0 { handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize) } if hss.Auth != nil { server, err := hss.Auth.GetServerAuthenticator(host.GetExtensions()) if err != nil { return nil, err } handler = authInterceptor(handler, server) } // TODO: emit a warning when non-empty CorsHeaders and empty CorsOrigins. if hss.CORS != nil && len(hss.CORS.AllowedOrigins) > 0 { co := cors.Options{ AllowedOrigins: hss.CORS.AllowedOrigins, AllowCredentials: true, AllowedHeaders: hss.CORS.AllowedHeaders, MaxAge: hss.CORS.MaxAge, } handler = cors.New(co).Handler(handler) } if hss.ResponseHeaders != nil { handler = responseHeadersHandler(handler, hss.ResponseHeaders) } // Enable OpenTelemetry observability plugin. // TODO: Consider to use component ID string as prefix for all the operations. handler = otelhttp.NewHandler( handler, "", otelhttp.WithTracerProvider(settings.TracerProvider), otelhttp.WithMeterProvider(settings.MeterProvider), otelhttp.WithPropagators(otel.GetTextMapPropagator()), otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string { return r.URL.Path }), ) // wrap the current handler in an interceptor that will add client.Info to the request's context handler = &clientInfoHandler{ next: handler, includeMetadata: hss.IncludeMetadata, } return &http.Server{ Handler: handler, }, nil } func responseHeadersHandler(handler http.Handler, headers map[string]configopaque.String) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { h := w.Header() for k, v := range headers { h.Set(k, string(v)) } handler.ServeHTTP(w, r) }) } // CORSSettings configures a receiver for HTTP cross-origin resource sharing (CORS). // See the underlying https://github.com/rs/cors package for details. type CORSSettings struct { // AllowedOrigins sets the allowed values of the Origin header for // HTTP/JSON requests to an OTLP receiver. An origin may contain a // wildcard (*) to replace 0 or more characters (e.g., // "http://*.domain.com", or "*" to allow any origin). AllowedOrigins []string `mapstructure:"allowed_origins"` // AllowedHeaders sets what headers will be allowed in CORS requests. // The Accept, Accept-Language, Content-Type, and Content-Language // headers are implicitly allowed. If no headers are listed, // X-Requested-With will also be accepted by default. Include "*" to // allow any request header. AllowedHeaders []string `mapstructure:"allowed_headers"` // MaxAge sets the value of the Access-Control-Max-Age response header. // Set it to the number of seconds that browsers should cache a CORS // preflight response for. MaxAge int `mapstructure:"max_age"` } func authInterceptor(next http.Handler, server auth.Server) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx, err := server.Authenticate(r.Context(), r.Header) if err != nil { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return } next.ServeHTTP(w, r.WithContext(ctx)) }) } func maxRequestBodySizeInterceptor(next http.Handler, maxRecvSize int64) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxRecvSize) next.ServeHTTP(w, r) }) }