// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package manifest

import (
	"errors"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/copilot-cli/internal/pkg/manifest/manifestinfo"
	"github.com/aws/copilot-cli/internal/pkg/template"
	"github.com/imdario/mergo"
	"gopkg.in/yaml.v3"
)

const (
	workerSvcManifestPath = "workloads/services/worker/manifest.yml"
)

var (
	errUnmarshalQueueOpts  = errors.New(`cannot unmarshal "queue" field into bool or map`)
	errUnmarshalFifoConfig = errors.New(`unable to unmarshal "fifo" field into boolean or compose-style map`)
)

// WorkerService holds the configuration to create a worker service.
type WorkerService struct {
	Workload            `yaml:",inline"`
	WorkerServiceConfig `yaml:",inline"`
	// Use *WorkerServiceConfig because of https://github.com/imdario/mergo/issues/146
	Environments map[string]*WorkerServiceConfig `yaml:",flow"`
	parser       template.Parser
}

// Publish returns the list of topics where notifications can be published.
func (s *WorkerService) Publish() []Topic {
	return s.WorkerServiceConfig.PublishConfig.publishedTopics()
}

func (s *WorkerService) subnets() *SubnetListOrArgs {
	return &s.Network.VPC.Placement.Subnets
}

// WorkerServiceConfig holds the configuration that can be overridden per environments.
type WorkerServiceConfig struct {
	ImageConfig      ImageWithHealthcheck `yaml:"image,flow"`
	ImageOverride    `yaml:",inline"`
	TaskConfig       `yaml:",inline"`
	Logging          Logging                   `yaml:"logging,flow"`
	Sidecars         map[string]*SidecarConfig `yaml:"sidecars"` // NOTE: keep the pointers because `mergo` doesn't automatically deep merge map's value unless it's a pointer type.
	Subscribe        SubscribeConfig           `yaml:"subscribe"`
	PublishConfig    PublishConfig             `yaml:"publish"`
	Network          NetworkConfig             `yaml:"network"`
	TaskDefOverrides []OverrideRule            `yaml:"taskdef_overrides"`
	DeployConfig     WorkerDeploymentConfig    `yaml:"deployment"`
	Observability    Observability             `yaml:"observability"`
}

// SubscribeConfig represents the configurable options for setting up subscriptions.
type SubscribeConfig struct {
	Topics []TopicSubscription `yaml:"topics"`
	Queue  SQSQueue            `yaml:"queue"`
}

// IsEmpty returns empty if the struct has all zero members.
func (s *SubscribeConfig) IsEmpty() bool {
	return s.Topics == nil && s.Queue.IsEmpty()
}

// TopicSubscription represents the configurable options for setting up a SNS Topic Subscription.
type TopicSubscription struct {
	Name         *string                `yaml:"name"`
	Service      *string                `yaml:"service"`
	FilterPolicy map[string]interface{} `yaml:"filter_policy"`
	Queue        SQSQueueOrBool         `yaml:"queue"`
}

// SQSQueueOrBool is a custom type which supports unmarshaling yaml which
// can either be of type bool or type SQSQueue.
type SQSQueueOrBool struct {
	Advanced SQSQueue
	Enabled  *bool
}

// IsEmpty returns empty if the struct has all zero members.
func (q *SQSQueueOrBool) IsEmpty() bool {
	return q.Advanced.IsEmpty() && q.Enabled == nil
}

// UnmarshalYAML implements the yaml(v3) interface. It allows SQSQueueOrBool to be specified as a
// string or a struct alternately.
func (q *SQSQueueOrBool) UnmarshalYAML(value *yaml.Node) error {
	if err := value.Decode(&q.Advanced); err != nil {
		switch err.(type) {
		case *yaml.TypeError:
			break
		default:
			return err
		}
	}
	if !q.Advanced.IsEmpty() {
		// Unmarshaled successfully to q.Advanced, unset q.Enabled, and return.
		q.Enabled = nil
		return nil
	}
	if err := value.Decode(&q.Enabled); err != nil {
		return errUnmarshalQueueOpts
	}
	return nil
}

