// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.

// Package messageservice is responsible for initializing the MDS and MGS interactors and
// launching the message handlers that handle the commands received from those interactors.
// This package is the starting point for the message service module.
package messageservice

import (
	"fmt"
	"runtime/debug"
	"sync"

	"github.com/aws/amazon-ssm-agent/agent/context"
	"github.com/aws/amazon-ssm-agent/agent/contracts"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/interactor"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/interactor/mdsinteractor"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/interactor/mgsinteractor"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/messagehandler"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/messagehandler/processorwrappers"
	"github.com/aws/amazon-ssm-agent/agent/messageservice/utils"
	"github.com/aws/amazon-ssm-agent/agent/platform"
)

// Package-level function variables that alias functions from other packages.
// isPlatformNanoServer is called by NewService to decide whether the MGS
// interactor is added; getProcessorWrapperDelegateMap is called by
// initializeProcessor to obtain the per-worker processor wrapper delegates.
// NOTE(review): presumably kept as variables so tests can replace them — confirm.
var (
	isPlatformNanoServer           = platform.IsPlatformNanoServer
	getProcessorWrapperDelegateMap = processorwrappers.GetProcessorWrapperDelegateMap
)

const (
	// ServiceName is the MessageService ICoreModule name. It is the value
	// returned by ModuleName and, wrapped in square brackets, the tag that
	// NewService attaches to the module's context.
	ServiceName = "MessageService"
)

// MessageService is the core module for initializing MDS and MGS interactors
// and then launch message handlers which in turn initiates processors
// to process the commands from interactors.
//
// Fields:
//   - context: the agent context tagged with "[MessageService]" by NewService;
//     all logging in this type goes through context.Log().
//   - name: the module name returned by ModuleName (set to ServiceName).
//   - messageHandler: the handler that processors are registered with and that
//     is initialized in ModuleExecute and stopped in ModuleStop.
//   - interactors: the interactors started by ModuleExecute and closed by
//     ModuleStop; populated by NewService or replaced through SetInteractor.
//   - msgServiceMutex: NOTE(review): not referenced by any method visible in
//     this file — confirm whether it is still needed.
type MessageService struct {
	context         context.T
	name            string
	messageHandler  messagehandler.IMessageHandler
	interactors     []interactor.IInteractor
	msgServiceMutex sync.Mutex
}

// NewService instantiates the MessageService core module.
//
// The supplied context is tagged with "[MessageService]" and used for the
// message handler and for every interactor that gets created. The function
// returns nil when the agent identity yields an empty instance ID.
//
// Interactors are added as follows:
//   - the MGS interactor, unless isPlatformNanoServer reports a Nano server;
//   - the MDS interactor, unless the agent is configured for container mode.
//
// An interactor whose constructor fails is left out of the service; the
// failure is logged so that the missing interactor can be diagnosed.
func NewService(context context.T) contracts.ICoreModule {
	messageContext := context.With("[" + ServiceName + "]")
	log := messageContext.Log()

	instanceID, err := context.Identity().InstanceID()
	if instanceID == "" {
		// %v rather than %s: err may be nil when the identity simply has no instance ID
		log.Errorf("no instanceID provided, %v", err)
		return nil
	}

	messageService := &MessageService{
		context:        messageContext,
		name:           ServiceName,
		messageHandler: messagehandler.NewMessageHandler(messageContext),
	}

	isNanoServer, err := isPlatformNanoServer(log)
	if err != nil {
		// the returned flag is still honoured; only surface the detection failure
		log.Warnf("unable to determine whether the platform is a Nano server: %v", err)
	}
	if !isNanoServer {
		log.Info("Appending MGSInteractor to MessageService interactors")
		mgsRef, err := mgsinteractor.New(messageContext, messageService.messageHandler)
		if err != nil {
			log.Errorf("MGSInteractor could not be created and will not be used: %v", err)
		} else {
			messageService.interactors = append(messageService.interactors, mgsRef)
		}
	}
	if !messageContext.AppConfig().Agent.ContainerMode {
		log.Info("Appending MDSInteractor to MessageService interactors")
		mdsRef, err := mdsinteractor.New(messageContext, messageService.messageHandler, nil)
		if err != nil {
			log.Errorf("MDSInteractor could not be created and will not be used: %v", err)
		} else {
			messageService.interactors = append(messageService.interactors, mdsRef)
		}
	}

	return messageService
}

