// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.

// Package runcommand implements runcommand core processing module
package runcommand

import (
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/aws/amazon-ssm-agent/agent/contracts"
	"github.com/aws/amazon-ssm-agent/agent/framework/docmanager"
	mdsService "github.com/aws/amazon-ssm-agent/agent/runcommand/mds"
	"github.com/aws/amazon-ssm-agent/agent/sdkutil"
	"github.com/aws/aws-sdk-go/service/ssmmds"
	"github.com/carlescere/scheduler"
)

const (
	documentContent  = "DocumentContent"
	runtimeConfig    = "runtimeConfig"
	cloudwatchPlugin = "aws:cloudWatch"
	properties       = "properties"
	parameters       = "Parameters"
	// The MDS service will mark a document as timed out if it doesn't receive any response from the agent within 2 hours.
	// The value is expressed in hours; isValidReplyRequest compares it against an age measured in hours.
	documentLevelTimeOutDurationHour = 2
)

// NOTE(review): neither of these two variables is referenced in this part of
// the file; presumably once guards the lazy initialisation of the map — confirm
// against the code that populates it.
var (
	singletonMapOfUnsupportedSSMDocs map[string]bool
	// once is a package-wide sync.Once.
	once sync.Once
)

// The message parsers are reached through package-level variables rather than
// being called directly; processMessage uses these indirections to build the
// document state for send-command and cancel-command messages.
var (
	loadDocStateFromSendCommand   = parseSendCommandMessage
	loadDocStateFromCancelCommand = parseCancelCommandMessage
)

// ModuleName returns the module name held in the service's name field.
func (s *RunCommandService) ModuleName() string {
	return s.name
}

// ModuleExecute starts the document processor, launches the goroutine that
// relays processor results, runs the processor's initial processing and then
// schedules the recurring message-poll and send-reply jobs.
//
// A failure to start the processor or to run initial processing is logged and
// returned immediately. Scheduling failures are logged but do not stop the
// function. NOTE: the error from scheduling the message-poll job is overwritten
// by the outcome of scheduling the send-reply job, so only the latter is
// reflected in the returned error.
//
// A panic raised while executing is recovered and logged; err then keeps the
// value it held when the panic occurred.
func (s *RunCommandService) ModuleExecute() (err error) {
	log := s.context.Log()
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("run command ModuleExecute run panic: %v", r)
		}
	}()

	log.Info("Starting document processing engine...")
	var results chan contracts.DocumentResult
	results, err = s.processor.Start()
	if err != nil {
		log.Errorf("unable to start document processor: %v", err)
		return err
	}

	// Consume processor results in the background for the lifetime of the channel.
	go s.listenReply(results)

	err = s.processor.InitialProcessing(true)
	if err != nil {
		log.Errorf("initial processing in EngineProcessor encountered error: %v", err)
		return err
	}

	log.Info("Scheduling message polling")
	s.messagePollWaitGroup = &sync.WaitGroup{}
	s.messagePollJob, err = scheduler.Every(pollMessageFrequencyMinutes).Minutes().Run(s.messagePollLoop)
	if err != nil {
		s.context.Log().Errorf("unable to schedule message poll job. %v", err)
	}

	log.Info("Starting send replies to MDS")
	s.sendReplyJob, err = scheduler.Every(sendReplyFrequencyMinutes).Minutes().Run(s.sendReplyLoop)
	if err != nil {
		s.context.Log().Errorf("unable to schedule send reply job. %v", err)
	}
	return err
}

// ModuleStop shuts the module down in two steps: it first calls s.stop() and
// then stops the document processor. It always returns nil.
func (s *RunCommandService) ModuleStop() (err error) {
	//first stop sending failed replies to the service and the message poller
	s.stop()
	//second stop the message processor
	s.processor.Stop()
	return nil
}

