/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
* its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root of this
* distribution (the "License"). All use of this software is governed by the License,
* or, if provided, by the license below or the license accompanying this file. Do not
* remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/

#include "rccontroller.h"
#include <QStringList>
#include <QTimer>
#include <QThread>
#include <QtMath> // for qCeil
#include <QCoreApplication>
#include <QThreadPool>

#include "native/utilities/assetUtils.h"
#include "native/utilities/PlatformConfiguration.h"
#include "native/resourcecompiler/RCCommon.h"

#include <AzFramework/Asset/AssetProcessorMessages.h>
#include <AssetBuilderSDK/AssetBuilderBusses.h>
#include <AzCore/Casting/numeric_cast.h>

namespace AssetProcessor
{
    RCController::RCController(int cfg_minJobs, int cfg_maxJobs, QObject* parent)
        : QObject(parent)
        , m_dispatchingJobs(false)
        , m_shuttingDown(false)
    {
        AssetProcessorPlatformBus::Handler::BusConnect();

        // Determine a good starting value for max jobs
        int maxJobs = QThread::idealThreadCount();
        if (maxJobs == -1)
        {
            maxJobs = 3;
        }

        maxJobs = qMax<int>(maxJobs - 1, 1);

        // if the user has specified max jobs in the cfg file, then we obey their request
        // regardless of whether they have chosen something bad or not - they would have had to explicitly
        // pick this value (we ship with default 0 meaning auto), so if they've changed it, they intend it that way
        m_maxJobs = cfg_maxJobs ? qMax(cfg_minJobs, cfg_maxJobs) :  maxJobs;

        m_RCQueueSortModel.AttachToModel(&m_RCJobListModel);

        // make sure that the global thread pool has enough slots to accomidate your request though, since 
        // by default, the global thread pool has idealThreadCount() slots only.
        // leave an extra slot for non-job work.
        int currentMaxThreadCount = QThreadPool::globalInstance()->maxThreadCount();
        int newMaxThreadCount = qMax<int>(currentMaxThreadCount, m_maxJobs + 1);
        QThreadPool::globalInstance()->setMaxThreadCount(newMaxThreadCount);

        QObject::connect(this, &RCController::EscalateJobs, &m_RCQueueSortModel, &AssetProcessor::RCQueueSortModel::OnEscalateJobs);
    }

    RCController::~RCController()
    {
        AssetProcessorPlatformBus::Handler::BusDisconnect();
        m_RCQueueSortModel.AttachToModel(nullptr);
    }

    RCJobListModel* RCController::GetQueueModel()
    {
        return &m_RCJobListModel;
    }

    void RCController::StartJob(RCJob* rcJob)
    {
        Q_ASSERT(rcJob);
        // request to be notified when job is done
        QObject::connect(rcJob, &RCJob::Finished, this, [this, rcJob]()
        {
            FinishJob(rcJob);
        }, Qt::QueuedConnection);

        // Mark as "being processed" by moving to Processing list
        m_RCJobListModel.markAsProcessing(rcJob);
        m_RCJobListModel.markAsStarted(rcJob);
        Q_EMIT JobStatusChanged(rcJob->GetJobEntry(), AzToolsFramework::AssetSystem::JobStatus::InProgress);
        rcJob->Start();
        Q_EMIT JobStarted(rcJob->GetJobEntry().m_pathRelativeToWatchFolder, QString::fromUtf8(rcJob->GetPlatformInfo().m_identifier.c_str()));
    }

    void RCController::QuitRequested()
    {
        m_shuttingDown = true;

        if (m_RCJobListModel.jobsInFlight() == 0)
        {
            Q_EMIT ReadyToQuit(this);
            return;
        }

        QTimer::singleShot(10, this, SLOT(QuitRequested()));
    }

    int RCController::NumberOfPendingCriticalJobsPerPlatform(QString platform)
    {
        return m_pendingCriticalJobsPerPlatform[platform.toLower()];
    }

    int RCController::NumberOfPendingJobsPerPlatform(QString platform)
    {
        return m_jobsCountPerPlatform[platform.toLower()];
    }

