// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.

// Package processor defines the document processing unit interface
package processor
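
// A document moves through state folders on disk as it is processed (a summary
// of the flow implemented below): a submitted document is persisted in the
// pending folder, moved to the current folder when a worker picks it up, and
// on completion its interim state is removed; documents that fail submission,
// exceed the retry limit, or expire are moved to the corrupt folder.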

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime/debug"
	"sync"
	"time"

	"github.com/aws/amazon-ssm-agent/agent/appconfig"
	"github.com/aws/amazon-ssm-agent/agent/context"
	"github.com/aws/amazon-ssm-agent/agent/contracts"
	"github.com/aws/amazon-ssm-agent/agent/fileutil"
	"github.com/aws/amazon-ssm-agent/agent/framework/docmanager"
	"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer"
	"github.com/aws/amazon-ssm-agent/agent/framework/processor/executer/outofproc"
	"github.com/aws/amazon-ssm-agent/agent/log"
	"github.com/aws/amazon-ssm-agent/agent/longrunning/manager"
	"github.com/aws/amazon-ssm-agent/agent/rebooter"
	"github.com/aws/amazon-ssm-agent/agent/task"
	"github.com/aws/amazon-ssm-agent/agent/times"
)

type ExecuterCreator func(ctx context.T) executer.Executer

// ErrorCode represents processor related error codes
type ErrorCode string

const (

	// hardStopTimeout is the time the processor waits before shutting down during a hard stop
	hardStopTimeout = time.Second * 4

	// maxDocumentTimeOutHour is the maximum age of an in-progress document before it is considered expired
	maxDocumentTimeOutHour = time.Hour * 48

	// CommandBufferFull denotes that the cancel command buffer is full
	CommandBufferFull ErrorCode = "CommandBufferFull"

	// ClosedProcessor denotes that the processor is closed
	ClosedProcessor ErrorCode = "ClosedProcessor"

	// UnsupportedDocType represents unsupported doc type
	UnsupportedDocType ErrorCode = "UnsupportedDocType"

	// DuplicateCommand represents duplicate command in the buffer
	DuplicateCommand ErrorCode = "DuplicateCommand"

	// InvalidDocumentId represents invalid document id
	InvalidDocumentId ErrorCode = "InvalidDocumentId"

	// ConversionFailed denotes that converting a pool error code to a processor error code failed
	ConversionFailed ErrorCode = "ConversionFailed"

	// SubmissionPanic represents panic during submission to the pool
	SubmissionPanic ErrorCode = "SubmissionPanic"
)

// Processor defines the document processing interface
type Processor interface {
	// Start activates the Processor and picks up documents left over from the last run; it returns a channel on which the caller gathers DocumentResults
	Start() (chan contracts.DocumentResult, error)
	// InitialProcessing processes any documents persisted on disk from a previous run. It should be called after Start()
	InitialProcessing(skipDocumentIfExpired bool) error
	// Stop stops the processor and saves the current state so it can be resumed later
	Stop()
	// Submit pushes a document, in the form of a docState object, to the pool; results are streamed back on the central channel returned by Start()
	Submit(docState contracts.DocumentState) ErrorCode
	// Cancel cancels processing of the given document
	Cancel(docState contracts.DocumentState) ErrorCode
	//TODO do we need to implement CancelAll?
	//CancelAll()
}
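
// A minimal usage sketch (illustrative only; proc is assumed to be an
// initialized Processor, docState a populated contracts.DocumentState, and
// handleResult a hypothetical consumer of results):
//
//	resChan, err := proc.Start()
//	if err != nil {
//		return err
//	}
//	go func() {
//		for result := range resChan {
//			handleResult(result) // each contracts.DocumentResult streamed by the workers
//		}
//	}()
//	if err := proc.InitialProcessing(true); err != nil {
//		return err
//	}
//	if errCode := proc.Submit(docState); errCode != "" {
//		// react to the processor ErrorCode, e.g. retry on CommandBufferFull
//	}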

// EngineProcessor processes incoming documents by pushing them to the executer via job pools
type EngineProcessor struct {
	context           context.T
	executerCreator   ExecuterCreator
	sendCommandPool   task.Pool
	cancelCommandPool task.Pool
	//TODO this should be abstracted as the Processor's domain
	resChan       chan contracts.DocumentResult
	documentMgr   docmanager.DocumentMgr
	stopFlagMutex sync.RWMutex

	isProcessorStopped          bool
	startWorker                 *workerProcessorSpec
	cancelWorker                *workerProcessorSpec
	poolToProcessorErrorCodeMap map[task.PoolErrorCode]ErrorCode
}

// workerProcessorSpec holds the worker-related specifications needed by the processor
type workerProcessorSpec struct {
	workerLimit     int
	assignedDocType contracts.DocumentType
	bufferLimit     int
}

// GetAssignedDocType returns the assigned doc type
func (wps *workerProcessorSpec) GetAssignedDocType() contracts.DocumentType {
	return wps.assignedDocType
}

// GetWorkerLimit returns the worker limit
func (wps *workerProcessorSpec) GetWorkerLimit() int {
	return wps.workerLimit
}

// GetBufferLimit returns the worker buffer limit
func (wps *workerProcessorSpec) GetBufferLimit() int {
	return wps.bufferLimit
}

// NewWorkerProcessorSpec returns a new worker processor specification, substituting defaults for invalid values
func NewWorkerProcessorSpec(ctx context.T, workerLimit int, assignedDocType contracts.DocumentType, bufferLimit int) *workerProcessorSpec {
	logger := ctx.Log()
	workerProcessorSpecObj := &workerProcessorSpec{
		workerLimit:     workerLimit,
		assignedDocType: assignedDocType,
		bufferLimit:     bufferLimit,
	}
	if workerLimit < 1 {
		logger.Warnf("invalid worker limit %v, defaulting to 5", workerLimit)
		workerProcessorSpecObj.workerLimit = 5
	}
	// a buffer limit of 0 makes the buffer channel blocking;
	// 0 is passed by the association module and the offline processor module
	if bufferLimit < 0 {
		logger.Infof("invalid buffer limit %v, defaulting to 1", bufferLimit)
		workerProcessorSpecObj.bufferLimit = 1
	}
	if assignedDocType == "" {
		logger.Infof("empty doc type assigned, defaulting to placeholder doc type")
		workerProcessorSpecObj.assignedDocType = "nodoctype" // dummy value
	}
	return workerProcessorSpecObj
}

// NewEngineProcessor returns a newly initialized EngineProcessor
// TODO worker pool should be triggered in the Start() function
// The assigned document types define the domain of documents the Processor will run on; there will be race conditions if multiple Processors share a domain.
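//
// Example construction (a sketch; the worker limits, buffer sizes, and the
// SendCommand/CancelCommand doc types are illustrative values):
//
//	startSpec := NewWorkerProcessorSpec(ctx, 5, contracts.SendCommand, 10)
//	cancelSpec := NewWorkerProcessorSpec(ctx, 5, contracts.CancelCommand, 10)
//	proc := NewEngineProcessor(ctx, startSpec, cancelSpec)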
func NewEngineProcessor(ctx context.T, startWorker *workerProcessorSpec, cancelWorker *workerProcessorSpec) *EngineProcessor {
	engineProcessorCtx := ctx.With("[EngineProcessor]")
	log := engineProcessorCtx.Log()
	// sendCommand and cancelCommand are processed by separate worker pools,
	// so the number of workers can be configured for each
	cancelWaitDuration := 10000 * time.Millisecond
	clock := times.DefaultClock
	resChan := make(chan contracts.DocumentResult)
	executerCreator := func(ctx context.T) executer.Executer {
		return outofproc.NewOutOfProcExecuter(ctx)
	}

	documentMgr := docmanager.NewDocumentFileMgr(engineProcessorCtx, appconfig.DefaultDataStorePath, appconfig.DefaultDocumentRootDirName, appconfig.DefaultLocationOfState)
	engineProcessor := &EngineProcessor{
		context:                     engineProcessorCtx,
		executerCreator:             executerCreator,
		resChan:                     resChan,
		documentMgr:                 documentMgr,
		sendCommandPool:             task.NewPool(log, startWorker.workerLimit, startWorker.bufferLimit, cancelWaitDuration, clock),
		cancelCommandPool:           task.NewPool(log, cancelWorker.workerLimit, cancelWorker.bufferLimit, cancelWaitDuration, clock),
		startWorker:                 startWorker,
		cancelWorker:                cancelWorker,
		poolToProcessorErrorCodeMap: make(map[task.PoolErrorCode]ErrorCode),
	}
	engineProcessor.loadProcessorPoolErrorCodes()
	return engineProcessor
}

func (p *EngineProcessor) Start() (resChan chan contracts.DocumentResult, err error) {
	context := p.context
	if context == nil {
		return nil, fmt.Errorf("EngineProcessor is not initialized")
	}
	log := context.Log()
	log.Info("Starting")

	resChan = p.resChan
	return
}

func (p *EngineProcessor) InitialProcessing(skipDocumentIfExpired bool) (err error) {
	context := p.context
	if context == nil {
		return fmt.Errorf("EngineProcessor is not initialized")
	}
	log := context.Log()

	log.Info("Initial processing")

	// pending files are preloaded here to handle the following case:
	// resuming an in-progress document re-submits it through the pending state,
	// which could cause processPendingDocuments to load the same document again
	pendingFiles := p.getDocStateFiles(log, appconfig.DefaultLocationOfPending)
	//prioritize the ongoing documents first
	p.processInProgressDocuments(skipDocumentIfExpired)
	//deal with pending jobs that have not yet been picked up by a worker
	p.processPendingDocuments(pendingFiles)
	return
}

// checkDocSubmissionAllowed checks whether the document may be submitted to the given pool, acquiring a buffer token when a buffer limit is set
func (p *EngineProcessor) checkDocSubmissionAllowed(docState *contracts.DocumentState, taskPool task.Pool, bufferLimit int) ErrorCode {
	logger := p.context.Log()
	tokenSize := taskPool.BufferTokensIssued()
	logger.Debugf("buffer limit start value for doc type %v with command id %v: tokenSize - %v bufferLimit - %v", docState.DocumentType, docState.DocumentInformation.DocumentID, tokenSize, bufferLimit)
	if bufferLimit == 0 { // No synchronization needed as this value is loaded during processor initialization
		return "" // No check needed when buffer limit is zero. sticking with old behavior
	}
	if p.hasProcessorStopped() { // additional check to drop it at the beginning itself
		return ClosedProcessor
	}
	jobId := p.getJobId(docState)
	errorCode := taskPool.AcquireBufferToken(jobId)
	if errorCode != "" {
		if processorErrorCode, ok := p.poolToProcessorErrorCodeMap[errorCode]; ok {
			return processorErrorCode
		} else {
			return ConversionFailed
		}
	}
	tokenSize = taskPool.BufferTokensIssued()
	// Success condition
	logger.Debugf("buffer limit end value for doc type %v with command id %v: tokenSize - %v bufferLimit - %v", docState.DocumentType, docState.DocumentInformation.DocumentID, tokenSize, bufferLimit)
	return "" // Success
}

// loadProcessorPoolErrorCodes loads processor pool error code mappings
func (p *EngineProcessor) loadProcessorPoolErrorCodes() {
	p.poolToProcessorErrorCodeMap[task.InvalidJobId] = InvalidDocumentId
	p.poolToProcessorErrorCodeMap[task.DuplicateCommand] = DuplicateCommand
	p.poolToProcessorErrorCodeMap[task.JobQueueFull] = CommandBufferFull
}

// cleanUpDocSubmissionOnError releases a previously acquired buffer token;
// call it only after the token was acquired successfully
func (p *EngineProcessor) cleanUpDocSubmissionOnError(doc *contracts.DocumentState) {
	if doc.DocumentType == p.startWorker.assignedDocType && p.startWorker.bufferLimit > 0 { // do not call release token when buffer limit is zero
		p.decrementCommandBuffer(doc, p.sendCommandPool)
	} else if doc.DocumentType == p.cancelWorker.assignedDocType && p.cancelWorker.bufferLimit > 0 {
		p.decrementCommandBuffer(doc, p.cancelCommandPool)
	}
}

// decrementCommandBuffer releases the buffer token held for the given document in the given pool
func (p *EngineProcessor) decrementCommandBuffer(doc *contracts.DocumentState, commandPool task.Pool) {
	logger := p.context.Log()
	// safety check
	if doc == nil {
		logger.Errorf("document is nil")
		return
	}
	jobId := p.getJobId(doc)
	errorCode := commandPool.ReleaseBufferToken(jobId)
	tokenSize := commandPool.BufferTokensIssued()
	logger.Debugf("current buffer size for doc type %v with command id %v: tokenSize: %v", doc.DocumentType, jobId, tokenSize)
	// should not happen at any time
	if errorCode != "" {
		logger.Warnf("clean up failed because of the following error code %v", errorCode)
		return
	}
	logger.Infof("cleaned up command %v with doc type %v", jobId, doc.DocumentType)
}

// Submit pushes a document, in the form of a docState object, to the pool; results are streamed back on the channel returned by Start()
func (p *EngineProcessor) Submit(docState contracts.DocumentState) (errorCode ErrorCode) {
	return p.submit(&docState, false)
}

// submit sends the job to the sendCommandPool
func (p *EngineProcessor) submit(docState *contracts.DocumentState, isInProgressDocument bool) (errorCode ErrorCode) {
	log := p.context.Log()
	jobID := p.getJobId(docState)
	// check whether document submission is allowed in the send command pool;
	// the duplicate command check also happens here.
	// when the buffer limit is zero, success ("") is always returned, which means the pool submit below blocks if the pool is already full
	errorCode = p.checkProcessorSubmissionAllowed(docState)
	if errorCode != "" {
		return errorCode
	}
	log.Infof("document %v submission started", jobID)
	defer log.Infof("document %v submission ended", jobID)
	defer func() {
		if r := recover(); r != nil {
			errorCode = SubmissionPanic
			p.cleanUpDocSubmissionOnError(docState) // call this function only after acquiring token successfully
			log.Errorf("document %v submission panicked", jobID)
			log.Errorf("stacktrace:\n%s", debug.Stack())
		}
	}()
	if !isInProgressDocument {
		p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, *docState)
	}
	//TODO this is a hack, in future jobID should be managed by Processing engine itself, instead of inferring from job's internal field
	err := p.sendCommandPool.Submit(log, jobID, func(cancelFlag task.CancelFlag) {
		processCommand(
			p.context,
			p.executerCreator,
			cancelFlag,
			p.resChan,
			docState,
			p.documentMgr)
	})
	if err != nil {
		// currently the job pool returns only the duplicate-command error
		// * when the buffer is zero, there is no issue because no token is acquired/released in the pool
		// * when the buffer is > 0, tokens are acquired/released; in this case checkProcessorSubmissionAllowed was already called at the beginning.
		//   Listing the possible states of a document in the job pool to discuss the duplicate-command error:
		//   1) the job is in the job queue buffer and not yet processed - not possible, as commands already in the "job queue buffer" are not received again.
		//   2) the job was released from the job queue buffer and has started processing - also not possible, as commands already in the "job store" are not received again.
		p.cleanUpDocSubmissionOnError(docState)
		log.Error("Document Submission failed: ", err)
		//move the fail-to-submit document to corrupt folder
		p.documentMgr.MoveDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, appconfig.DefaultLocationOfCorrupt)
		return "" // considered submission successful even though it failed
	}
	return "" // considered submission successful
}

// checkProcessorSubmissionAllowed routes the submission check to the pool assigned to the document type
func (p *EngineProcessor) checkProcessorSubmissionAllowed(doc *contracts.DocumentState) ErrorCode {
	if doc.DocumentType == p.startWorker.assignedDocType {
		return p.checkDocSubmissionAllowed(doc, p.sendCommandPool, p.startWorker.bufferLimit)
	} else if doc.DocumentType == p.cancelWorker.assignedDocType {
		return p.checkDocSubmissionAllowed(doc, p.cancelCommandPool, p.cancelWorker.bufferLimit)
	}
	return UnsupportedDocType
}

// getJobId returns the job id for the document: the association ID for associations, the message ID otherwise
func (p *EngineProcessor) getJobId(docState *contracts.DocumentState) string {
	var jobID string
	if docState.IsAssociation() {
		jobID = docState.DocumentInformation.AssociationID
	} else {
		jobID = docState.DocumentInformation.MessageID
	}
	return jobID
}

// Cancel submits the cancel document to the cancelCommandPool, which is responsible for canceling the referenced command
func (p *EngineProcessor) Cancel(docState contracts.DocumentState) (errorCode ErrorCode) {
	return p.cancel(&docState, false)
}

func (p *EngineProcessor) cancel(docState *contracts.DocumentState, isInProgressDocument bool) (errorCode ErrorCode) {
	log := p.context.Log()
	jobID := p.getJobId(docState)

	log.Infof("document %v cancellation started", jobID)
	defer log.Infof("document %v cancellation ended", jobID)

	// check whether document submission is allowed in the cancel command pool;
	// the duplicate command check also happens here.
	// when the buffer limit is zero, success ("") is always returned, which means the pool submit below blocks if the pool is already full
	errorCode = p.checkProcessorSubmissionAllowed(docState)
	if errorCode != "" {
		return errorCode
	}

	defer func() {
		if r := recover(); r != nil {
			errorCode = SubmissionPanic
			p.cleanUpDocSubmissionOnError(docState) // call this function only after acquiring token successfully
			log.Errorf("document %v submission panicked", jobID)
			log.Errorf("stacktrace:\n%s", debug.Stack())
		}
	}()
	if !isInProgressDocument {
		p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfPending, *docState)
	}
	err := p.cancelCommandPool.Submit(log, jobID, func(cancelFlag task.CancelFlag) {
		processCancelCommand(p.context, p.sendCommandPool, docState, p.documentMgr)
	})
	if err != nil {
		// currently the job pool returns only the duplicate-command error
		// * when the buffer is zero, there is no issue because no token is acquired/released in the pool
		// * when the buffer is > 0, tokens are acquired/released; in this case checkProcessorSubmissionAllowed was already called at the beginning.
		//   Listing the possible states of a document in the job pool to discuss the duplicate-command error:
		//   1) the job is in the job queue buffer and not yet processed - not possible, as commands already in the "job queue buffer" are not received again.
		//   2) the job was released from the job queue buffer and has started processing - also not possible, as commands already in the "job store" are not received again.
		p.cleanUpDocSubmissionOnError(docState)
		log.Error("CancelCommand failed", err)
	}
	return ""
}

// hasProcessorStopped checks whether the processor has stopped
func (p *EngineProcessor) hasProcessorStopped() bool {
	p.stopFlagMutex.RLock()
	defer p.stopFlagMutex.RUnlock()
	return p.isProcessorStopped
}

// hasProcessorStopCalledAlready atomically marks the processor as stopped and returns whether Stop had already been called
func (p *EngineProcessor) hasProcessorStopCalledAlready() bool {
	p.stopFlagMutex.Lock()
	defer p.stopFlagMutex.Unlock()
	if p.isProcessorStopped {
		return true
	}
	p.isProcessorStopped = true
	return false
}

// Stop sets the cancel flags of all running jobs, which are picked up by the command workers to shut down gracefully
func (p *EngineProcessor) Stop() {
	if p.hasProcessorStopCalledAlready() {
		p.context.Log().Info("Processor stop called already")
		return
	}

	waitTimeout := time.Duration(p.context.AppConfig().Mds.StopTimeoutMillis) * time.Millisecond

	var wg sync.WaitGroup

	// shut down the send command pool in a separate goroutine
	wg.Add(1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				p.context.Log().Errorf("Shutdown send command pool panic: %v", r)
				p.context.Log().Errorf("Stacktrace:\n%s", debug.Stack())
			}
		}()
		defer wg.Done()
		p.sendCommandPool.ShutdownAndWait(waitTimeout)
	}()

	// shut down the cancel command pool in a separate goroutine
	wg.Add(1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				p.context.Log().Errorf("Shutdown cancel command pool panic: %v", r)
				p.context.Log().Errorf("Stacktrace:\n%s", debug.Stack())
			}
		}()
		defer wg.Done()
		p.cancelCommandPool.ShutdownAndWait(waitTimeout)
	}()

	// wait for everything to shut down
	wg.Wait()
	// close the receiver channel only after we're sure all the ongoing jobs are stopped and no sender is on this channel
	close(p.resChan)
	p.context.Log().Info("processor closed")
}

// TODO remove the direct file dependency once we encapsulate docmanager package
func (p *EngineProcessor) processPendingDocuments(files []os.FileInfo) {
	log := p.context.Log()
	//iterate through all pending messages
	for _, f := range files {
		log.Infof("Found pending document - %v", f.Name())
		//inspect document state
		docState := p.documentMgr.GetDocumentState(f.Name(), appconfig.DefaultLocationOfPending)

		if p.isSupportedDocumentType(docState.DocumentType) {
			p.pushPersistedDocToJobPool(docState, appconfig.DefaultLocationOfPending, false)
		}
	}
}

func (p *EngineProcessor) getDocStateFiles(log log.T, docStateDir string) []os.FileInfo {
	var files []os.FileInfo
	instanceID, err := p.context.Identity().ShortInstanceID()
	if err != nil {
		log.Errorf("Failed to get short instanceID for process %v Documents: %v", docStateDir, err)
		return files
	}

	// process older documents from state folder
	docsLocation := docmanager.DocumentStateDir(instanceID, docStateDir)

	if isDirectoryEmpty, _ := fileutil.IsDirEmpty(docsLocation); isDirectoryEmpty {
		log.Debugf("No %v documents to process from %v", docStateDir, docsLocation)
		return files
	}

	//get all messages
	if files, err = fileutil.ReadDir(docsLocation); err != nil {
		log.Errorf("skipping reading %v documents from %v. unexpected error encountered - %v", docStateDir, docsLocation, err)
	}
	return files
}

// processInProgressDocuments processes InProgress documents that were already dequeued and had entered the job pool before the last shutdown
func (p *EngineProcessor) processInProgressDocuments(skipDocumentIfExpired bool) {
	log := p.context.Log()
	config := p.context.AppConfig()
	files := p.getDocStateFiles(log, appconfig.DefaultLocationOfCurrent)

	//iterate through all InProgress docs
	for _, f := range files {
		log.Infof("Found in-progress document - %v", f.Name())

		//inspect document state
		docState := p.documentMgr.GetDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent)

		if p.isSupportedDocumentType(docState.DocumentType) {
			retryLimit := config.Mds.CommandRetryLimit
			if docState.DocumentInformation.RunCount >= retryLimit {
				p.documentMgr.MoveDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent, appconfig.DefaultLocationOfCorrupt)
				continue
			}

			// increment the command run count
			docState.DocumentInformation.RunCount++

			p.documentMgr.PersistDocumentState(docState.DocumentInformation.DocumentID, appconfig.DefaultLocationOfCurrent, docState)

			log.Infof("Processing in-progress document %v", docState.DocumentInformation.DocumentID)
			if skipDocumentIfExpired && docState.DocumentInformation.CreatedDate != "" {
				createDate := times.ParseIso8601UTC(docState.DocumentInformation.CreatedDate)

				// Do not resume the in-progress document if its create date is more than 48 hours old.
				if createDate.Add(maxDocumentTimeOutHour).Before(time.Now().UTC()) {
					log.Infof("Document %v expired %v, skipping", docState.DocumentInformation.DocumentID, docState.DocumentInformation.CreatedDate)
					p.documentMgr.MoveDocumentState(f.Name(), appconfig.DefaultLocationOfCurrent, appconfig.DefaultLocationOfCorrupt)
					continue
				}
			}

			//Submit the work to the job pool so that processing of new messages is not blocked
			p.pushPersistedDocToJobPool(docState, appconfig.DefaultLocationOfCurrent, true)
		}
	}
}

// pushPersistedDocToJobPool pushes in-progress and pending documents to the job pool during restart
func (p *EngineProcessor) pushPersistedDocToJobPool(docState contracts.DocumentState, docStateDir string, isInProgress bool) {
	logger := p.context.Log()
	// safety check
	defer func() {
		if r := recover(); r != nil {
			p.cleanUpDocSubmissionOnError(&docState)
			logger.Errorf("submitting to processor panicked %v %v", docState.DocumentInformation.DocumentID, r)
			logger.Errorf("stacktrace:\n%s", debug.Stack())
		}
	}()
	logger.Infof("Processing document %v from state dir %v", docState.DocumentInformation.DocumentID, docStateDir)
	for {
		var processorErrorCode ErrorCode
		if docState.DocumentType == p.startWorker.assignedDocType {
			processorErrorCode = p.submit(&docState, isInProgress)
		} else if docState.DocumentType == p.cancelWorker.assignedDocType {
			processorErrorCode = p.cancel(&docState, isInProgress)
		}
		if processorErrorCode == CommandBufferFull { // sleep only for command buffer full
			logger.Debugf("pausing in-progress submission for a second %v because of error code %v", docState.DocumentInformation.DocumentID, processorErrorCode)
			time.Sleep(time.Second)
			continue
		}
		if processorErrorCode != "" { // all errors except CommandBufferFull
			logger.Warnf("skipping in-progress document %v because of error code %v", docState.DocumentInformation.DocumentID, processorErrorCode)
		}
		break // break iteration for success and errors other than CommandBufferFull
	}
}

// isSupportedDocumentType returns whether the processor supports the document
func (p *EngineProcessor) isSupportedDocumentType(documentType contracts.DocumentType) bool {
	if documentType != "" {
		if p.startWorker.assignedDocType == documentType || p.cancelWorker.assignedDocType == documentType {
			return true
		}
	}
	return false
}

func processCommand(context context.T, executerCreator ExecuterCreator, cancelFlag task.CancelFlag, resChan chan contracts.DocumentResult, docState *contracts.DocumentState, docMgr docmanager.DocumentMgr) {
	log := context.Log()
	//move the document to the current folder to mark it as running
	docMgr.MoveDocumentState(
		docState.DocumentInformation.DocumentID,
		appconfig.DefaultLocationOfPending,
		appconfig.DefaultLocationOfCurrent)
	log.Debug("Running executer...")
	documentID := docState.DocumentInformation.DocumentID
	messageID := docState.DocumentInformation.MessageID
	e := executerCreator(context)
	docStore := executer.NewDocumentFileStore(documentID, appconfig.DefaultLocationOfCurrent, docState, docMgr, true)
	statusChan := e.Run(
		cancelFlag,
		&docStore,
	)
	// collect results from the executer; reboot/shutdown detection happens after the channel closes
	var final *contracts.DocumentResult
	for res := range statusChan {
		func() {
			defer func() {
				if err := recover(); err != nil {
					log.Errorf("Failed to process status for document %s with error %v", documentID, err)
					log.Errorf("Stacktrace:\n%s", debug.Stack())
				}
			}()

			if res.LastPlugin == "" {
				log.Infof("sending document: %v complete response", documentID)
			} else {
				log.Infof("sending reply for plugin update: %v", res.LastPlugin)
			}

			final = &res
			handleCloudwatchPlugin(context, res.PluginResults, documentID)
			// replies from workers do not have UpstreamServiceName populated,
			// so each response is stamped with the appropriate upstream service here;
			// this was added to avoid changing the workers
			res.UpstreamServiceName = docState.UpstreamServiceName
			// used to add topic to the payload in agent reply message in MGS interactor
			res.RelatedDocumentType = docState.DocumentType
			//hand off the message to Service
			resChan <- res

			log.Info("Done")
		}()
	}
	//TODO add shutdown as API call, move cancelFlag out of task pool; cancelFlag to contracts, nobody else above runplugins needs to create cancelFlag.
	// Shutdown/reboot detection
	if final == nil || final.LastPlugin != "" {
		log.Infof("document %v still in progress, shutting down...", messageID)
		return
	} else if final.Status == contracts.ResultStatusSuccessAndReboot {
		log.Infof("document %v requested reboot, need to resume", messageID)
		rebooter.RequestPendingReboot(context.Log())
		return
	}

	//execution reached a terminal state; remove the interim state from the current folder
	log.Infof("execution of %v is over. Removing interimState from current folder", messageID)

	docMgr.RemoveDocumentState(
		documentID,
		appconfig.DefaultLocationOfCurrent)

}

// TODO CancelCommand is currently treated as a special type of Command by the Processor, but in general Cancel operation should be seen as a probe to existing commands
func processCancelCommand(context context.T, sendCommandPool task.Pool, docState *contracts.DocumentState, docMgr docmanager.DocumentMgr) {

	log := context.Log()
	//move the cancel document from the pending to the current folder
	docMgr.MoveDocumentState(
		docState.DocumentInformation.DocumentID,
		appconfig.DefaultLocationOfPending, appconfig.DefaultLocationOfCurrent)
	log.Debugf("Canceling job with id %v...", docState.CancelInformation.CancelMessageID)

	if found := sendCommandPool.Cancel(docState.CancelInformation.CancelMessageID); !found {
		log.Debugf("Job with id %v not found (possibly completed)", docState.CancelInformation.CancelMessageID)
		docState.CancelInformation.DebugInfo = fmt.Sprintf("Command %v couldn't be cancelled", docState.CancelInformation.CancelCommandID)
		docState.DocumentInformation.DocumentStatus = contracts.ResultStatusFailed
	} else {
		docState.CancelInformation.DebugInfo = fmt.Sprintf("Command %v cancelled", docState.CancelInformation.CancelCommandID)
		docState.DocumentInformation.DocumentStatus = contracts.ResultStatusSuccess
	}

	//execution reached a terminal state; remove the interim state file from the current folder
	log.Debugf("Execution of %v is over. Removing interimState file from Current folder", docState.DocumentInformation.MessageID)

	docMgr.RemoveDocumentState(
		docState.DocumentInformation.DocumentID,
		appconfig.DefaultLocationOfCurrent)

}

// TODO remove this once the CloudWatch plugin is reworked
// temporary solution for plugins whose responsibility is shared with the agent
func handleCloudwatchPlugin(context context.T, pluginResults map[string]*contracts.PluginResult, documentID string) {
	log := context.Log()
	instanceID, _ := context.Identity().InstanceID()
	//TODO once association service switches to use RC and CW goes away, remove this block
	for ID, pluginRes := range pluginResults {
		if pluginRes.PluginName == appconfig.PluginNameCloudWatch {
			log.Infof("Found %v to invoke lrpm invoker", pluginRes.PluginName)
			orchestrationRootDir := filepath.Join(
				appconfig.DefaultDataStorePath,
				instanceID,
				appconfig.DefaultDocumentRootDirName,
				context.AppConfig().Agent.OrchestrationRootDir)
			orchestrationDir := fileutil.BuildPath(orchestrationRootDir, documentID)
			manager.Invoke(context, ID, pluginRes, orchestrationDir)
		}
	}

}