// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kubelet // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet"

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"strings"

	"go.uber.org/zap"
	"k8s.io/client-go/rest"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

// Well-known file locations and kubelet ports used by this package.
//
//   - svcAcctCACertPath and svcAcctTokenPath are the CA certificate and token
//     files that NewClientProvider hands to the service-account provider.
//   - defaultSecurePort and defaultReadOnlyPort are the ports buildEndpoint
//     appends (with https and http respectively) when no endpoint is given.
const (
	svcAcctCACertPath   = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
	svcAcctTokenPath    = "/var/run/secrets/kubernetes.io/serviceaccount/token" // #nosec
	defaultSecurePort   = "10250"
	defaultReadOnlyPort = "10255"
)

// Client fetches a resource identified by a path and returns the raw response
// bytes. The implementation in this file, clientImpl, performs an HTTP GET
// relative to its base URL and reports any status other than 200 as an error.
type Client interface {
	// Get requests path and returns the response body, or an error.
	Get(path string) ([]byte, error)
}

// NewClientProvider picks the ClientProvider implementation that corresponds
// to cfg.APIConfig.AuthType.
//
// The endpoint and logger are passed through to every provider. The TLS and
// kubeconfig providers additionally keep cfg; the service-account provider is
// wired to the package's service-account CA certificate and token paths.
// An auth type that is not handled here yields an error and a nil provider.
func NewClientProvider(endpoint string, cfg *ClientConfig, logger *zap.Logger) (ClientProvider, error) {
	var provider ClientProvider

	switch authType := cfg.APIConfig.AuthType; authType {
	case k8sconfig.AuthTypeNone:
		provider = &readOnlyClientProvider{endpoint: endpoint, logger: logger}
	case k8sconfig.AuthTypeTLS:
		provider = &tlsClientProvider{endpoint: endpoint, cfg: cfg, logger: logger}
	case k8sconfig.AuthTypeKubeConfig:
		provider = &kubeConfigClientProvider{endpoint: endpoint, cfg: cfg, logger: logger}
	case k8sconfig.AuthTypeServiceAccount:
		provider = &saClientProvider{
			endpoint:   endpoint,
			caCertPath: svcAcctCACertPath,
			tokenPath:  svcAcctTokenPath,
			logger:     logger,
		}
	default:
		return nil, fmt.Errorf("AuthType [%s] not supported", authType)
	}

	return provider, nil
}

// ClientProvider builds a Client. NewClientProvider returns a different
// implementation of this interface for each authentication type it handles.
type ClientProvider interface {
	// BuildClient constructs a Client, or returns an error when the
	// credentials or endpoint it depends on cannot be resolved.
	BuildClient() (Client, error)
}

// kubeConfigClientProvider builds a Client from the REST configuration that
// k8sconfig.CreateRestConfig derives from cfg.APIConfig. Its BuildClient does
// not treat endpoint as a host address: endpoint is inserted as the path
// segment between "/api/v1/nodes/" and "/proxy/" under the configured host.
type kubeConfigClientProvider struct {
	endpoint string
	cfg      *ClientConfig
	logger   *zap.Logger
}

// BuildClient creates a Client whose base URL is
// <rest host>/api/v1/nodes/<endpoint>/proxy/ and whose HTTP client is produced
// by client-go from the REST configuration, so no separate token is attached
// by clientImpl itself.
func (p *kubeConfigClientProvider) BuildClient() (Client, error) {
	restCfg, err := k8sconfig.CreateRestConfig(p.cfg.APIConfig)
	if err != nil {
		return nil, err
	}

	if p.cfg.InsecureSkipVerify {
		// The collector-level setting takes precedence over whatever the
		// kubeconfig specified. The CA file and data are cleared together with
		// enabling Insecure.
		// NOTE(review): presumably client-go rejects a config that combines
		// root CAs with Insecure — confirm against the client-go docs.
		restCfg.TLSClientConfig.Insecure = true
		restCfg.TLSClientConfig.CAData = nil
		restCfg.TLSClientConfig.CAFile = ""
	}

	httpClient, err := rest.HTTPClientFor(restCfg)
	if err != nil {
		return nil, err
	}

	proxyURL, err := url.JoinPath(restCfg.Host, "/api/v1/nodes/", p.endpoint, "/proxy/")
	if err != nil {
		return nil, err
	}

	kubeletClient := &clientImpl{
		baseURL:    proxyURL,
		httpClient: *httpClient,
		logger:     p.logger,
	}
	return kubeletClient, nil
}