    void RCController::FinishJob(RCJob* rcJob)
    {
        m_RCQueueSortModel.RemoveJobIdEntry(rcJob);
        QString platform = rcJob->GetPlatformInfo().m_identifier.c_str();
        auto found = m_jobsCountPerPlatform.find(platform);
        if (found != m_jobsCountPerPlatform.end())
        {
            int prevCount = found.value();
            if (prevCount > 0)
            {
                int newCount = prevCount - 1;
                m_jobsCountPerPlatform[platform] = newCount;
                Q_EMIT JobsInQueuePerPlatform(platform, newCount);
            }
        }

        if (rcJob->IsCritical())
        {
            int criticalJobsCount = m_pendingCriticalJobsPerPlatform[platform.toLower()] - 1;
            m_pendingCriticalJobsPerPlatform[platform.toLower()] = criticalJobsCount;
        }

        if (rcJob->GetState() == RCJob::cancelled)
        {
            Q_EMIT FileCancelled(rcJob->GetJobEntry());
        }
        else if (rcJob->GetState() != RCJob::completed)
        {
            Q_EMIT FileFailed(rcJob->GetJobEntry());
            Q_EMIT JobStatusChanged(rcJob->GetJobEntry(), AzToolsFramework::AssetSystem::JobStatus::Failed);
        }
        else
        {
            Q_EMIT FileCompiled(rcJob->GetJobEntry(), AZStd::move(rcJob->GetProcessJobResponse()));
            Q_EMIT JobStatusChanged(rcJob->GetJobEntry(), AzToolsFramework::AssetSystem::JobStatus::Completed);
        }
        
        // Move to Completed list which will mark as "completed"
        // unless a different state has been set.
        m_RCJobListModel.markAsCompleted(rcJob);

        if (!m_dispatchingPaused)
        {
            Q_EMIT ActiveJobsCountChanged(aznumeric_cast<unsigned int>(m_RCJobListModel.itemCount()));
        }

        if (!m_shuttingDown)
        {
            // Start next job only if we are not shutting down
            DispatchJobs();

            // if there is no next job, and nothing is in flight, we are done.
            if (IsIdle())
            {
                Q_EMIT BecameIdle();
            }
        }
    }

    bool RCController::IsIdle()
    {
        return ((!m_RCQueueSortModel.GetNextPendingJob()) && (m_RCJobListModel.jobsInFlight() == 0));
    }

    void RCController::JobSubmitted(JobDetails details)
    {
        AssetProcessor::QueueElementID checkFile(details.m_jobEntry.m_databaseSourceName, details.m_jobEntry.m_platformInfo.m_identifier.c_str(), details.m_jobEntry.m_jobKey);

        if (m_RCJobListModel.isInQueue(checkFile))
        {
            AZ_TracePrintf(AssetProcessor::DebugChannel, "Job is already in queue and has not started yet - ignored [%s, %s, %s]\n", checkFile.GetInputAssetName().toUtf8().data(), checkFile.GetPlatform().toUtf8().data(), checkFile.GetJobDescriptor().toUtf8().data());
            return;
        }

        if (m_RCJobListModel.isInFlight(checkFile))
        {
            // if the computed fingerprint is the same as the fingerprint of the in-flight job, this is okay.
            int existingJobIndex = m_RCJobListModel.GetIndexOfProcessingJob(checkFile);
            if (existingJobIndex != -1)
            {
                RCJob* job = m_RCJobListModel.getItem(existingJobIndex);
                bool cancelJob = false;

                if (job->GetJobEntry().m_computedFingerprint != details.m_jobEntry.m_computedFingerprint)
                {
                    AZ_TracePrintf(AssetProcessor::DebugChannel, "Cancelling Job [%s, %s, %s] with old FP %u, replacing with new FP %u \n", checkFile.GetInputAssetName().toUtf8().data(), checkFile.GetPlatform().toUtf8().data(), checkFile.GetJobDescriptor().toUtf8().data(), job->GetJobEntry().m_computedFingerprint, details.m_jobEntry.m_computedFingerprint);
                    cancelJob = true;
                }
                else if(!job->GetJobDependencies().empty())
                {
                    // If a job has dependencies, it's very likely it was re-queued as a result of a dependency being changed
                    // The in-flight job is probably going to fail at best, or use old data at worst, so cancel the in-flight job

                    AZ_TracePrintf(AssetProcessor::DebugChannel, "Cancelling Job with dependencies [%s, %s, %s], replacing with re-queued job\n", 
                        checkFile.GetInputAssetName().toUtf8().data(), checkFile.GetPlatform().toUtf8().data(), checkFile.GetJobDescriptor().toUtf8().data());
                    cancelJob = true;
                }
                else
                {
                    AZ_TracePrintf(AssetProcessor::DebugChannel, "Job is already in progress but has the same computed fingerprint (%u) - ignored [%s, %s, %s]\n", details.m_jobEntry.m_computedFingerprint,  checkFile.GetInputAssetName().toUtf8().data(), checkFile.GetPlatform().toUtf8().data(), checkFile.GetJobDescriptor().toUtf8().data());
                    return;
                }

                if(cancelJob)
                {
                    job->SetState(RCJob::JobState::cancelled);
                    AssetBuilderSDK::JobCommandBus::Event(job->GetJobEntry().m_jobRunKey, &AssetBuilderSDK::JobCommandBus::Events::Cancel);
                    m_RCJobListModel.UpdateRow(existingJobIndex);
                }
            }
        }

        RCJob* rcJob = new RCJob(&m_RCJobListModel);
        rcJob->Init(details); // note - move operation.  From this point on you must use the job details to refer to it.
        m_RCQueueSortModel.AddJobIdEntry(rcJob);
        m_RCJobListModel.addNewJob(rcJob);
        QString platformName = rcJob->GetPlatformInfo().m_identifier.c_str();// we need to get the actual platform from the rcJob
        if (rcJob->IsCritical())
        {
            int criticalJobsCount = m_pendingCriticalJobsPerPlatform[platformName.toLower()] + 1;
            m_pendingCriticalJobsPerPlatform[platformName.toLower()] = criticalJobsCount;
        }
        auto found = m_jobsCountPerPlatform.find(platformName);
        if (found != m_jobsCountPerPlatform.end())
        {
            int newCount = found.value() + 1;
            m_jobsCountPerPlatform[platformName] = newCount;
        }
        else
        {
            m_jobsCountPerPlatform[platformName] = 1;
        }
        Q_EMIT JobsInQueuePerPlatform(platformName, m_jobsCountPerPlatform[platformName]);
        Q_EMIT JobStatusChanged(rcJob->GetJobEntry(), AzToolsFramework::AssetSystem::JobStatus::Queued);

        if (!m_dispatchingPaused)
        {
            Q_EMIT ActiveJobsCountChanged(aznumeric_cast<unsigned int>(m_RCJobListModel.itemCount()));
        }

        // Start the job we just received if no job currently running
        if ((!m_shuttingDown) && (!m_dispatchingJobs))
        {
            DispatchJobs();
        }
    }