// SQSQueue represents the configurable options for setting up a SQS Queue.
type SQSQueue struct {
	Retention  *time.Duration          `yaml:"retention"`
	Delay      *time.Duration          `yaml:"delay"`
	Timeout    *time.Duration          `yaml:"timeout"`
	DeadLetter DeadLetterQueue         `yaml:"dead_letter"`
	FIFO       FIFOAdvanceConfigOrBool `yaml:"fifo"`
}

// FIFOAdvanceConfigOrBool represents the configurable options for fifo queues.
type FIFOAdvanceConfigOrBool struct {
	Enable   *bool
	Advanced FIFOAdvanceConfig
}

// IsEmpty returns true if the FifoAdvanceConfigOrBool struct has all nil values.
func (f *FIFOAdvanceConfigOrBool) IsEmpty() bool {
	return f.Enable == nil && f.Advanced.IsEmpty()
}

// IsEnabled returns true if the FifoAdvanceConfigOrBool struct has all nil values.
func (f *FIFOAdvanceConfigOrBool) IsEnabled() bool {
	return aws.BoolValue(f.Enable) || !f.Advanced.IsEmpty()
}

// FIFOAdvanceConfig represents the advanced fifo queue config.
type FIFOAdvanceConfig struct {
	ContentBasedDeduplication *bool   `yaml:"content_based_deduplication"`
	DeduplicationScope        *string `yaml:"deduplication_scope"`
	FIFOThroughputLimit       *string `yaml:"throughput_limit"`
	HighThroughputFifo        *bool   `yaml:"high_throughput"`
}

// IsEmpty returns true if the FifoAdvanceConfig struct has all nil values.
func (f *FIFOAdvanceConfig) IsEmpty() bool {
	return f.FIFOThroughputLimit == nil && f.HighThroughputFifo == nil &&
		f.DeduplicationScope == nil && f.ContentBasedDeduplication == nil
}

// UnmarshalYAML overrides the default YAML unmarshaling logic for the FifoAdvanceConfigOrBool
// struct, allowing it to perform more complex unmarshaling behavior.
// This method implements the yaml.Unmarshaler (v3) interface.
func (t *FIFOAdvanceConfigOrBool) UnmarshalYAML(value *yaml.Node) error {
	if err := value.Decode(&t.Advanced); err != nil {
		var yamlTypeErr *yaml.TypeError
		if !errors.As(err, &yamlTypeErr) {
			return err
		}
	}
	if !t.Advanced.IsEmpty() {
		return nil
	}
	if err := value.Decode(&t.Enable); err != nil {
		return errUnmarshalFifoConfig
	}
	return nil
}

// IsEmpty returns empty if the struct has all zero members.
func (q *SQSQueue) IsEmpty() bool {
	return q.Retention == nil && q.Delay == nil && q.Timeout == nil &&
		q.DeadLetter.IsEmpty() && q.FIFO.IsEmpty()
}

// DeadLetterQueue represents the configurable options for setting up a Dead-Letter Queue.
type DeadLetterQueue struct {
	Tries *uint16 `yaml:"tries"`
}

// IsEmpty returns empty if the struct has all zero members.
func (q *DeadLetterQueue) IsEmpty() bool {
	return q.Tries == nil
}

// WorkerServiceProps represents the configuration needed to create a worker service.
type WorkerServiceProps struct {
	WorkloadProps

	HealthCheck ContainerHealthCheck // Optional healthcheck configuration.
	Platform    PlatformArgsOrString // Optional platform configuration.
	Topics      []TopicSubscription  // Optional topics for subscriptions
	Queue       SQSQueue             // Optional queue configuration.
}