// SetInteractor replaces the service's interactors with the given slice.
// The slice is stored as passed (no copy is made) and the assignment is not
// guarded by msgServiceMutex.
// TODO remove once we start doing stress tests using service mock framework
func (msgSvc *MessageService) SetInteractor(iInteractor []interactor.IInteractor) {
	msgSvc.interactors = iInteractor
}

// GetMessageHandler returns the message handler created by NewService, i.e.
// the same reference that was handed to the interactors at construction time.
// TODO remove once we start doing stress tests using service mock framework
func (msgSvc *MessageService) GetMessageHandler() messagehandler.IMessageHandler {
	return msgSvc.messageHandler
}

// ICoreModule implementation

// ModuleName returns the name of the module (ServiceName when the service
// was built by NewService).
func (msgSvc *MessageService) ModuleName() string {
	return msgSvc.name
}

// ModuleExecute starts the MessageService module.
//
// It initializes the message handler and then initializes every non-nil
// interactor in its own goroutine. For each interactor that initializes
// successfully, the processors for its supported workers are initialized and
// registered through initializeProcessor. The call blocks until all interactor
// goroutines have finished.
//
// Returns a single error that lists every interactor initialization failure,
// or nil when none failed. A panic inside an interactor goroutine is recovered
// and logged; it is not reported through the returned error.
func (msgSvc *MessageService) ModuleExecute() (err error) {
	log := msgSvc.context.Log()
	log.Info("starting MessageService")
	// initialize message handler
	msgSvc.messageHandler.Initialize()

	var (
		// shared by pointer with every interactor's Initialize call
		ableToOpenMGSConnection uint32
		wg                      sync.WaitGroup
		// errMu guards errArr, which is appended to from the interactor goroutines
		errMu  sync.Mutex
		errArr []error
	)
	for _, interactRef := range msgSvc.interactors {
		// this is a safety check: skip the nil entry but keep starting the others
		if interactRef == nil {
			log.Error("skipping as the loaded interactor is nil")
			continue
		}
		wg.Add(1)
		go func(ref interactor.IInteractor) {
			// registered first so it runs last: the waiter is released only
			// after the completion and panic logs below have been written
			defer wg.Done()
			interactorName := ref.GetName()
			log.Infof("%v initialization started", interactorName)
			defer func() {
				log.Infof("%v initialization completed", interactorName)
				if msg := recover(); msg != nil {
					log.Errorf("%v initialization panicked: %v", interactorName, msg)
					log.Errorf("stacktrace:\n%s", debug.Stack())
				}
			}()
			// In MGS Interactor, control channel connection may retry indefinitely
			// This will be blocked during that case
			// The error is kept goroutine-local; writing it to the function's
			// named result from several goroutines would be a data race.
			if initErr := ref.Initialize(&ableToOpenMGSConnection); initErr != nil {
				errorMsg := fmt.Errorf("error occurred while initializing Interactor %v: %v", interactorName, initErr)
				log.Error(errorMsg)
				errMu.Lock()
				errArr = append(errArr, errorMsg)
				errMu.Unlock()
				return
			}

			supportedWorkers := ref.GetSupportedWorkers()
			log.Infof("supported workers for the interactor %v: %v", interactorName, supportedWorkers)

			// initializes and registers the processor with message handler
			msgSvc.initializeProcessor(ref, supportedWorkers)
		}(interactRef)
	}
	wg.Wait()
	// all goroutines are done, so errArr can be read without the lock
	if len(errArr) != 0 {
		return fmt.Errorf("message service module execution threw error: %v", errArr)
	}
	return nil
}