// readOnlyClientProvider builds a Client that carries no TLS settings and no
// token. Its BuildClient resolves the endpoint with useSecurePort=false, so an
// endpoint without a scheme is prefixed with http:// and an empty endpoint
// defaults to the local host name on defaultReadOnlyPort.
type readOnlyClientProvider struct {
	endpoint string
	logger   *zap.Logger
}

// BuildClient returns a Client that talks to the kubelet over the non-secure
// endpoint. It uses an unmodified copy of the default transport and sends no
// bearer token. An error is returned only when the endpoint cannot be
// resolved.
func (p *readOnlyClientProvider) BuildClient() (Client, error) {
	transport := defaultTransport()

	baseURL, err := buildEndpoint(p.endpoint, false, p.logger)
	if err != nil {
		return nil, err
	}

	httpClient := http.Client{Transport: transport}
	return &clientImpl{
		baseURL:    baseURL,
		httpClient: httpClient,
		logger:     p.logger,
	}, nil
}

// tlsClientProvider builds a Client that authenticates with a client
// certificate. Its BuildClient reads the CA file, certificate file, key file
// and InsecureSkipVerify flag from cfg.
type tlsClientProvider struct {
	endpoint string
	cfg      *ClientConfig
	logger   *zap.Logger
}

// BuildClient returns a Client that presents the certificate/key pair from
// cfg.CertFile and cfg.KeyFile and trusts the pool returned by
// systemCertPoolPlusPath for cfg.CAFile. No bearer token is sent.
//
// Errors from loading the CA pool, loading the key pair, or resolving the
// endpoint are returned unchanged together with a nil Client.
func (p *tlsClientProvider) BuildClient() (Client, error) {
	rootCAs, err := systemCertPoolPlusPath(p.cfg.CAFile)
	if err != nil {
		return nil, err
	}
	clientCert, err := tls.LoadX509KeyPair(p.cfg.CertFile, p.cfg.KeyFile)
	if err != nil {
		return nil, err
	}

	client, err := defaultTLSClient(
		p.endpoint,
		p.cfg.InsecureSkipVerify,
		rootCAs,
		[]tls.Certificate{clientCert},
		nil,
		p.logger,
	)
	if err != nil {
		// Return an untyped nil: forwarding the nil *clientImpl directly would
		// wrap it in a non-nil Client interface value.
		return nil, err
	}
	return client, nil
}

// saClientProvider builds a Client that authenticates with a token. Its
// BuildClient reads the token from tokenPath and loads CA certificates via
// caCertPath; NewClientProvider fills both with the package's service-account
// path constants.
type saClientProvider struct {
	endpoint   string
	caCertPath string
	tokenPath  string
	logger     *zap.Logger
}

// BuildClient returns a Client that sends the contents of tokenPath as a
// bearer token on every request. The token is read once, when the client is
// built.
//
// Errors from loading the CA pool, reading the token file, or resolving the
// endpoint are returned together with a nil Client.
func (p *saClientProvider) BuildClient() (Client, error) {
	rootCAs, err := systemCertPoolPlusPath(p.caCertPath)
	if err != nil {
		return nil, err
	}
	tok, err := os.ReadFile(p.tokenPath)
	if err != nil {
		return nil, fmt.Errorf("unable to read token file %s: %w", p.tokenPath, err)
	}

	// NOTE(review): insecureSkipVerify is hard-coded to true here, so the
	// server certificate is not verified even though rootCAs is supplied.
	// Kept as-is to preserve existing behavior; confirm whether this should be
	// configurable.
	client, err := defaultTLSClient(p.endpoint, true, rootCAs, nil, tok, p.logger)
	if err != nil {
		// Return an untyped nil: forwarding the nil *clientImpl directly would
		// wrap it in a non-nil Client interface value.
		return nil, err
	}
	return client, nil
}

// defaultTLSClient assembles a clientImpl for the secure kubelet endpoint.
//
// The transport is a copy of the default transport whose TLS configuration is
// populated from rootCAs, certificates and insecureSkipVerify. The endpoint is
// resolved with useSecurePort=true. tok, when non-nil, is stored on the client
// and later sent as a bearer token. An error is returned only when the
// endpoint cannot be resolved.
func defaultTLSClient(
	endpoint string,
	insecureSkipVerify bool,
	rootCAs *x509.CertPool,
	certificates []tls.Certificate,
	tok []byte,
	logger *zap.Logger,
) (*clientImpl, error) {
	tlsCfg := &tls.Config{
		InsecureSkipVerify: insecureSkipVerify,
		Certificates:       certificates,
		RootCAs:            rootCAs,
	}
	transport := defaultTransport()
	transport.TLSClientConfig = tlsCfg

	baseURL, err := buildEndpoint(endpoint, true, logger)
	if err != nil {
		return nil, err
	}

	httpClient := http.Client{Transport: transport}
	return &clientImpl{
		baseURL:    baseURL,
		httpClient: httpClient,
		tok:        tok,
		logger:     logger,
	}, nil
}