// NewWorkerService applies the props to a default Worker service configuration with
// minimal cpu/memory thresholds, single replica, no healthcheck, and then returns it.
func NewWorkerService(props WorkerServiceProps) *WorkerService {
	svc := newDefaultWorkerService()
	// Apply overrides.
	svc.Name = stringP(props.Name)
	svc.WorkerServiceConfig.ImageConfig.Image.Location = stringP(props.Image)
	svc.WorkerServiceConfig.ImageConfig.Image.Build.BuildArgs.Dockerfile = stringP(props.Dockerfile)
	svc.WorkerServiceConfig.ImageConfig.HealthCheck = props.HealthCheck
	svc.WorkerServiceConfig.Platform = props.Platform
	if isWindowsPlatform(props.Platform) {
		svc.WorkerServiceConfig.TaskConfig.CPU = aws.Int(MinWindowsTaskCPU)
		svc.WorkerServiceConfig.TaskConfig.Memory = aws.Int(MinWindowsTaskMemory)
	}
	if len(props.Topics) > 0 {
		setSubscriptionQueueDefaults(props.Topics, &svc.WorkerServiceConfig.Subscribe.Queue)
	}
	svc.WorkerServiceConfig.Subscribe.Topics = props.Topics
	svc.WorkerServiceConfig.Platform = props.Platform
	for _, envName := range props.PrivateOnlyEnvironments {
		svc.Environments[envName] = &WorkerServiceConfig{
			Network: NetworkConfig{
				VPC: vpcConfig{
					Placement: PlacementArgOrString{
						PlacementString: placementStringP(PrivateSubnetPlacement),
					},
				},
			},
		}
	}
	svc.parser = template.New()
	return svc
}

// setSubscriptionQueueDefaults function modifies the manifest to have
//  1. FIFO Topic names without ".fifo" suffix.
//  2. If there are both FIFO and Standard topic subscriptions are specified then set
//     default events queue to FIFO and add standard topic-specific queue for all the standard topic subscriptions.
//  3. If there are only Standard topic subscriptions are specified then do nothing and return.
func setSubscriptionQueueDefaults(topics []TopicSubscription, eventsQueue *SQSQueue) {
	var isFIFOEnabled bool
	for _, topic := range topics {
		if isFIFO(aws.StringValue(topic.Name)) {
			isFIFOEnabled = true
			break
		}
	}
	if !isFIFOEnabled {
		return
	}
	eventsQueue.FIFO.Enable = aws.Bool(true)
	for idx, topic := range topics {
		if isFIFO(aws.StringValue(topic.Name)) {
			topics[idx].Name = aws.String(strings.TrimSuffix(aws.StringValue(topic.Name), ".fifo"))
		} else {
			topics[idx].Queue.Enabled = aws.Bool(true)
		}
	}
}

func isFIFO(topic string) bool {
	return strings.HasSuffix(topic, ".fifo")
}

// MarshalBinary serializes the manifest object into a binary YAML document.
// Implements the encoding.BinaryMarshaler interface.
func (s *WorkerService) MarshalBinary() ([]byte, error) {
	content, err := s.parser.Parse(workerSvcManifestPath, *s, template.WithFuncs(map[string]interface{}{
		"fmtSlice":   template.FmtSliceFunc,
		"quoteSlice": template.QuoteSliceFunc,
	}))
	if err != nil {
		return nil, err
	}
	return content.Bytes(), nil
}

// BuildArgs returns a docker.BuildArguments object for the service given a context directory
func (s *WorkerService) BuildArgs(contextDir string) (map[string]*DockerBuildArgs, error) {
	required, err := requiresBuild(s.ImageConfig.Image)
	if err != nil {
		return nil, err
	}
	// Creating an map to store buildArgs of all sidecar images and main container image.
	buildArgsPerContainer := make(map[string]*DockerBuildArgs, len(s.Sidecars)+1)
	if required {
		buildArgsPerContainer[aws.StringValue(s.Name)] = s.ImageConfig.Image.BuildConfig(contextDir)
	}
	return buildArgs(contextDir, buildArgsPerContainer, s.Sidecars)
}