    void RCController::SetDispatchPaused(bool pause)
    {
        if (m_dispatchingPaused != pause)
        {
            m_dispatchingPaused = pause;
            if (!pause)
            {
                if ((!m_shuttingDown) && (!m_dispatchingJobs))
                {
                    DispatchJobs();
                    Q_EMIT ActiveJobsCountChanged(aznumeric_cast<unsigned int>(m_RCJobListModel.itemCount()));
                }
            }
        }
    }

    void RCController::DispatchJobsImpl()
    {
        m_dispatchJobsQueued = false;
        if (!m_dispatchingJobs)
        {
            m_dispatchingJobs = true;
            RCJob* rcJob = m_RCQueueSortModel.GetNextPendingJob();
            
            while (m_RCJobListModel.jobsInFlight() < m_maxJobs && rcJob && !m_shuttingDown)
            {
                if (m_dispatchingPaused)
                {
                    // note, even if dispatching is "paused" we start all "auto fail jobs" so that user gets instant feedback on failure.
                    if (!rcJob->IsAutoFail())
                    {
                        break;
                    }
                }

                StartJob(rcJob);
                rcJob = m_RCQueueSortModel.GetNextPendingJob();
            }
            m_dispatchingJobs = false;
        }
    }
    void RCController::DispatchJobs()
    {
        if (!m_dispatchJobsQueued)
        {
            m_dispatchJobsQueued = true;
            QMetaObject::invokeMethod(this, "DispatchJobsImpl", Qt::QueuedConnection);
        }
    }

    void RCController::OnRequestCompileGroup(AssetProcessor::NetworkRequestID groupID, QString platform, QString searchTerm, AZ::Data::AssetId assetId, bool isStatusRequest)
    {
        // someone has asked for a compile group to be created that conforms to that search term.
        // the goal here is to use a heuristic to find any assets that match the search term and place them in a new group
        // then respond with the appropriate response.

        // lets do some minimal processing on the search term
        AssetProcessor::JobIdEscalationList escalationList;
        QSet<AssetProcessor::QueueElementID> results;

        if (assetId.IsValid())
        {
            m_RCJobListModel.PerformUUIDSearch(assetId.m_guid, platform, results, escalationList, isStatusRequest);
        }
        else
        {
            m_RCJobListModel.PerformHeuristicSearch(AssetUtilities::NormalizeAndRemoveAlias(searchTerm), platform, results, escalationList, isStatusRequest);
        }

        if (results.isEmpty())
        {
            // nothing found
            Q_EMIT CompileGroupCreated(groupID, AzFramework::AssetSystem::AssetStatus_Unknown);

            AZ_TracePrintf(AssetProcessor::DebugChannel, "OnRequestCompileGroup:  %s - %s requested, but no matching source assets found.\n", searchTerm.toUtf8().constData(), assetId.ToString<AZStd::string>().c_str());
        }
        else
        {
            // it is not necessary to denote the search terms or list of results here because
            // PerformHeursticSearch already prints out the results.
            m_RCQueueSortModel.OnEscalateJobs(escalationList);
            
            m_activeCompileGroups.push_back(AssetCompileGroup());
            m_activeCompileGroups.back().m_groupMembers.swap(results);
            m_activeCompileGroups.back().m_requestID = groupID;

            Q_EMIT CompileGroupCreated(groupID, AzFramework::AssetSystem::AssetStatus_Queued);
        }
    }