// listenReply drains resultChan until it is closed, forwarding every document
// result to the service through sendResponse.
//
// A result with a non-empty LastPlugin is treated as a per-plugin update. A
// result with an empty LastPlugin marks the command as complete and also kicks
// off, in a separate goroutine, the cleanup of old orchestration directories.
// If the short instance ID cannot be obtained the cleanup is skipped and the
// error is logged, rather than running the cleanup with an empty instance ID.
//
// Each result is handled inside its own closure so that a panic while handling
// one result is recovered and logged without terminating the loop.
func (s *RunCommandService) listenReply(resultChan chan contracts.DocumentResult) {
	log := s.context.Log()
	//processor guarantees to close this channel upon stop
	for res := range resultChan {
		func() {
			defer func() {
				if err := recover(); err != nil {
					log.Errorf("Failed to process replies with error %v", err)
					log.Errorf("Stracktrace:\n%s", debug.Stack())
				}
			}()

			if res.LastPlugin != "" {
				log.Infof("received plugin: %v result from Processor", res.LastPlugin)
			} else {
				log.Infof("command: %v complete", res.MessageID)
				//Deleting Old Log Files after the execution is over and files have been moved to completed folder
				//clean completed document state files and orchestration dirs. Takes care of only files generated by RunCommand in the folder
				shortInstanceID, idErr := s.context.Identity().ShortInstanceID()
				if idErr != nil {
					// Without an instance ID the cleanup target is unknown; skip it
					// this time, the next completed command will try again.
					log.Errorf("unable to get short instance id, skipping cleanup of old orchestration directories: %v", idErr)
				} else {
					go docmanager.DeleteOldOrchestrationDirectories(log,
						shortInstanceID,
						s.context.AppConfig().Agent.OrchestrationRootDir,
						s.context.AppConfig().Ssm.RunCommandLogsRetentionDurationHours,
						s.context.AppConfig().Ssm.AssociationLogsRetentionDurationHours)
				}
			}
			// The reply is sent regardless of whether the cleanup was started.
			s.sendResponse(res.MessageID, res)
		}()
	}
}

// processMessage handles one message received from MDS.
//
// Steps, in order:
//  1. Reject a nil message or a message without an ID, then run validate.
//  2. Build the document state from the message according to its topic prefix
//     (send-command or cancel-command).
//  3. Acknowledge the message, report the document as InProgress and hand the
//     document state to the processor (Submit or Cancel).
//
// Failure handling:
//   - A send-command message that cannot be parsed is answered with a
//     document-level Failed response and is neither failed nor acknowledged.
//   - A cancel-command parse error or an unknown topic is reported to the
//     service through FailMessage.
//   - Errors returned by FailMessage or AcknowledgeMessage are passed to
//     sdkutil.HandleAwsError together with the processor stop policy.
func (s *RunCommandService) processMessage(msg *ssmmds.Message) {
	var (
		docState *contracts.DocumentState
		err      error
	)

	// The per-message logger below dereferences msg.MessageId before validate
	// has had a chance to inspect the message, so guard against nil here
	// instead of panicking.
	if msg == nil || msg.MessageId == nil {
		s.context.Log().Errorf("message not valid, ignoring: message or message id is nil")
		return
	}
	messageID := *msg.MessageId

	// create separate logger that includes messageID with every log message
	context := s.context.With("[messageID=" + messageID + "]")
	log := context.Log()
	log.Debug("Processing message")

	if err = validate(msg); err != nil {
		log.Error("message not valid, ignoring: ", err)
		return
	}

	// NOTE(review): msg.Topic is dereferenced below; this assumes validate
	// rejects messages with a nil Topic — confirm.
	topic := *msg.Topic
	switch {
	case strings.HasPrefix(topic, string(SendCommandTopicPrefix)):
		docState, err = loadDocStateFromSendCommand(context, msg, s.orchestrationRootDir)
		if err != nil {
			log.Error(err)
			s.sendDocLevelResponse(messageID, contracts.ResultStatusFailed, err.Error())
			return
		}
	case strings.HasPrefix(topic, string(CancelCommandTopicPrefix)):
		docState, err = loadDocStateFromCancelCommand(context, msg, s.orchestrationRootDir)
	default:
		err = fmt.Errorf("unexpected topic name %v", topic)
	}

	if err != nil {
		log.Error("format of received message is invalid ", err)
		if err = s.service.FailMessage(log, messageID, mdsService.InternalHandlerException); err != nil {
			sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
		}
		return
	}
	if err = s.service.AcknowledgeMessage(log, messageID); err != nil {
		sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
		return
	}

	log.Debugf("Ack done. Received message - messageId - %v", messageID)

	log.Debugf("Processing to send a reply to update the document status to InProgress")

	//TODO This function should be called in service when it submits the document to the engine
	s.sendDocLevelResponse(messageID, contracts.ResultStatusInProgress, "")

	log.Debugf("SendReply done. Received message - messageId - %v", messageID)
	switch docState.DocumentType {
	case contracts.SendCommandOffline, contracts.SendCommand:
		s.processor.Submit(*docState)
	case contracts.CancelCommandOffline, contracts.CancelCommand:
		s.processor.Cancel(*docState)

	default:
		log.Error("unexpected document type ", docState.DocumentType)
	}

}