// EnvFiles returns the locations of all env files against the ws root directory.
// This method returns a map[string]string where the keys are container names
// and the values are either env file paths or empty strings.
func (s *WorkerService) EnvFiles() map[string]string {
	return envFiles(s.Name, s.TaskConfig, s.Logging, s.Sidecars)
}

// Subscriptions returns a list of TopicSubscriotion objects which represent the SNS topics the service
// receives messages from. This method also appends ".fifo" to the topics and returns a new set of subs.
func (s *WorkerService) Subscriptions() []TopicSubscription {
	var subs []TopicSubscription
	for _, topic := range s.Subscribe.Topics {
		topicSubscription := topic
		// if condition appends .fifo suffix to the topic which doesn't have topic specific queue and subscribing to default FIFO queue.
		if topic.Queue.IsEmpty() && !s.Subscribe.Queue.IsEmpty() && s.Subscribe.Queue.FIFO.IsEnabled() {
			topicSubscription.Name = aws.String(aws.StringValue(topic.Name) + ".fifo")
		} else if !topic.Queue.IsEmpty() && !topic.Queue.Advanced.IsEmpty() && topic.Queue.Advanced.FIFO.IsEnabled() { // else if condition appends .fifo suffix to the topic which has topic specific FIFO queue configuration.
			topicSubscription.Name = aws.String(aws.StringValue(topic.Name) + ".fifo")
		}
		subs = append(subs, topicSubscription)
	}
	return subs
}

func (s WorkerService) applyEnv(envName string) (workloadManifest, error) {
	overrideConfig, ok := s.Environments[envName]
	if !ok {
		return &s, nil
	}

	if overrideConfig == nil {
		return &s, nil
	}

	// Apply overrides to the original service s.
	for _, t := range defaultTransformers {
		err := mergo.Merge(&s, WorkerService{
			WorkerServiceConfig: *overrideConfig,
		}, mergo.WithOverride, mergo.WithTransformers(t))

		if err != nil {
			return nil, err
		}
	}
	s.Environments = nil
	return &s, nil
}

func (s *WorkerService) requiredEnvironmentFeatures() []string {
	var features []string
	features = append(features, s.Network.requiredEnvFeatures()...)
	features = append(features, s.Storage.requiredEnvFeatures()...)
	return features
}

// newDefaultWorkerService returns a Worker service with minimal task sizes and a single replica.
func newDefaultWorkerService() *WorkerService {
	return &WorkerService{
		Workload: Workload{
			Type: aws.String(manifestinfo.WorkerServiceType),
		},
		WorkerServiceConfig: WorkerServiceConfig{
			ImageConfig: ImageWithHealthcheck{},
			Subscribe:   SubscribeConfig{},
			TaskConfig: TaskConfig{
				CPU:    aws.Int(256),
				Memory: aws.Int(512),
				Count: Count{
					Value: aws.Int(1),
					AdvancedCount: AdvancedCount{ // Leave advanced count empty while passing down the type of the workload.
						workloadType: manifestinfo.WorkerServiceType,
					},
				},
				ExecuteCommand: ExecuteCommand{
					Enable: aws.Bool(false),
				},
			},
			Network: NetworkConfig{
				VPC: vpcConfig{
					Placement: PlacementArgOrString{
						PlacementString: placementStringP(PublicSubnetPlacement),
					},
				},
			},
		},
		Environments: map[string]*WorkerServiceConfig{},
	}
}

// ExposedPorts returns all the ports that are sidecar container ports available to receive traffic.
func (ws *WorkerService) ExposedPorts() (ExposedPortsIndex, error) {
	var exposedPorts []ExposedPort
	for name, sidecar := range ws.Sidecars {
		out, err := sidecar.exposedPorts(name)
		if err != nil {
			return ExposedPortsIndex{}, err
		}
		exposedPorts = append(exposedPorts, out...)
	}
	portsForContainer, containerForPort := prepareParsedExposedPortsMap(sortExposedPorts(exposedPorts))
	return ExposedPortsIndex{
		PortsForContainer: portsForContainer,
		ContainerForPort:  containerForPort,
	}, nil
}