    void RCController::OnEscalateJobsBySearchTerm(QString platform, QString searchTerm)
    {
        AssetProcessor::JobIdEscalationList escalationList;
        QSet<AssetProcessor::QueueElementID> results;
        m_RCJobListModel.PerformHeuristicSearch(AssetUtilities::NormalizeAndRemoveAlias(searchTerm), platform, results, escalationList, true);

        if (!results.isEmpty())
        {
            // it is not necessary to denote the search terms or list of results here because
            // PerformHeursticSearch already prints out the results.
            m_RCQueueSortModel.OnEscalateJobs(escalationList);
        }
        // do not print a warning out when this fails, its fine for things to escalate jobs as a matter of course just to "make sure" they are escalated
        // and its fine if none are in the build queue.
    }

    void RCController::OnEscalateJobsBySourceUUID(QString platform, AZ::Uuid sourceUuid)
    {
        AssetProcessor::JobIdEscalationList escalationList;
        QSet<AssetProcessor::QueueElementID> results;
        m_RCJobListModel.PerformUUIDSearch(sourceUuid, platform, results, escalationList, true);

        if (!results.isEmpty())
        {
            for (const AssetProcessor::QueueElementID& result : results)
            {
                AZ_TracePrintf(AssetProcessor::DebugChannel, "OnEscalateJobsBySourceUUID:  %s --> %s\n", sourceUuid.ToString<AZStd::string>().c_str(), result.GetInputAssetName().toUtf8().constData());
            }
            m_RCQueueSortModel.OnEscalateJobs(escalationList);
        }
        // do not print a warning out when this fails, its fine for things to escalate jobs as a matter of course just to "make sure" they are escalated
        // and its fine if none are in the build queue.
    }

    void RCController::OnJobComplete(JobEntry completeEntry, AzToolsFramework::AssetSystem::JobStatus state)
    {
        if (m_activeCompileGroups.empty())
        {
            return;
        }

        QueueElementID jobQueueId(completeEntry.m_databaseSourceName, completeEntry.m_platformInfo.m_identifier.c_str(), completeEntry.m_jobKey);

        // only the 'completed' status means success:
        bool statusSucceeded = (state == AzToolsFramework::AssetSystem::JobStatus::Completed);

        // start at the end so that we can actually erase the compile groups and not skip any:
        for (int groupIdx = m_activeCompileGroups.size() - 1; groupIdx >= 0; --groupIdx)
        {
            AssetCompileGroup& compileGroup = m_activeCompileGroups[groupIdx];
            auto it = compileGroup.m_groupMembers.find(jobQueueId);
            if (it != compileGroup.m_groupMembers.end())
            {
                compileGroup.m_groupMembers.erase(it);
                if ((compileGroup.m_groupMembers.isEmpty()) || (!statusSucceeded))
                {
                    // if we get here, we're either empty (and succeeded) or we failed one and have now failed
                    Q_EMIT CompileGroupFinished(compileGroup.m_requestID, statusSucceeded ? AzFramework::AssetSystem::AssetStatus_Compiled: AzFramework::AssetSystem::AssetStatus_Failed);
                    m_activeCompileGroups.removeAt(groupIdx);
                }
            }
        }
    }
    
    void RCController::RemoveJobsBySource(QString relSourceFileDatabaseName)
    {
        // some jobs may have not been started yet, these need to be removed manually
        AZStd::vector<RCJob*> pendingJobs;

        m_RCJobListModel.EraseJobs(relSourceFileDatabaseName, pendingJobs);

        // force finish all pending jobs
        for (auto* rcJob : pendingJobs)
        {
            FinishJob(rcJob);
        }
    }

    void RCController::OnAddedToCatalog(JobEntry jobEntry)
    {
        AssetProcessor::QueueElementID checkFile(jobEntry.m_databaseSourceName, jobEntry.m_platformInfo.m_identifier.c_str(), jobEntry.m_jobKey);

        m_RCJobListModel.markAsCataloged(checkFile);

        DispatchJobs();
    }

} // Namespace AssetProcessor

#include <native/resourcecompiler/rccontroller.moc>