// initializeProcessor sets up one processor per supported worker of the given
// interactor and registers it with the message handler. Each worker is handled
// in its own goroutine; the method returns once all of them have finished.
// After a processor has been registered (successfully or not), the interactor's
// PostProcessorInitialization hook is invoked for that worker. Workers that
// have no configuration entry or no wrapper delegate are logged and skipped.
func (msgSvc *MessageService) initializeProcessor(interactorRef interactor.IInteractor, supportedWorkers []utils.WorkerName) {
	logger := msgSvc.context.Log()

	workerConfigs := utils.LoadProcessorWorkerConfig(msgSvc.context)

	// the delegate map is never written after it has been loaded,
	// which makes the concurrent reads below safe
	delegates := getProcessorWrapperDelegateMap()

	var pending sync.WaitGroup

	// setUp performs the whole initialization sequence for a single worker
	setUp := func(worker utils.WorkerName) {
		defer func() {
			pending.Done()
			logger.Infof("processor initialization completed for worker %v belonging to %v", worker, interactorRef.GetName())
			if msg := recover(); msg != nil {
				logger.Errorf("%v processor initialization panicked: %v", worker, msg)
				logger.Errorf("stacktrace:\n%s", debug.Stack())
			}
		}()

		workerConfig, found := workerConfigs[worker]
		if !found {
			logger.Errorf("worker name not present in the config: %v", worker)
			return
		}
		newWrapper, found := delegates[worker]
		if !found {
			logger.Errorf("processor wrapper delegate not available for the worker name: %v", worker)
			return
		}

		wrapper := newWrapper(msgSvc.context, workerConfig)
		wrapperName := wrapper.GetName()
		logger.Infof("registering processor %v for the interactor: %v", wrapperName, interactorRef.GetName())
		// Registering a wrapper under a name that is already registered blocks inside
		// InitializeAndRegisterProcessor until the first registration has finished initializing.
		// That is deliberate: connections for receiving commands must not be opened
		// while in-progress and pending commands are still being loaded.
		if regErr := msgSvc.messageHandler.InitializeAndRegisterProcessor(wrapper); regErr != nil {
			logger.Warnf("error during initialization of processor wrapper %v: %v", wrapperName, regErr) // No error is returned for now
		}
		// lets the interactor start accepting incoming agent job messages
		// now that the processor initialization has been attempted
		interactorRef.PostProcessorInitialization(worker)
	}

	for _, workerName := range supportedWorkers {
		pending.Add(1)
		logger.Infof("processor initialization started for worker %v belonging to %v", workerName, interactorRef.GetName())
		go setUp(workerName)
	}
	pending.Wait()
}

// ModuleStop stops the MessageService module.
//
// The shutdown runs in three steps:
//  1. PreProcessorClose is called on every interactor, sequentially.
//  2. The message handler (and with it the launched processors) is stopped.
//  3. Every interactor is closed in its own goroutine; the method waits for
//     all of them before returning.
//
// Nil interactors are skipped, mirroring the safety check in ModuleExecute.
// Failures from the message handler or from an interactor's Close, as well as
// a panic raised while closing an interactor, are logged only: the method
// always returns nil.
func (msgSvc *MessageService) ModuleStop() error {
	log := msgSvc.context.Log()
	log.Infof("Stopping %v", msgSvc.name)
	for _, interactRef := range msgSvc.interactors {
		// this is a safety check, same as in ModuleExecute
		if interactRef == nil {
			log.Error("skipping as the loaded interactor is nil")
			continue
		}
		// Perform actions to do by interactors before the message handler close
		// This function performs following operations:
		// close send failed reply job and message polling in MDS interactor
		// drop incoming agent job messages in MGS interactor
		interactRef.PreProcessorClose()
	}
	// close the launched processors
	if err := msgSvc.messageHandler.Stop(); err != nil {
		log.Errorf("error occurred during closing message handlers: %v", err)
	}

	var wg sync.WaitGroup
	for _, interactRef := range msgSvc.interactors {
		if interactRef == nil {
			// already reported in the loop above
			continue
		}
		// no action in MDS interactor
		// control channel is closed
		wg.Add(1)
		go func(ref interactor.IInteractor) {
			defer wg.Done()
			// a panicking Close must not take the whole process down during shutdown
			defer func() {
				if msg := recover(); msg != nil {
					log.Errorf("interactor close panicked: %v", msg)
					log.Errorf("stacktrace:\n%s", debug.Stack())
				}
			}()
			if closeErr := ref.Close(); closeErr != nil {
				log.Errorf("error occurred while closing connection in interactor %v: %v", ref.GetName(), closeErr)
			}
		}(interactRef)
	}
	wg.Wait()
	log.Infof("Stopped %v", msgSvc.name)
	return nil
}