// sendFailedReplies retries replies that were persisted locally after an
// earlier attempt to deliver them to the service failed.
//
// For every stored reply:
//   - replies that isValidReplyRequest rejects are deleted without being sent;
//   - replies that cannot be loaded are logged and left in place;
//   - a successful send deletes the stored reply;
//   - a failed send is passed to sdkutil.HandleAwsError and ends the whole
//     pass, leaving the remaining replies for a later run.
func (s *RunCommandService) sendFailedReplies() {
	log := s.context.Log()

	log.Debug("Checking if there are document replies that failed to reach the service, and retry sending them")
	pending := s.service.LoadFailedReplies(log)
	if len(pending) == 0 {
		log.Debugf("No failed document replies found")
		return
	}

	log.Infof("Found document replies that need to be sent to the service")
	for _, name := range pending {
		log.Debug("Loading reply ", name)
		if !isValidReplyRequest(name) {
			log.Debug("Reply is old, document execution must have timed out. Deleting the reply")
			s.service.DeleteFailedReply(log, name)
			continue
		}

		request, err := s.service.GetFailedReply(log, name)
		if err != nil {
			log.Error("Couldn't load the reply from disk ", err)
			continue
		}

		log.Info("Sending reply ", name)
		if err = s.service.SendReplyWithInput(log, request); err != nil {
			sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
			break
		}
		log.Infof("Sending reply %v succeeded, deleting the reply file from disk", name)
		s.service.DeleteFailedReply(log, name)
	}
}

// isValidReplyRequest reports whether the stored reply identified by filename
// is still recent enough to be sent, i.e. not older than
// documentLevelTimeOutDurationHour hours. Older replies are considered stale
// because the document must already have timed out.
//
// The timestamp is taken from the second "_"-separated segment of filename and
// must use the layout "2006-01-02T15-04-05"; as the layout carries no zone,
// time.Parse interprets it as UTC.
//
// It returns false when filename contains no "_", when the timestamp segment
// cannot be parsed, or when the timestamp is more than the timeout in the past.
// Timestamps in the future are treated as valid.
func isValidReplyRequest(filename string) bool {
	segments := strings.Split(filename, "_")
	if len(segments) < 2 {
		return false
	}
	sentAt, err := time.Parse("2006-01-02T15-04-05", segments[1])
	if err != nil {
		// An unreadable timestamp cannot be proven recent, so treat the reply
		// as stale instead of relying on zero-time arithmetic.
		return false
	}
	return time.Since(sentAt) <= documentLevelTimeOutDurationHour*time.Hour
}