/* * All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or * its licensors. * * For complete copyright and license terms please see the LICENSE at the root of this * distribution (the "License"). All use of this software is governed by the License, * or, if provided, by the license below or the license accompanying this file. Do not * remove or modify any license notices. This file is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * */ // Original file Copyright Crytek GMBH or its affiliates, used under license. // Description : Streaming Engine implementation #include "StdAfx.h" #include "StreamEngine.h" //CReadStream is 99% unused since the introduction of AZRequestReadStream. //In the near future CReadStream will disappear altogether and StreamReadStream cpp/h // won't be needed anymore. #include "StreamReadStream.h" #include "AZRequestReadStream.h" #include "../DiskProfiler.h" #include "../System.h" #include "IPlatformOS.h" #include <AzCore/IO/Streamer.h> #include <AzCore/std/sort.h> #include <AzFramework/IO/FileOperations.h> #include <AzFramework/Input/Devices/Keyboard/InputDeviceKeyboard.h> #define MAX_HEAVY_ASSETS 20 #if defined(STREAMENGINE_ENABLE_STATS) SStreamEngineStatistics* g_pStreamingStatistics = 0; #endif SStreamEngineOpenStats* g_pStreamingOpenStatistics = 0; extern CMTSafeHeap* g_pPakHeap; ////////////////////////////////////////////////////////////////////////// CStreamEngine::CStreamEngine() : AzFramework::InputChannelEventListener(AzFramework::InputChannelEventListener::GetPriorityDebug()) { m_nBatchMode = 0; m_bShutDown = false; m_bUseOpticalDriveThread = g_cvars.sys_streaming_use_optical_drive_thread != 0; m_nPausedDataTypesMask = 0; m_bStreamDataOnHDD = gEnv->pCryPak->IsInstalledToHDD(); #ifdef STREAMENGINE_ENABLE_STATS g_pStreamingStatistics = &m_Statistics; m_Statistics.nPendingReadBytes = 0; m_Statistics.nCurrentAsyncCount = 0; m_Statistics.nCurrentDecryptCount = 0; m_Statistics.nCurrentDecompressCount = 0; m_Statistics.nCurrentFinishedCount = 0; memset(&m_decompressStats, 0, sizeof(m_decompressStats)); m_nUnzipBandwidth = 0; m_nUnzipBandwidthAverage = 0; m_bStreamingStatsPaused = false; m_bInputCallback = false; m_bTempMemOutOfBudget = false; ClearStatistics(); #endif memset(&m_OpenStatistics, 0, sizeof(m_OpenStatistics)); g_pStreamingOpenStatistics = &m_OpenStatistics; #ifdef STREAMENGINE_ENABLE_LISTENER m_pListener = NULL; #endif StartThreads(); // register system listener GetISystem()->GetISystemEventDispatcher()->RegisterListener(this); } ////////////////////////////////////////////////////////////////////////// // MT: Main thread only CStreamEngine::~CStreamEngine() { #ifdef STREAMENGINE_ENABLE_STATS g_pStreamingStatistics = 0; if (m_bInputCallback) { AzFramework::InputChannelEventListener::Disconnect(); } #endif g_pStreamingOpenStatistics = NULL; Shutdown(); } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::BeginReadGroup() { CryInterlockedIncrement(&m_nBatchMode); } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::EndReadGroup() { CryInterlockedDecrement(&m_nBatchMode); for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { // New requests accomulated untill all Start stream requests are submitted and can be properly sorted. m_pThreadIO[i]->SignalStartWork(false); } } } AZ::IO::Request::PriorityType CStreamEngine::CryStreamPriorityToAZStreamPriority(EStreamTaskPriority cryPriority) { switch (cryPriority) { case estpUrgent: return AZ::IO::Request::PriorityType::DR_PRIORITY_CRITICAL; case estpAboveNormal: return AZ::IO::Request::PriorityType::DR_PRIORITY_ABOVE_NORMAL; case estpNormal: return AZ::IO::Request::PriorityType::DR_PRIORITY_NORMAL; } // estpBelowNormal = 4, // estpPreempted = 1, //For internal use only // estpIdle = 5, return AZ::IO::Request::PriorityType::DR_PRIORITY_BELOW_NORMAL; } AZStd::chrono::microseconds CStreamEngine::AZDeadlineFromReadParams(const StreamReadParams& params) { AZ::IO::Request::PriorityType azPriority = CryStreamPriorityToAZStreamPriority(params.ePriority); switch (azPriority) { case AZ::IO::Request::PriorityType::DR_PRIORITY_NORMAL: return AZ::IO::ExecuteWhenIdle; case AZ::IO::Request::PriorityType::DR_PRIORITY_CRITICAL: // Fallthrough case AZ::IO::Request::PriorityType::DR_PRIORITY_ABOVE_NORMAL: return AZStd::chrono::microseconds(params.nLoadTime * 1000); default: return AZStd::chrono::microseconds((params.nLoadTime + params.nMaxLoadTime) * 1000); } } ////////////////////////////////////////////////////////////////////////// // Starts asynchronous read from the specified file // It is expected that the callbacks are called from Main Thread only when // the async data loading is finished. IReadStreamPtr CStreamEngine::StartRead (const EStreamTaskType tSource, const char* szFilePath, IStreamCallback* pCallback, const StreamReadParams* pParams) { if (!szFilePath) { CryFatalError("Use of the stream engine without a file is deprecated! Use the job system."); return NULL; } if (gEnv->IsDedicated()) { CryWarning(VALIDATOR_MODULE_SYSTEM, VALIDATOR_WARNING, "Attempting to use the stream engine on a dedicated server! Don't do that!"); return NULL; } if (!m_bShutDown) { AZRequestReadStream* pStream = AZRequestReadStream::Allocate(tSource, szFilePath, pCallback, pParams); if (!pStream) { CryFatalError("Failed to create Request Stream for %s", szFilePath); return nullptr; } size_t offset = pParams ? pParams->nOffset : 0; AZStd::chrono::microseconds deadline = pParams ? AZDeadlineFromReadParams(*pParams) : AZStd::chrono::microseconds(0); AZ::IO::Request::PriorityType priority = pParams ? CryStreamPriorityToAZStreamPriority(pParams->ePriority) : AZ::IO::Request::PriorityType::DR_PRIORITY_CRITICAL; // Add a ref to stream before binding to the callback. Callback will release the reference when it's invoked. pStream->AddRef(); auto callback = [this, pStream](const AZStd::shared_ptr<AZ::IO::Request>&, AZ::IO::SizeType numBytesRead, void* buffer, AZ::IO::Request::StateType state) { QueueRequestCompleteJob(pStream, numBytesRead, buffer, state); // Release reference that was taken above in order to hold onto stream while job is queued pStream->Release(); }; // Register stream and start file request. IReadStreamPtr result = static_cast<IReadStreamPtr>(pStream); AZStd::shared_ptr<AZ::IO::Request> azRequest = AZ::IO::Streamer::Instance().CreateAsyncRead(szFilePath, offset, pStream->GetFileSize(), pStream->GetFileReadBuffer(), callback, deadline, priority); pStream->SetFileRequest(azRequest); AZ::IO::Streamer::Instance().QueueRequest(azRequest); return result; } return NULL; } //It is NOT necessary to schedule the callbacks on the main thread. //Regular async calls is OK. size_t CStreamEngine::StartBatchRead(IReadStreamPtr* pStreamsOut, const StreamReadBatchParams* pReqs, size_t numReqs, AZStd::function<void()>* preRequestCallback) { FUNCTION_PROFILER(GetISystem(), PROFILE_SYSTEM); size_t nValidStreams = 0; if (!m_bShutDown) { enum { MaxStreamsPerBatch = 32 }; size_t nReqIdx = 0; // we have requests to evaluate, call the callback before enqueing the requests if (numReqs > 0 && preRequestCallback != nullptr) { (*preRequestCallback)(); } while (numReqs > 0) { size_t nStreamsInBatch = 0; while (numReqs > 0 && nStreamsInBatch < MaxStreamsPerBatch) { const StreamReadBatchParams& args = pReqs[nReqIdx]; if (!args.szFile) { CryFatalError("Use of the stream engine without a file is deprecated! Use the job system."); } AZRequestReadStream* pStream; { FRAME_PROFILER_FAST("CStreamEngine::StartBatchRead_AllocReadStream", gEnv->pSystem, PROFILE_SYSTEM, gEnv->bProfilerEnabled); pStream = AZRequestReadStream::Allocate(args.tSource, args.szFile, args.pCallback, &args.params); } if (pStream) { pStreamsOut[nValidStreams++] = pStream; // Add a ref to stream before binding to the callback. Callback will release the reference when it's invoked. pStream->AddRef(); auto callback = [this, pStream](const AZStd::shared_ptr<AZ::IO::Request>&, AZ::IO::SizeType numBytesRead, void* buffer, AZ::IO::Request::StateType state) { QueueRequestCompleteJob(pStream, numBytesRead, buffer, state); // Release reference that was taken above in order to hold onto stream while job is queued pStream->Release(); }; AZStd::shared_ptr<AZ::IO::Request> azRequest = AZ::IO::Streamer::Instance().CreateAsyncRead(args.szFile, args.params.nOffset, pStream->GetFileSize(), pStream->GetFileReadBuffer(), callback, AZDeadlineFromReadParams(args.params), CryStreamPriorityToAZStreamPriority(args.params.ePriority)); pStream->SetFileRequest(azRequest); AZ::IO::Streamer::Instance().QueueRequest(azRequest); } else { CryFatalError("Failed to create Request Stream for %s at mip number %d", args.szFile, (int)nReqIdx); } --numReqs; ++nReqIdx; } } } return nValidStreams; } void CStreamEngine::QueueRequestCompleteJob(AZRequestReadStream* stream, AZ::IO::SizeType numBytesRead, void* buffer, AZ::IO::Request::StateType requestState) { // Some graphics APIs don't support multiple threads instancing resources such as textures. To work around this limitation // the jobs that complete a streaming request are queued and a previous request will kick off the next one. This will cause // only one job that finishes a streaming request to ever be active without causing mutexes to cause stalls in the job system. // Add a ref to stream before binding to the callback. Callback will release the reference when it's invoked. stream->AddRef(); auto jobFunction = [this, stream, numBytesRead, buffer, requestState]() { stream->OnRequestComplete(numBytesRead, buffer, requestState); // Release reference that was taken above in order to hold onto stream while job is queued stream->Release(); CryAutoLock<CryCriticalSection> lock(m_pendingRequestCompletionsLock); AZ_Assert(!m_pendingRequestCompletions.empty(), "CStreamEngine::QueueRequestCompleteJob expects at least one job in the queue as this is this is the job ran from the callback.") // The top request is always the one that's running, so pop that one of the queue and start any other pending jobs. m_pendingRequestCompletions.pop(); if (!m_pendingRequestCompletions.empty()) { m_pendingRequestCompletions.front()->Start(); } }; AZ::Job* job = AZ::CreateJobFunction(jobFunction, true, AZ::JobContext::GetGlobalContext()); CryAutoLock<CryCriticalSection> lock(m_pendingRequestCompletionsLock); if (m_pendingRequestCompletions.empty()) { m_pendingRequestCompletions.push(job); job->Start(); } else { m_pendingRequestCompletions.push(job); } } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::ResumePausedStreams_PauseLocked() { for (size_t i = 0; i < (size_t)m_pausedStreams.size(); ) { CReadStream* pStream = (CReadStream*)(IReadStream*)m_pausedStreams[i]; int nStreamMask = 1 << (uint32)pStream->m_Type; if (0 == (nStreamMask & m_nPausedDataTypesMask)) { if (pStream->GetError() == 0) // If was not aborted { // This stream must be resumed m_streams.insert(pStream); CAsyncIOFileRequest* pFileRequest = pStream->CreateFileRequest(); if (!StartFileRequest(pFileRequest)) { pFileRequest->Release(); } } m_pausedStreams.erase(m_pausedStreams.begin() + i); } else { i++; } } } ////////////////////////////////////////////////////////////////////////// bool CStreamEngine::StartFileRequest(CAsyncIOFileRequest* pFileRequest) { bool bStartImmidietly = m_nBatchMode == 0; EStreamSourceMediaType eMediaType = pFileRequest->GetMediaType(); bool bQueued = false; CStreamingIOThread* pIO = m_pThreadIO[0]; for (size_t i = 1; i < eIOThread_Last; ++i) { CStreamingIOThread* pAltIO = m_pThreadIO[i]; if (pAltIO && pAltIO->GetMediaType() == eMediaType) { pIO = pAltIO; break; } } if (pIO) { #ifdef STREAMENGINE_ENABLE_LISTENER if (m_pListener) { m_pListener->OnStreamEnqueue(pFileRequest, pFileRequest->m_strFileName.c_str(), pFileRequest->m_pReadStream->GetCallerType(), pFileRequest->m_pReadStream->GetParams()); } #endif pIO->AddRequest(pFileRequest, bStartImmidietly); bQueued = true; } if (!bQueued) { assert(0); // No IO thread. return false; } #ifdef STREAMENGINE_ENABLE_STATS m_Statistics.typeInfo[pFileRequest->m_eType].nTotalStreamingRequestCount++; if (g_cvars.sys_streaming_debug == 3) { const char* const sFileFilter = g_cvars.sys_streaming_debug_filter_file_name->GetString(); if (!pFileRequest->m_strFileName.empty() && !m_bStreamingStatsPaused) { if (!sFileFilter || !sFileFilter[0] || strstr(pFileRequest->m_strFileName.c_str(), sFileFilter)) { CryAutoCriticalSection lock(m_csStats); m_statsRequestList.insert(m_statsRequestList.begin(), pFileRequest); } } } #endif return true; } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::SignalToStartWork(EIOThread e, bool bForce) { if ((e >= 0) && (e < eIOThread_Last)) { if (m_pThreadIO[e]) { m_pThreadIO[e]->SignalStartWork(bForce); } } } ////////////////////////////////////////////////////////////////////////// #ifdef STREAMENGINE_ENABLE_STATS void UpdateIOThreadStats( SStreamEngineStatistics::SMediaTypeInfo* pNotInMemoryInfo, SStreamEngineStatistics::SMediaTypeInfo* pInMemoryInfo, CStreamingIOThread* pIOThread, float fSecSinceLastReset) { if (pNotInMemoryInfo == 0 || pIOThread == 0) { return; } // not in memory reading pNotInMemoryInfo->fActiveDuringLastSecond = pIOThread->m_NotInMemoryStats.m_fReadingDuringLastSecond; pNotInMemoryInfo->fAverageActiveTime = pIOThread->m_NotInMemoryStats.m_TotalReadTime.GetSeconds() / fSecSinceLastReset * 100; pNotInMemoryInfo->nBytesRead = pIOThread->m_NotInMemoryStats.m_nReadBytesInLastSecond; pNotInMemoryInfo->nRequestCount = pIOThread->m_NotInMemoryStats.m_nRequestCountInLastSecond; pNotInMemoryInfo->nTotalBytesRead = pIOThread->m_NotInMemoryStats.m_nTotalReadBytes; pNotInMemoryInfo->nTotalRequestCount = pIOThread->m_NotInMemoryStats.m_nTotalRequestCount; pNotInMemoryInfo->nSeekOffsetLastSecond = pIOThread->m_NotInMemoryStats.m_nReadOffsetInLastSecond; if (pIOThread->m_NotInMemoryStats.m_nTotalRequestCount > 0) { pNotInMemoryInfo->nAverageSeekOffset = pIOThread->m_NotInMemoryStats.m_nTotalReadOffset / pIOThread->m_NotInMemoryStats.m_nTotalRequestCount; } else { pNotInMemoryInfo->nAverageSeekOffset = 0; } pNotInMemoryInfo->nCurrentReadBandwidth = pIOThread->m_NotInMemoryStats.m_nCurrentReadBandwith; pNotInMemoryInfo->nSessionReadBandwidth = (uint32)(pNotInMemoryInfo->nTotalBytesRead / fSecSinceLastReset); pNotInMemoryInfo->nActualReadBandwidth = pIOThread->m_NotInMemoryStats.m_nActualReadBandwith; float fTotalReadTime = pIOThread->m_NotInMemoryStats.m_TotalReadTime.GetSeconds(); if (fTotalReadTime > 0.0f) { pNotInMemoryInfo->nAverageActualReadBandwidth = (uint32)(pNotInMemoryInfo->nTotalBytesRead / fTotalReadTime); } // in memory reading if (pInMemoryInfo) { pInMemoryInfo->nBytesRead = pIOThread->m_InMemoryStats.m_nReadBytesInLastSecond; pInMemoryInfo->nRequestCount = pIOThread->m_InMemoryStats.m_nRequestCountInLastSecond; pInMemoryInfo->nTotalBytesRead = pIOThread->m_InMemoryStats.m_nTotalReadBytes; pInMemoryInfo->nTotalRequestCount = pIOThread->m_InMemoryStats.m_nTotalRequestCount; } } #endif void CStreamEngine::Update(uint32 nUpdateTypesBitmask) { FUNCTION_PROFILER_LEGACYONLY(GetISystem(), PROFILE_SYSTEM); AZ_TRACE_METHOD(); LOADING_TIME_PROFILE_SECTION(gEnv->pSystem); // Dispatch completed callbacks. MainThread_FinalizeIOJobs(nUpdateTypesBitmask); } // Gets called regularly, to finalize those proxies whose jobs have // already been executed (e.g. to call the callbacks) // - to be called from the main thread only // - starts new jobs in the single-threaded model void CStreamEngine::Update() { FUNCTION_PROFILER(GetISystem(), PROFILE_SYSTEM); LOADING_TIME_PROFILE_SECTION(gEnv->pSystem); // Dispatch completed callbacks. MainThread_FinalizeIOJobs(); #ifdef STREAMENGINE_ENABLE_STATS if (g_cvars.sys_streaming_resetstats) { ClearStatistics(); g_cvars.sys_streaming_resetstats = 0; } CTimeValue t = gEnv->pTimer->GetAsyncTime(); if ((t - m_nLastBandwidthUpdateTime).GetMilliSecondsAsInt64() > 1000) { // Repeat every second. m_nUnzipBandwidth = m_decompressStats.m_tempUnzipTime.GetValue() == 0 ? 0 : (uint32)(m_decompressStats.m_nTempBytesUnziped / m_decompressStats.m_tempUnzipTime.GetSeconds()); m_nDecryptBandwidth = m_decompressStats.m_tempDecryptTime.GetValue() == 0 ? 0 : (uint32)(m_decompressStats.m_nTempBytesDecrypted / m_decompressStats.m_tempDecryptTime.GetSeconds()); m_nVerifyBandwidth = m_decompressStats.m_tempVerifyTime.GetValue() == 0 ? 0 : (uint32)(m_decompressStats.m_nTempBytesVerified / m_decompressStats.m_tempVerifyTime.GetSeconds()); m_decompressStats.m_tempUnzipTime.SetValue(0); m_decompressStats.m_nTempBytesUnziped = 0; m_decompressStats.m_tempDecryptTime.SetValue(0); m_decompressStats.m_nTempBytesDecrypted = 0; m_decompressStats.m_tempVerifyTime.SetValue(0); m_decompressStats.m_nTempBytesVerified = 0; m_nLastBandwidthUpdateTime = t; } if (m_decompressStats.m_totalUnzipTime.GetValue() != 0) { m_nUnzipBandwidthAverage = (uint32)(m_decompressStats.m_nTotalBytesUnziped / m_decompressStats.m_totalUnzipTime.GetSeconds()); } if (m_decompressStats.m_totalDecryptTime.GetValue() != 0) { m_nDecryptBandwidthAverage = (uint32)(m_decompressStats.m_nTotalBytesDecrypted / m_decompressStats.m_totalDecryptTime.GetSeconds()); } if (m_decompressStats.m_totalVerifyTime.GetValue() != 0) { m_nVerifyBandwidthAverage = (uint32)(m_decompressStats.m_nTotalBytesVerified / m_decompressStats.m_totalVerifyTime.GetSeconds()); } m_Statistics.nDecompressBandwidth = m_nUnzipBandwidth; m_Statistics.nDecryptBandwidth = m_nDecryptBandwidth; m_Statistics.nVerifyBandwidth = m_nVerifyBandwidth; m_Statistics.nDecompressBandwidthAverage = m_nUnzipBandwidthAverage; m_Statistics.nDecryptBandwidthAverage = m_nDecryptBandwidthAverage; m_Statistics.nVerifyBandwidthAverage = m_nVerifyBandwidthAverage; CTimeValue currentTime = gEnv->pTimer->GetAsyncTime(); CTimeValue timeSinceLastReset = currentTime - m_TimeOfLastReset; float fSecSinceLastReset = timeSinceLastReset.GetSeconds(); CTimeValue timeSinceLastUpdate = currentTime - m_TimeOfLastUpdate; // update the stats every second if (timeSinceLastUpdate.GetMilliSecondsAsInt64() > 1000) { UpdateIOThreadStats(&m_Statistics.hddInfo, &m_Statistics.memoryInfo, m_pThreadIO[eIOThread_HDD], fSecSinceLastReset); UpdateIOThreadStats(&m_Statistics.discInfo, 0, m_pThreadIO[eIOThread_Optical], fSecSinceLastReset); UpdateIOThreadStats(&m_Statistics.memoryInfo, 0, m_pThreadIO[eIOThread_InMemory], fSecSinceLastReset); SStreamEngineStatistics::SRequestTypeInfo totals; // update stats on all types for (int i = 0; i < eStreamTaskTypeCount; i++) { SStreamEngineStatistics::SRequestTypeInfo& info = m_Statistics.typeInfo[i]; if (info.nTotalStreamingRequestCount) { info.fAverageCompletionTime = info.fTotalCompletionTime / info.nTotalStreamingRequestCount; } else { info.fAverageCompletionTime = 0; } info.nSessionReadBandwidth = (uint32)(info.nTotalReadBytes / fSecSinceLastReset); info.nCurrentReadBandwidth = (uint32)(info.nTmpReadBytes / timeSinceLastUpdate.GetSeconds()); info.fAverageRequestCount = info.nTotalStreamingRequestCount / fSecSinceLastReset; totals.Merge(info); info.nTmpReadBytes = 0; } if (totals.nTotalStreamingRequestCount > 0) { m_Statistics.fAverageCompletionTime = totals.fTotalCompletionTime / totals.nTotalStreamingRequestCount; } m_Statistics.nTotalSessionReadBandwidth = (uint32)(totals.nTotalReadBytes / fSecSinceLastReset); m_Statistics.nTotalCurrentReadBandwidth = (uint32)(totals.nTmpReadBytes / timeSinceLastUpdate.GetSeconds()); m_Statistics.fAverageRequestCount = totals.nTotalStreamingRequestCount / fSecSinceLastReset; m_Statistics.nTotalRequestCount = totals.nTotalRequestCount; m_Statistics.nTotalStreamingRequestCount = totals.nTotalStreamingRequestCount; m_Statistics.nTotalBytesRead = totals.nTotalReadBytes; // update this flag only once a second to be sure it's visible in display info m_Statistics.bTempMemOutOfBudget = m_bTempMemOutOfBudget; m_bTempMemOutOfBudget = false; m_TimeOfLastUpdate = currentTime; } int nTmpAllocated = m_tempMem.m_nTempAllocatedMemoryFrameMax; m_Statistics.nMaxTempMemory = max(m_Statistics.nMaxTempMemory, nTmpAllocated); m_Statistics.nTempMemory = nTmpAllocated; m_tempMem.m_nTempAllocatedMemoryFrameMax = m_tempMem.m_nTempAllocatedMemory; if (m_Statistics.vecHeavyAssets.size() > MAX_HEAVY_ASSETS) { AZStd::sort(m_Statistics.vecHeavyAssets.begin(), m_Statistics.vecHeavyAssets.end()); m_Statistics.vecHeavyAssets.resize(MAX_HEAVY_ASSETS); } if (g_cvars.sys_streaming_debug) { DrawStatistics(); if (!m_bInputCallback) { AzFramework::InputChannelEventListener::Connect(); m_bInputCallback = true; } } #endif } ////////////////////////////////////////////////////////////////////////// // Only waits at most the specified amount of time for some IO to complete void CStreamEngine::UpdateAndWait(bool bAbortAll) { // for stream->Wait sync LOADING_TIME_PROFILE_SECTION(gEnv->pSystem); if (bAbortAll) { for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->AbortAll(bAbortAll); } } } while (!m_finishedStreams.empty() || !m_streams.empty()) { Update(); // In case we still have cancelled or aborted streams in the queue, // we wake the io threads here to ensure they are removed correctly; for (uint32 i = 0; i < (uint32)eIOThread_Last; ++i) { SignalToStartWork((EIOThread)i, true); } CrySleep(10); } if (bAbortAll) { for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->AbortAll(false); } } } } // In the Multi-Threaded model (with the IO Worker thread) // removes the proxies from the IO Queue as needed, and the proxies may call their callbacks void CStreamEngine::MainThread_FinalizeIOJobs(uint32 type) { static bool bNoReentrant = false; if (!bNoReentrant) { bNoReentrant = true; FUNCTION_PROFILER(GetISystem(), PROFILE_SYSTEM); LOADING_TIME_PROFILE_SECTION(gEnv->pSystem); #ifdef STREAMENGINE_ENABLE_STATS m_Statistics.nMainStreamingThreadWait = CryGetTicks(); #endif int nCount = 0; CryMT::vector<CReadStream_AutoPtr> finishedStreams; // Dispatch completed callbacks. CReadStream_AutoPtr pStream(0); while (m_finishedStreams.try_pop_front(pStream)) { if (pStream->m_Type & type) { pStream->MainThread_Finalize(); #ifdef STREAMENGINE_ENABLE_STATS // update statistics CryInterlockedDecrement(&m_Statistics.nCurrentFinishedCount); UpdateStatistics(pStream); #endif m_streams.erase(pStream); nCount++; // perform time slicing if requested if (g_cvars.sys_streaming_max_finalize_per_frame > 0 && nCount > g_cvars.sys_streaming_max_finalize_per_frame) { break; } } else { finishedStreams.push_back(pStream); } } bNoReentrant = false; while (finishedStreams.try_pop_front(pStream)) { m_finishedStreams.push_back(pStream); } #ifdef STREAMENGINE_ENABLE_STATS m_Statistics.nMainStreamingThreadWait = CryGetTicks() - m_Statistics.nMainStreamingThreadWait; #endif } } // In the Multi-Threaded model (with the IO Worker thread) // removes the proxies from the IO Queue as needed, and the proxies may call their callbacks void CStreamEngine::MainThread_FinalizeIOJobs() { static bool bNoReentrant = false; if (!bNoReentrant) { bNoReentrant = true; FUNCTION_PROFILER(GetISystem(), PROFILE_SYSTEM); LOADING_TIME_PROFILE_SECTION(gEnv->pSystem); #ifdef STREAMENGINE_ENABLE_STATS m_Statistics.nMainStreamingThreadWait = CryGetTicks(); #endif int nCount = 0; //Optim: swap finished streams out into a non MT vector //avoid expensive push / pop operations. m_tempFinishedStreams.clear(); m_finishedStreams.swap(m_tempFinishedStreams); int numFinishedStreams = m_tempFinishedStreams.size(); // Dispatch completed callbacks. for (int i = 0; i < numFinishedStreams; i++) { CReadStream_AutoPtr pStream = m_tempFinishedStreams[i]; //Check for a certain type of error that we need to handle in a TRC compliant way if (pStream->GetError() == ERROR_VERIFICATION_FAIL) { #if !defined(_RELEASE) CryWarning(VALIDATOR_MODULE_SYSTEM, VALIDATOR_COMMENT, "Stream error detected."); #endif //!_RELEASE } pStream->MainThread_Finalize(); #ifdef STREAMENGINE_ENABLE_STATS // update statistics CryInterlockedDecrement(&m_Statistics.nCurrentFinishedCount); UpdateStatistics(pStream); #endif m_streams.erase(pStream); nCount++; // AM: Optim, no longer support this behavior // perform time slicing if requested if (g_cvars.sys_streaming_max_finalize_per_frame > 0 && nCount > g_cvars.sys_streaming_max_finalize_per_frame) { CryLogAlways("sys_streaming_max_finalize_per_frame is now deprecated"); //break; } } m_tempFinishedStreams.clear(); bNoReentrant = false; #ifdef STREAMENGINE_ENABLE_STATS m_Statistics.nMainStreamingThreadWait = CryGetTicks() - m_Statistics.nMainStreamingThreadWait; #endif } } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::UpdateJobPriority(IReadStreamPtr pJobStream) { for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->NeedSorting(); } } } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::StopThreads() { for (int i = 0; i < eIOThread_Last; i++) { m_pThreadIO[i] = 0; } m_asyncCallbackThreads.clear(); m_tempMem.m_nWakeEvents = 0; } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::StartThreads() { StopThreads(); m_tempMem.m_nWakeEvents = 0; m_pThreadIO[eIOThread_HDD] = new CStreamingIOThread(this, eStreamSourceTypeHDD, "Streaming File IO HDD");//, 160); m_tempMem.m_wakeEvents[m_tempMem.m_nWakeEvents++] = &m_pThreadIO[eIOThread_HDD]->GetWakeEvent(); if (!(gEnv->IsDedicated())) { if (m_bUseOpticalDriveThread) { m_pThreadIO[eIOThread_Optical] = new CStreamingIOThread(this, eStreamSourceTypeDisc, "Streaming File IO Optical"); m_tempMem.m_wakeEvents[m_tempMem.m_nWakeEvents++] = &m_pThreadIO[eIOThread_Optical]->GetWakeEvent(); } m_pThreadIO[eIOThread_InMemory] = new CStreamingIOThread(this, eStreamSourceTypeMemory, "Streaming File IO InMemory"); m_tempMem.m_wakeEvents[m_tempMem.m_nWakeEvents++] = &m_pThreadIO[eIOThread_InMemory]->GetWakeEvent(); } // Initialise fallback thread matrix, needed for rescheduling for (int i = 0; i < eIOThread_Last; ++i) { if (!m_pThreadIO[i]) { continue; } for (int j = 0; j < eIOThread_Last; ++j) { if (i == j) { continue; } if (!m_pThreadIO[j]) { continue; } m_pThreadIO[i]->RegisterFallbackIOThread(m_pThreadIO[j]->GetMediaType(), m_pThreadIO[j]); } } // More decompress threads can be added here. m_asyncCallbackQueues.push_back(new SStreamRequestQueue); m_asyncCallbackThreads.push_back(new CStreamingWorkerThread(this, "Streaming AsyncCallback", CStreamingWorkerThread::eWorkerAsyncCallback, m_asyncCallbackQueues.back())); //m_asyncCallbackThreads.push_back( new CStreamingWorkerThread(this,"Streaming AsyncCallback Pak 1",CStreamingWorkerThread::eWorkerAsyncCallback, m_asyncCallbackQueues[eStreamTaskTypePak]) ); } //! Puts the memory statistics into the given sizer object //! According to the specifications in interface ICrySizer void CStreamEngine::GetMemoryStatistics(ICrySizer* pSizer) { SIZER_COMPONENT_NAME(pSizer, "CRefStreamEngine"); size_t nSize = sizeof(*this); pSizer->AddObject(this, nSize); } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::AbortJob(CReadStream* pStream) { if (m_finishedStreams.try_remove((CReadStream*)pStream)) { #ifdef STREAMENGINE_ENABLE_STATS CryInterlockedDecrement(&m_Statistics.nCurrentFinishedCount); #endif } { CryAutoLock<CryCriticalSection> pausedLock(m_pausedLock); if (!m_pausedStreams.empty()) { std::vector<CReadStream_AutoPtr>::iterator it = std::find(m_pausedStreams.begin(), m_pausedStreams.end(), pStream); if (it != m_pausedStreams.end()) { m_pausedStreams.erase(it); } } } m_streams.erase(pStream); } #if defined(STREAMENGINE_ENABLE_STATS) SStreamEngineStatistics& CStreamEngine::GetStreamingStatistics() { return m_Statistics; } #endif #ifdef STREAMENGINE_ENABLE_STATS void CStreamEngine::UpdateStatistics(CReadStream* pReadStream) { uint32 nBytesRead = pReadStream->m_nBytesRead; SStreamEngineStatistics::SRequestTypeInfo& info = m_Statistics.typeInfo[pReadStream->m_Type]; info.nTotalRequestCount++; // only add to stats if request was valid const string& name = pReadStream->GetName(); if (name.length() > 0) { info.nTotalReadBytes += nBytesRead; info.nTmpReadBytes += nBytesRead; info.nTotalRequestDataSize += pReadStream->m_Params.nSize; CTimeValue completionTime = gEnv->pTimer->GetAsyncTime() - pReadStream->GetRequestTime(); float fCompletionTime = completionTime.GetMilliSeconds(); info.fTotalCompletionTime += fCompletionTime; size_t splitter = name.find_last_of("."); if (splitter != string::npos) { string extension = name.substr(splitter + 1); TExtensionInfoMap::iterator findRes = m_PerExtensionInfo.find(extension); if (findRes == m_PerExtensionInfo.end()) { m_PerExtensionInfo[extension] = SExtensionInfo(); findRes = m_PerExtensionInfo.find(extension); } SExtensionInfo& extensionInfo = findRes->second; extensionInfo.m_fTotalReadTime += pReadStream->m_ReadTime.GetMilliSeconds(); extensionInfo.m_nTotalRequests++; extensionInfo.m_nTotalReadSize += nBytesRead; extensionInfo.m_nTotalRequestSize += pReadStream->m_Params.nSize; } } if (nBytesRead > 64 * 1024) { m_Statistics.vecHeavyAssets.push_back(SStreamEngineStatistics::SAsset(pReadStream->m_strFileName, nBytesRead)); } } #endif void CStreamEngine::Shutdown() { m_bShutDown = true; // make sure we don't have queued paused streams during shutdown for the audio system // or we can suffer from deadlocks uint32 nPauseMask = GetPauseMask(); uint32 nUnPauseMask = ~(nPauseMask & ~STREAM_TASK_TYPE_AUDIO_ALL); PauseStreaming(false, nUnPauseMask); PauseStreaming(true, nPauseMask); UpdateAndWait(true); CancelAll(); StopThreads(); m_streams.clear(); m_finishedStreams.clear(); // unregister system listener GetISystem()->GetISystemEventDispatcher()->RemoveListener(this); } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::CancelAll() { for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->BeginReset(); } } for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->EndReset(); } } for (size_t i = 0; i < m_asyncCallbackThreads.size(); ++i) { m_asyncCallbackThreads[i]->BeginReset(); } for (size_t i = 0; i < m_asyncCallbackThreads.size(); ++i) { m_asyncCallbackThreads[i]->EndReset(); } // make sure we don't check for canceled tasks when destroying the m_finishedStreams container m_streams.clear(); stl::free_container(m_finishedStreams); stl::free_container(m_tempFinishedStreams); { CryAutoLock<CryCriticalSection> lock(m_pausedLock); std::vector<CReadStream_AutoPtr> paused; paused.swap(m_pausedStreams); for (std::vector<CReadStream_AutoPtr>::iterator it = paused.begin(), itEnd = paused.end(); it != itEnd; ++it) { CReadStream* pStream = &**it; pStream->AbortShutdown(); } } CReadStream::Flush(); CAsyncIOFileRequest::Flush(); } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::ReportAsyncFileRequestComplete(CAsyncIOFileRequest_AutoPtr pFileRequest) { if (!pFileRequest->IsCancelled()) { #ifdef STREAMENGINE_ENABLE_LISTENER if (m_pListener) { m_pListener->OnStreamBeginAsyncCallback(&*pFileRequest); } #endif if (pFileRequest->m_pCallback) { pFileRequest->m_pCallback->OnAsyncFinished(pFileRequest); } if (pFileRequest->m_pReadStream) { CReadStream_AutoPtr pStream = (CReadStream*)(IReadStream*)pFileRequest->m_pReadStream; pStream->OnAsyncFileRequestComplete(); m_finishedStreams.push_back(pStream); #ifdef STREAMENGINE_ENABLE_STATS CryInterlockedIncrement(&m_Statistics.nCurrentFinishedCount); #endif } #ifdef STREAMENGINE_ENABLE_LISTENER if (m_pListener) { m_pListener->OnStreamEndAsyncCallback(&*pFileRequest); } #endif #ifdef STREAMENGINE_ENABLE_STATS if (g_cvars.sys_streaming_debug != 0) { if (g_cvars.sys_streaming_debug == 2 || g_cvars.sys_streaming_debug == 4) { const char* const sFileFilter = g_cvars.sys_streaming_debug_filter_file_name->GetString(); if (!pFileRequest->m_strFileName.empty() && !m_bStreamingStatsPaused) { if (!sFileFilter || !sFileFilter[0] || strstr(pFileRequest->m_strFileName.c_str(), sFileFilter)) { CryAutoCriticalSection lock(m_csStats); m_statsRequestList.insert(m_statsRequestList.begin(), pFileRequest); } } } } #endif } } ////////////////////////////////////////////////////////////////////////// const char* CStreamEngine::GetStreamTaskTypeName(EStreamTaskType type) { switch (type) { case eStreamTaskTypeMusic: return "Music"; case eStreamTaskTypeAnimation: return "Animation"; case eStreamTaskTypeGeometry: return "Geometry"; case eStreamTaskTypeSound: return "Sound"; case eStreamTaskTypeTexture: return "Texture"; case eStreamTaskTypeShader: return "Shader"; case eStreamTaskTypeTerrain: return "Terrain"; case eStreamTaskTypeVideo: return "Video"; case eStreamTaskTypeFlash: return "Flash"; case eStreamTaskTypePak: return "Pak"; case eStreamTaskTypeGeomCache: return "GeomCache"; case eStreamTaskTypeMergedMesh: return "MergedMesh"; } return ""; } SStreamJobEngineState CStreamEngine::GetJobEngineState() { m_tempMem.m_nTempMemoryBudget = g_cvars.sys_streaming_memory_budget * 1024; SStreamJobEngineState state; state.pReportQueues = &m_asyncCallbackQueues; #ifdef STREAMENGINE_ENABLE_STATS state.pStats = &m_Statistics; state.pDecompressStats = &m_decompressStats; #endif state.pHeap = g_pPakHeap; state.pTempMem = &m_tempMem; return state; } #ifdef STREAMENGINE_ENABLE_STATS void CStreamEngine::GetBandwidthStats(EStreamTaskType type, float* bandwidth) { *bandwidth = m_Statistics.typeInfo[type].nCurrentReadBandwidth / 1024.0f; } #endif void CStreamEngine::GetStreamingOpenStatistics(SStreamEngineOpenStats& openStatsOut) { openStatsOut = m_OpenStatistics; } #ifdef STREAMENGINE_ENABLE_LISTENER void CStreamEngine::SetListener(IStreamEngineListener* pListener) { m_pListener = pListener; } #endif #ifdef STREAMENGINE_ENABLE_LISTENER IStreamEngineListener* CStreamEngine::GetListener() { return m_pListener; } #endif ////////////////////////////////////////////////////////////////////////// void* CStreamEngine::TempAlloc(size_t nSize, const char* szDbgSource, bool bFallBackToMalloc, bool bUrgent, uint32 align) { return m_tempMem.TempAlloc(g_pPakHeap, nSize, szDbgSource, bFallBackToMalloc, bUrgent, align); } void CStreamEngine::TempFree(void* p, size_t nSize) { m_tempMem.TempFree(g_pPakHeap, p, nSize); } namespace { #ifdef STREAMENGINE_ENABLE_STATS void DrawText(const float x, const float y, ColorF c, const char* format, ...) { va_list args; va_start(args, format); SDrawTextInfo ti; ti.flags = eDrawText_FixedSize | eDrawText_2D | eDrawText_Monospace; ti.xscale = ti.yscale = 1.2f; ti.color[0] = c.r; ti.color[1] = c.g; ti.color[2] = c.b; ti.color[3] = c.a; gEnv->pRenderer->DrawTextQueued(Vec3(x, y, 1.0f), ti, format, args); va_end(args); } #endif void WriteToStreamingLog(const char* str) { #ifdef STREAMENGINE_ENABLE_STATS if (g_cvars.sys_streaming_debug == 4) { // ignore invalid file access when logging steaming data CDebugAllowFileAccess ignoreInvalidFileAccess; static string sFileName; static bool bFirstTime = true; if (bFirstTime) { char path[ICryPak::g_nMaxPath]; path[sizeof(path) - 1] = 0; gEnv->pCryPak->AdjustFileName("@cache@\\TestResults\\StreamingLog.txt", path, AZ_ARRAY_SIZE(path), ICryPak::FLAGS_PATH_REAL | ICryPak::FLAGS_FOR_WRITING); sFileName = path; } AZ::IO::HandleType fileHandle = fxopen(sFileName, (bFirstTime) ? "wt" : "at"); bFirstTime = false; if (fileHandle != AZ::IO::InvalidHandle) { AZ::IO::Print(fileHandle, "%s\n", str); gEnv->pFileIO->Close(fileHandle); } } #endif } } #ifdef STREAMENGINE_ENABLE_STATS ////////////////////////////////////////////////////////////////////////// void CStreamEngine::DrawStatistics() { std::vector<CAsyncIOFileRequest_AutoPtr> tempRequests; if (g_cvars.sys_streaming_debug == 4) { float tx = 0; float ty = 30; float ystep = 12.0f; ColorF clText(1, 0, 0, 1); DrawText(tx, ty += ystep, clText, "Recording streaming stats to file ..."); { CryAutoCriticalSection lock(m_csStats); tempRequests.swap(m_statsRequestList); } const char* const sFileFilter = g_cvars.sys_streaming_debug_filter_file_name->GetString(); if (!tempRequests.empty()) { for (int i = (int)tempRequests.size() - 1; i >= 0; i--) { CAsyncIOFileRequest* pFileRequest = tempRequests[i]; if (g_cvars.sys_streaming_debug_filter > 0 && pFileRequest->m_eType != g_cvars.sys_streaming_debug_filter) { continue; } if (g_cvars.sys_streaming_debug_filter == -1 && pFileRequest->m_eMediaType == eStreamSourceTypeMemory) { continue; } if (g_cvars.sys_streaming_debug_filter_min_time && (pFileRequest->m_readTime.GetMilliSeconds() < (float)g_cvars.sys_streaming_debug_filter_min_time)) { continue; } if (sFileFilter && sFileFilter[0] && !strstr(pFileRequest->m_strFileName.c_str(), sFileFilter)) { continue; } const char* sFlags = (pFileRequest->m_eMediaType == eStreamSourceTypeHDD) ? "HDD" : ((pFileRequest->m_eMediaType == eStreamSourceTypeMemory) ? "mem" : "DVD"); const char* sPriority = ""; switch (pFileRequest->m_ePriority) { case estpUrgent: sPriority = " Urgent"; break; case estpNormal: sPriority = " Normal"; break; case estpIdle: sPriority = " Idle"; break; case estpPreempted: sPriority = " Preempted"; break; case estpBelowNormal: sPriority = "BelowNormal"; break; case estpAboveNormal: sPriority = "AboveNormal"; break; default: sPriority = " Unknown"; break; } string str; str.Format("[N%6d] [%+8d] [%8d] [%6.2f ms] (%5d|%5d) [%5.3fs] <%3d> <%s> <%s> <%s> %s:", pFileRequest->m_nReadCounter, pFileRequest->m_nReadHeadOffsetKB, pFileRequest->m_nDiskOffset >> 10, pFileRequest->m_readTime.GetMilliSeconds(), pFileRequest->m_nSizeOnMedia / 1024, ((pFileRequest->m_nRequestedSize) ? pFileRequest->m_nRequestedSize : pFileRequest->m_nFileSize) / 1024, (pFileRequest->m_completionTime - pFileRequest->m_startTime).GetSeconds(), pFileRequest->m_nTimeGroup, sPriority, sFlags, pFileRequest->m_pakFile.c_str(), pFileRequest->m_strFileName.c_str()); WriteToStreamingLog(str.c_str()); } } return; } { CryAutoCriticalSection lock(m_csStats); tempRequests = m_statsRequestList; size_t nMaxRequests = g_cvars.sys_streaming_debug_filter_min_time ? 1000 : 100; if (m_statsRequestList.size() > nMaxRequests) { m_statsRequestList.resize(nMaxRequests); } } std::vector<CAsyncIOFileRequest_AutoPtr>& requests = tempRequests; stack_string temp; float tx = 0; float ty = 30; float ystep = 12.0f; float xColumn = 80; ColorF clText(0, 1, 1, 1); SStreamEngineStatistics& stats = m_Statistics; SStreamEngineOpenStats openStats = m_OpenStatistics; const char* sMediaType = m_bStreamDataOnHDD ? "HDD" : "DVD"; const char* sStatus = (m_bStreamingStatsPaused) ? "Paused" : ""; DrawText(tx, ty += ystep, clText, "Streaming IO: %.2f|%.2fMB/s, ACT: %3dmsec, Unzip: %.2fMB/s, Decrypt: %.2fMB/s, Verify: %.2fMB/s, Jobs:%5d (%4d) %s %s", (float)stats.nTotalCurrentReadBandwidth / (1024 * 1024), (float)stats.nTotalSessionReadBandwidth / (1024 * 1024), (uint32)stats.fAverageCompletionTime, (float)stats.nDecompressBandwidth / (1024 * 1024), (float)stats.nDecryptBandwidth / (1024 * 1024), (float)stats.nVerifyBandwidth / (1024 * 1024), (uint32)stats.nTotalStreamingRequestCount, (uint32)(stats.nTotalRequestCount - stats.nTotalStreamingRequestCount), sMediaType, sStatus); DrawText(tx, ty += ystep, clText, "\t Request: Active:%2d (%2.1fMB) Live:%2d Decrypt:%2d Decompress:%2d Async:%2d Finished:%2d Temp Pool Max:%2.1fMB", openStats.nOpenRequestCount, (float)stats.nPendingReadBytes / (1024 * 1024), CAsyncIOFileRequest::s_nLiveRequests, stats.nCurrentDecryptCount, stats.nCurrentDecompressCount, stats.nCurrentAsyncCount, stats.nCurrentFinishedCount, (float)stats.nMaxTempMemory / (1024 * 1024)); ty += ystep; // HDD stats if (stats.hddInfo.nTotalRequestCount > 0) { DrawText(tx, ty += ystep, clText, "HDD : Request: %3d|%5d (%4d MB|%3d KB) - BW: %1.2f|%1.2f Mb/s (Eff: %2.1f|%2.1f Mb/s) \n", stats.hddInfo.nRequestCount, stats.hddInfo.nTotalRequestCount, (uint32)(stats.hddInfo.nTotalBytesRead / (1024 * 1024)), (uint32)(stats.hddInfo.nTotalBytesRead / (1024 * stats.hddInfo.nTotalRequestCount)), (float)stats.hddInfo.nCurrentReadBandwidth / (1024 * 1024), (float)stats.hddInfo.nSessionReadBandwidth / (1024 * 1024), (float)stats.hddInfo.nActualReadBandwidth / (1024 * 1024), (float)stats.hddInfo.nAverageActualReadBandwidth / (1024 * 1024)); DrawText(tx, ty += ystep, clText, "\t Seek: %1.2f GB - Active: %2.1f%%(%2.1f%%)", (float)stats.hddInfo.nAverageSeekOffset / (1024 * 1024), stats.hddInfo.fActiveDuringLastSecond, stats.hddInfo.fAverageActiveTime); } // Optical stats if (stats.discInfo.nTotalRequestCount > 0) { DrawText(tx, ty += ystep, clText, "Disc: Request: %3d|%5d (%4d MB|%3d KB) - BW: %1.2f|%1.2f Mb/s (Eff: %2.1f|%2.1f Mb/s) \n", stats.discInfo.nRequestCount, stats.discInfo.nTotalRequestCount, (uint32)(stats.discInfo.nTotalBytesRead / (1024 * 1024)), (uint32)(stats.discInfo.nTotalBytesRead / (1024 * stats.discInfo.nTotalRequestCount)), (float)stats.discInfo.nCurrentReadBandwidth / (1024 * 1024), (float)stats.discInfo.nSessionReadBandwidth / (1024 * 1024), (float)stats.discInfo.nActualReadBandwidth / (1024 * 1024), (float)stats.discInfo.nAverageActualReadBandwidth / (1024 * 1024)); DrawText(tx, ty += ystep, clText, "\t Seek: %1.2f GB - Active: %2.1f%%(%2.1f%%)", (float)stats.discInfo.nAverageSeekOffset / (1024 * 1024), stats.discInfo.fActiveDuringLastSecond, stats.discInfo.fAverageActiveTime); } DrawText(tx, ty += ystep, clText, "Mem : Request: %3d|%5d (%4d MB)", stats.memoryInfo.nRequestCount, stats.memoryInfo.nTotalRequestCount, (stats.memoryInfo.nTotalBytesRead / (1024 * 1024))); ty += ystep; for (int i = eStreamTaskTypeCount - 1; i >= 1; i--) { EStreamTaskType eTaskType = (EStreamTaskType)i; SStreamEngineStatistics::SRequestTypeInfo info = stats.typeInfo[eTaskType]; if (g_cvars.sys_streaming_debug > 1 || info.nTotalRequestCount > 0) { DrawText(tx, ty += ystep, clText, "%9s: BSize:%3dKb Read:%4dMb BW:%1.2f|%1.2f Mb/s ACT:%5dms %2d(%2.1fMB)|%5d", gEnv->pSystem->GetStreamEngine()->GetStreamTaskTypeName(eTaskType), (uint32)(info.nTotalReadBytes / max((uint32)1, info.nTotalStreamingRequestCount) / 1024), (uint32)(info.nTotalReadBytes / (1024 * 1024)), (float)info.nCurrentReadBandwidth / (1024 * 1024), (float)info.nSessionReadBandwidth / (1024 * 1024), (uint32)info.fAverageCompletionTime, openStats.nOpenRequestCountByType[eTaskType], (float)info.nPendingReadBytes / (1024 * 1024), (uint32)info.nTotalStreamingRequestCount); } } if (g_cvars.sys_streaming_debug == 5) { ty += ystep; ty += ystep; DrawText(tx, ty += ystep, clText, "Name | Time(s) | Size(Kb) | Read(Mb) | ReqS(Mb) | Count"); for (TExtensionInfoMap::iterator it = m_PerExtensionInfo.begin(); it != m_PerExtensionInfo.end(); ++it) { SExtensionInfo& extensionInfo = it->second; DrawText(tx, ty += ystep, clText, "%4s | %7.3f | %8d | %8.3f | %8.3f | %5d", it->first.c_str(), extensionInfo.m_fTotalReadTime / 1000, (uint32)(extensionInfo.m_nTotalReadSize / max((size_t)1, extensionInfo.m_nTotalRequests) / 1024), extensionInfo.m_nTotalReadSize / (1024.0f * 1024.0f), extensionInfo.m_nTotalRequestSize / (1024.0f * 1024.0f), extensionInfo.m_nTotalRequests); } } else if (g_cvars.sys_streaming_debug > 1) { ty += ystep; DrawText(tx, ty += ystep, clText, "[Offset KB]"); DrawText(tx + xColumn, ty, clText, "[io ms]\t(read | size) [t sec] [Grp] < Priority> <Disk> Filename"); ty += ystep; const char* const sFileFilter = g_cvars.sys_streaming_debug_filter_file_name->GetString(); for (size_t i = 0, nRequests = requests.size(); i < nRequests; i++) { CAsyncIOFileRequest* pFileRequest = requests[i]; if (g_cvars.sys_streaming_debug_filter > 0 && pFileRequest->m_eType != g_cvars.sys_streaming_debug_filter) { continue; } if (g_cvars.sys_streaming_debug_filter == -1 && pFileRequest->m_eMediaType == eStreamSourceTypeMemory) { continue; } if (g_cvars.sys_streaming_debug_filter_min_time && (pFileRequest->m_readTime.GetMilliSeconds() < (float)g_cvars.sys_streaming_debug_filter_min_time)) { continue; } if (sFileFilter != 0 && sFileFilter[0] && !strstr(pFileRequest->m_strFileName.c_str(), sFileFilter)) { continue; } { float fMillis = pFileRequest->m_readTime.GetMilliSeconds(); const char* sFlags = ""; switch (pFileRequest->m_eMediaType) { case eStreamSourceTypeHDD: sFlags = "HDD"; break; case eStreamSourceTypeDisc: sFlags = "DVD"; break; case eStreamSourceTypeMemory: sFlags = "MEM"; break; } const char* sPriority = ""; switch (pFileRequest->m_ePriority) { case estpUrgent: sPriority = " Urgent"; break; case estpNormal: sPriority = " Normal"; break; case estpIdle: sPriority = " Idle"; break; case estpPreempted: sPriority = " Preempted"; break; case estpBelowNormal: sPriority = "BelowNormal"; break; case estpAboveNormal: sPriority = "AboveNormal"; break; default: sPriority = " Unknown"; break; } uint32 nRequestedSize = (pFileRequest->m_nRequestedSize != 0) ? pFileRequest->m_nRequestedSize : pFileRequest->m_nFileSize; ////////////////////////////////////////////////////////////////////////// ColorF colOffset; if (pFileRequest->m_nReadHeadOffsetKB >= 0) { colOffset = ColorF (0, 1, 0, 1); // Green if (pFileRequest->m_nReadHeadOffsetKB > 32) { colOffset = ColorF (0.5f, 1.f, 0, 1.f); // Cyan } } else { colOffset = ColorF (1, 0, 0, 1); // Red } if (pFileRequest->m_eMediaType != eStreamSourceTypeMemory) { DrawText(tx, ty, colOffset, "[%+d]", pFileRequest->m_nReadHeadOffsetKB); } ////////////////////////////////////////////////////////////////////////// DrawText(tx + xColumn, ty, clText, "[%6.2f]\t(%5d|%5d) [%5.2f] [%3d] <%s> <%s>\t%s", fMillis, pFileRequest->m_nSizeOnMedia / 1024, nRequestedSize / 1024, (pFileRequest->m_completionTime - pFileRequest->m_startTime).GetSeconds(), pFileRequest->m_nTimeGroup, sPriority, sFlags, pFileRequest->m_strFileName.c_str()); ty += ystep; } } } } #endif //STREAMENGINE_ENABLE_STATS #ifdef STREAMENGINE_ENABLE_STATS void CStreamEngine::ClearStatistics() { m_TimeOfLastReset = gEnv->pTimer->GetAsyncTime(); m_TimeOfLastUpdate = m_TimeOfLastReset; m_Statistics.hddInfo.ResetStats(); m_Statistics.discInfo.ResetStats(); m_PerExtensionInfo.clear(); m_Statistics.nDecompressBandwidth = 0; m_Statistics.nDecryptBandwidth = 0; m_Statistics.nVerifyBandwidth = 0; m_Statistics.nDecompressBandwidthAverage = 0; m_Statistics.nDecryptBandwidthAverage = 0; m_Statistics.nVerifyBandwidthAverage = 0; m_Statistics.nTotalBytesRead = 0; m_Statistics.nTotalRequestCount = 0; m_Statistics.nTotalStreamingRequestCount = 0; m_Statistics.nMaxTempMemory = 0; m_Statistics.fAverageCompletionTime = 0; for (int i = 0; i < eStreamTaskTypeCount; i++) { m_Statistics.typeInfo[i].ResetStats(); } m_Statistics.vecHeavyAssets.clear(); for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->m_InMemoryStats.Reset(); m_pThreadIO[i]->m_NotInMemoryStats.Reset(); } } } #endif ////////////////////////////////////////////////////////////////////////// bool CStreamEngine::OnInputChannelEventFiltered(const AzFramework::InputChannel& inputChannel) { #ifdef STREAMENGINE_ENABLE_STATS if (g_cvars.sys_streaming_debug) { if (inputChannel.GetInputChannelId() == AzFramework::InputDeviceKeyboard::Key::Function11) { m_bStreamingStatsPaused = true; } if (inputChannel.GetInputChannelId() == AzFramework::InputDeviceKeyboard::Key::Function12) { m_bStreamingStatsPaused = false; } } #endif return false; } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::OnSystemEvent(ESystemEvent event, UINT_PTR wparam, UINT_PTR lparam) { switch (event) { case ESYSTEM_EVENT_GAME_POST_INIT_DONE: { // unpause the streaming engine, when init phase is done PauseStreaming(false, -1); break; } case ESYSTEM_EVENT_LEVEL_LOAD_PREPARE: #if defined(STREAMENGINE_ENABLE_STATS) ClearStatistics(); #endif WriteToStreamingLog("*LEVEL_LOAD_PREPARE"); break; case ESYSTEM_EVENT_LEVEL_LOAD_START: { WriteToStreamingLog("*LEVEL_LOAD_START"); break; } case ESYSTEM_EVENT_LEVEL_LOAD_END: { WriteToStreamingLog("*LEVEL_LOAD_END"); break; } case ESYSTEM_EVENT_LEVEL_PRECACHE_START: { WriteToStreamingLog("*LEVEL_LOAD_PRECACHE_START"); break; } case ESYSTEM_EVENT_LEVEL_PRECACHE_END: { WriteToStreamingLog("*LEVEL_LOAD_PRECACHE_END"); break; } case ESYSTEM_EVENT_LEVEL_UNLOAD: { UpdateAndWait(true); CancelAll(); #if defined(STREAMENGINE_ENABLE_STATS) ClearStatistics(); #endif break; } case ESYSTEM_EVENT_LEVEL_POST_UNLOAD: { UpdateAndWait(true); CancelAll(); #if defined(STREAMENGINE_ENABLE_STATS) ClearStatistics(); #endif } break; case ESYSTEM_EVENT_FAST_SHUTDOWN: { Shutdown(); break; } } } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::PauseStreaming(bool bPause, uint32 nPauseTypesBitmask) { CryAutoLock<CryCriticalSection> pausedLock(m_pausedLock); if (bPause) { m_nPausedDataTypesMask |= nPauseTypesBitmask; } else { m_nPausedDataTypesMask &= ~nPauseTypesBitmask; ResumePausedStreams_PauseLocked(); } } ////////////////////////////////////////////////////////////////////////// void CStreamEngine::PauseIO(bool bPause) { for (int i = 0; i < eIOThread_Last; i++) { if (m_pThreadIO[i]) { m_pThreadIO[i]->Pause(bPause); } } } //////////////////////////////////////////////////////////////////////////