// buildEndpoint turns the user-supplied endpoint into a URL with a scheme.
//
// useSecurePort selects between https with defaultSecurePort and http with
// defaultReadOnlyPort. Three cases are handled:
//   - empty endpoint: the local host name is combined with the selected scheme
//     and port, and a warning naming the result is logged;
//   - endpoint already starting with "http://" or "https://": returned as-is;
//   - anything else: the selected scheme is prepended and no port is added.
//
// The only error comes from failing to look up the host name.
func buildEndpoint(endpoint string, useSecurePort bool, logger *zap.Logger) (string, error) {
	scheme, port := "http", defaultReadOnlyPort
	if useSecurePort {
		scheme, port = "https", defaultSecurePort
	}

	switch {
	case endpoint == "":
		// Falling back to the host name only reaches the kubelet when
		// hostNetwork is enabled, because the pod then has access to the
		// node's loopback device.
		host, err := os.Hostname()
		if err != nil {
			return "", fmt.Errorf("unable to get hostname for default endpoint: %w", err)
		}
		fallback := scheme + "://" + host + ":" + port
		logger.Warn("Kubelet endpoint not defined, using default endpoint " + fallback)
		return fallback, nil

	case strings.HasPrefix(endpoint, "http://"), strings.HasPrefix(endpoint, "https://"):
		return endpoint, nil

	default:
		return scheme + "://" + endpoint, nil
	}
}

// defaultTransport returns a private *http.Transport that callers may
// customise (for example by setting TLSClientConfig) without affecting the
// process-wide http.DefaultTransport.
//
// http.DefaultTransport is a mutable package variable of interface type, so
// another component may have replaced it with a RoundTripper that is not an
// *http.Transport. In that case the function no longer panics on the type
// assertion; it falls back to a fresh transport that honours the proxy
// environment variables. The fallback does not carry the timeouts and
// connection limits of the stock default transport.
func defaultTransport() *http.Transport {
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		return base.Clone()
	}
	return &http.Transport{Proxy: http.ProxyFromEnvironment}
}

// clientImpl

// Compile-time assertion that *clientImpl implements Client.
var _ Client = (*clientImpl)(nil)

// clientImpl is the Client implementation returned by the providers in this
// file.
//
// baseURL is joined with the path of each request. httpClient executes the
// request. logger receives a warning when the response body fails to close.
// tok, when non-nil, is sent in the Authorization header as a bearer token.
type clientImpl struct {
	baseURL    string
	httpClient http.Client
	logger     *zap.Logger
	tok        []byte
}

// Get performs a GET request for path relative to the client's base URL and
// returns the complete response body.
//
// The body is always read in full before the status is inspected. Any status
// other than 200 OK produces an error that includes the sanitized request URL,
// the status line and the body text. A failure to close the body is only
// logged as a warning.
func (c *clientImpl) Get(path string) ([]byte, error) {
	request, err := c.buildReq(path)
	if err != nil {
		return nil, err
	}

	response, err := c.httpClient.Do(request)
	if err != nil {
		return nil, err
	}
	defer func() {
		if closeErr := response.Body.Close(); closeErr != nil {
			c.logger.Warn("failed to close response body", zap.Error(closeErr))
		}
	}()

	payload, err := io.ReadAll(response.Body)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to read Kubelet response body: %w", err)
	case response.StatusCode != http.StatusOK:
		return nil, fmt.Errorf("kubelet request GET %s failed - %q, response: %q",
			sanitize.URL(request.URL), response.Status, string(payload))
	}
	return payload, nil
}

// buildReq creates the GET request for p joined onto the client's base URL.
// The request always carries a JSON Content-Type header. When the client holds
// a non-nil token, an Authorization header of the form "bearer <token>" is
// added as well.
func (c *clientImpl) buildReq(p string) (*http.Request, error) {
	target, err := url.JoinPath(c.baseURL, p)
	if err != nil {
		return nil, err
	}

	request, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}
	request.Header.Set("Content-Type", "application/json")

	if c.tok == nil {
		return request, nil
	}
	request.Header.Set("Authorization", "bearer "+string(c.tok))
	return request, nil
}