// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strings"
	"time"

	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport"
)

var _ receiver.Metrics = (*statsdReceiver)(nil)

// statsdReceiver implements the receiver.Metrics for StatsD protocol.
type statsdReceiver struct {
	settings receiver.CreateSettings
	config   *Config

	server       transport.Server
	reporter     transport.Reporter
	parser       protocol.Parser
	nextConsumer consumer.Metrics
	cancel       context.CancelFunc
}

// New creates the StatsD receiver with the given parameters.
func New(
	set receiver.CreateSettings,
	config Config,
	nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
	if nextConsumer == nil {
		return nil, component.ErrNilNextConsumer
	}

	if config.NetAddr.Endpoint == "" {
		config.NetAddr.Endpoint = "localhost:8125"
	}

	rep, err := newReporter(set)
	if err != nil {
		return nil, err
	}

	r := &statsdReceiver{
		settings:     set,
		config:       &config,
		nextConsumer: nextConsumer,
		reporter:     rep,
		parser: &protocol.StatsDParser{
			BuildInfo: set.BuildInfo,
		},
	}
	return r, nil
}

func buildTransportServer(config Config) (transport.Server, error) {
	// TODO: Add TCP/unix socket transport implementations
	switch strings.ToLower(config.NetAddr.Transport) {
	case "", "udp":
		return transport.NewUDPServer(config.NetAddr.Endpoint)
	}

	return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport)
}

// Start starts a UDP server that can process StatsD messages.
func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
	ctx, r.cancel = context.WithCancel(ctx)
	server, err := buildTransportServer(*r.config)
	if err != nil {
		return err
	}
	r.server = server
	transferChan := make(chan transport.Metric, 10)
	ticker := time.NewTicker(r.config.AggregationInterval)
	err = r.parser.Initialize(
		r.config.EnableMetricType,
		r.config.IsMonotonicCounter,
		r.config.TimerHistogramMapping,
	)
	if err != nil {
		return err
	}
	go func() {
		if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil {
			if !errors.Is(err, net.ErrClosed) {
				host.ReportFatalError(err)
			}
		}
	}()
	go func() {
		for {
			select {
			case <-ticker.C:
				batchMetrics := r.parser.GetMetrics()
				for _, batch := range batchMetrics {
					batchCtx := client.NewContext(ctx, batch.Info)

					if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil {
						r.reporter.OnDebugf("Error flushing metrics", zap.Error(err))
					}
				}
			case metric := <-transferChan:
				if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
					r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
				}
			case <-ctx.Done():
				ticker.Stop()
				return
			}
		}
	}()

	return nil
}

// Shutdown stops the StatsD receiver.
func (r *statsdReceiver) Shutdown(context.Context) error {
	if r.cancel == nil || r.server == nil {
		return nil
	}
	err := r.server.Close()
	r.cancel()
	return err
}

func (r *statsdReceiver) Flush(ctx context.Context, metrics pmetric.Metrics, nextConsumer consumer.Metrics) error {
	return nextConsumer.ConsumeMetrics(ctx, metrics)
}