/* * All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or * its licensors. * * For complete copyright and license terms please see the LICENSE at the root of this * distribution (the "License"). All use of this software is governed by the License, * or, if provided, by the license below or the license accompanying this file. Do not * remove or modify any license notices. This file is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * */ // Original file Copyright Crytek GMBH or its affiliates, used under license. // Description : Streaming Thread for IO #include #include "StreamIOThread.h" #include "StreamEngine.h" #include "../System.h" #include "IPlatformOS.h" extern SSystemCVars g_cvars; //#pragma("control %push O=0") // to disable optimization ////////////////////////////////////////////////////////////////////////// CStreamingIOThread::CStreamingIOThread(CStreamEngine* pStreamEngine, EStreamSourceMediaType mediaType, const char* name) { m_pStreamEngine = pStreamEngine; m_bCancelThreadRequest = false; m_bNeedSorting = false; m_bNeedReset = false; m_bNewRequests = false; m_name = name; m_eMediaType = mediaType; m_nFallbackMTs = 0; m_iUrgentRequests = 0; m_bPaused = false; m_bAbortReads = false; m_nReadCounter = 0; m_nStreamingCPU = -1; Start((unsigned)(1 << g_cvars.sys_streaming_cpu), name); } CStreamingIOThread::~CStreamingIOThread() { Cancel(); Stop(); WaitForThread(); } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::AddRequest(CAsyncIOFileRequest* pRequest, bool bStartImmidietly) { pRequest->AddRef(); // Acquire ownership on file request. pRequest->m_status = CAsyncIOFileRequest::eStatusInFileQueue; if (pRequest->m_eMediaType != eStreamSourceTypeMemory) { pRequest->m_eMediaType = m_eMediaType; } // does this ignore the tmp out of memory if (pRequest->IgnoreOutofTmpMem()) { CryInterlockedIncrement(&m_iUrgentRequests); } m_newFileRequests.push_back(pRequest); if (bStartImmidietly) { READ_WRITE_BARRIER m_bNewRequests = true; m_awakeEvent.Set(); } } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::SignalStartWork(bool bForce) { if (!m_newFileRequests.empty() || bForce) { READ_WRITE_BARRIER m_bNewRequests = true; m_awakeEvent.Set(); } } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::Pause(bool bPause) { m_bPaused = bPause; } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::Run() { SetName(m_name); CTimeValue t0 = gEnv->pTimer->GetAsyncTime(); m_nLastReadDiskOffset = 0; // // Main thread loop while (!m_bCancelThreadRequest) { if (m_nStreamingCPU != g_cvars.sys_streaming_cpu) { m_nStreamingCPU = g_cvars.sys_streaming_cpu; #if defined(AZ_RESTRICTED_PLATFORM) #undef AZ_RESTRICTED_SECTION #define STREAMIOTHREAD_CPP_SECTION_1 1 #define STREAMIOTHREAD_CPP_SECTION_2 2 #endif #if defined(AZ_RESTRICTED_PLATFORM) #define AZ_RESTRICTED_SECTION STREAMIOTHREAD_CPP_SECTION_1 #if defined(AZ_PLATFORM_XENIA) #include "Xenia/StreamIOThread_cpp_xenia.inl" #elif defined(AZ_PLATFORM_PROVO) #include "Provo/StreamIOThread_cpp_provo.inl" #elif defined(AZ_PLATFORM_SALEM) #include "Salem/StreamIOThread_cpp_salem.inl" #endif #endif } if (m_bNewRequests || !m_newFileRequests.empty()) { READ_WRITE_BARRIER ProcessNewRequests(); } else { #if defined(_RELEASE) m_awakeEvent.Wait(); #elif defined(STREAMENGINE_ENABLE_STATS) // compute max time to wait - revive thread every second at least once to update stats bool bWaiting = true; while (bWaiting) { CTimeValue t1 = gEnv->pTimer->GetAsyncTime(); CTimeValue deltaT = t1 - t0; uint64 msec = deltaT.GetMilliSecondsAsInt64(); if (msec < 1000) { bWaiting = !m_awakeEvent.Wait(1000 - (uint32)msec); } if (bWaiting) { // update the delta time again t1 = gEnv->pTimer->GetAsyncTime(); deltaT = t1 - t0; m_InMemoryStats.Update(deltaT); m_NotInMemoryStats.Update(deltaT); t0 = t1; } } #endif } if (m_bNeedReset) { ProcessReset(); } bool bIsOOM = false; while (!m_bCancelThreadRequest && !m_fileRequestQueue.empty()) { CAsyncIOFileRequest_TransferPtr pFileRequest(m_fileRequestQueue.back()); m_fileRequestQueue.pop_back(); assert (&*pFileRequest); if (pFileRequest->HasFailed()) { // check if request was high prio, then decr open count if (pFileRequest->IgnoreOutofTmpMem()) { CryInterlockedDecrement(&m_iUrgentRequests); } CAsyncIOFileRequest::JobFinalize_Read(pFileRequest, m_pStreamEngine->GetJobEngineState()); continue; } ////////////////////////////////////////////////////////////////////////// // When temporary memory goes out of budget we must loop here and wait until previous file requests are finished and free up memory. // Only allow processing of requests which are flagged for processing when out of tmp memory ////////////////////////////////////////////////////////////////////////// if (bIsOOM && !m_bCancelThreadRequest) { m_pStreamEngine->FlagTempMemOutOfBudget(); if (m_iUrgentRequests > 0) { if (m_bNewRequests || !m_newFileRequests.empty()) { READ_WRITE_BARRIER ProcessNewRequests(); } // readd the current request m_fileRequestQueue.push_back(pFileRequest.Relinquish()); // search for the first request which ignores the current out of mem state // Search for next highest priority request std::vector::reverse_iterator rit; for (rit = m_fileRequestQueue.rbegin(); rit != m_fileRequestQueue.rend(); ++rit) { if ((*rit)->IgnoreOutofTmpMem()) { pFileRequest = *rit; std::vector::iterator it(rit.base()); --it; m_fileRequestQueue.erase(it); break; } } } else { // read the current request m_fileRequestQueue.push_back(pFileRequest.Relinquish()); } } // Simply let the io thread sleep when paused before doing any actual IO while (m_bPaused) { CrySleep(10); } // If at this point, the filerequest is zero, the above prioritization of // urgent requests couldn't find a new task to displace the current // one. As the current one had been pushed back previously, we can safely // assume that restarting the loop will grab it again (eventually). if (!pFileRequest) { break; } // check if request was high prio, then decr open count if (pFileRequest->IgnoreOutofTmpMem()) { CryInterlockedDecrement(&m_iUrgentRequests); } bIsOOM = false; uint32 nSizeOnMedia = pFileRequest->m_nSizeOnMedia; uint32 nError = 0; // Handle file request. if (m_bAbortReads) { nError = ERROR_ABORTED_ON_SHUTDOWN; } else if (pFileRequest->m_bReadBegun) { nError = pFileRequest->ReadFileResume(this); } else { nError = pFileRequest->ReadFile(this); } #ifdef STREAMENGINE_ENABLE_STATS pFileRequest->m_nReadCounter = m_nReadCounter++; #endif if (nError == 0) { if (pFileRequest->m_eMediaType != eStreamSourceTypeMemory) { pFileRequest->m_nReadHeadOffsetKB = (int32)(((int64)pFileRequest->m_nDiskOffset - m_nLastReadDiskOffset) >> 10); // in KB m_nLastReadDiskOffset = pFileRequest->m_nDiskOffset + pFileRequest->m_nSizeOnMedia; #ifdef STREAMENGINE_ENABLE_STATS m_NotInMemoryStats.m_nTempReadOffset += abs(pFileRequest->m_nReadHeadOffsetKB); m_NotInMemoryStats.m_nTotalReadOffset += abs(pFileRequest->m_nReadHeadOffsetKB); m_NotInMemoryStats.m_nTempRequestCount++; // Calc IO bandwidth only for non memory files. m_NotInMemoryStats.m_nTempBytesRead += pFileRequest->m_nSizeOnMedia; m_NotInMemoryStats.m_TempReadTime += pFileRequest->m_readTime; #endif } else { #ifdef STREAMENGINE_ENABLE_STATS m_InMemoryStats.m_nTempRequestCount++; // Calc IO bandwidth only for in memory files. m_InMemoryStats.m_nTempBytesRead += pFileRequest->m_nSizeOnMedia; m_InMemoryStats.m_TempReadTime += pFileRequest->m_readTime; #endif } CAsyncIOFileRequest::JobFinalize_Read(pFileRequest, m_pStreamEngine->GetJobEngineState()); } else { switch (nError) { case ERROR_OUT_OF_MEMORY: bIsOOM = true; pFileRequest->SetPriority(estpPreempted); if (pFileRequest->IgnoreOutofTmpMem()) { CryInterlockedIncrement(&m_iUrgentRequests); } m_fileRequestQueue.push_back(pFileRequest.Relinquish()); m_bNewRequests = true; break; case ERROR_PREEMPTED: pFileRequest->SetPriority(estpPreempted); if (pFileRequest->IgnoreOutofTmpMem()) { CryInterlockedIncrement(&m_iUrgentRequests); } m_fileRequestQueue.push_back(pFileRequest.Relinquish()); m_bNewRequests = true; break; case ERROR_MISSCHEDULED: // Request tried to read a file that has changed media type. Reset the sort key // and reschedule. pFileRequest->m_bSortKeyComputed = 0; AddRequest(&*pFileRequest, false); break; default: pFileRequest->SyncWithDecrypt(); pFileRequest->SyncWithDecompress(); pFileRequest->Failed(nError); CAsyncIOFileRequest::JobFinalize_Read(pFileRequest, m_pStreamEngine->GetJobEngineState()); break; } } ////////////////////////////////////////////////////////////////////////// if (m_bNewRequests) { READ_WRITE_BARRIER ProcessNewRequests(); } if (m_bNeedReset) { ProcessReset(); } if (m_bNeedSorting) { SortRequests(); } ////////////////////////////////////////////////////////////////////////// #ifdef STREAMENGINE_ENABLE_STATS if (g_cvars.sys_streaming_max_bandwidth != 0) { CTimeValue t1 = gEnv->pTimer->GetAsyncTime(); CTimeValue deltaT = t1 - t0; // Sleep in case we are streaming too fast. const float fTheoreticalReadTime = float(nSizeOnMedia) / g_cvars.sys_streaming_max_bandwidth * 0.00000095367431640625f; // / (1024*1024) if (fTheoreticalReadTime - deltaT.GetSeconds() > FLT_EPSILON) { uint32 nSleepTime = uint32(1000.f * (fTheoreticalReadTime - deltaT.GetSeconds())); CrySleep(nSleepTime); } } CTimeValue t1 = gEnv->pTimer->GetAsyncTime(); CTimeValue deltaT = t1 - t0; // update the stats every second if (deltaT.GetMilliSecondsAsInt64() > 1000) { m_InMemoryStats.Update(deltaT); m_NotInMemoryStats.Update(deltaT); t0 = t1; } #endif } } } #ifdef STREAMENGINE_ENABLE_STATS void CStreamingIOThread::SStats::Update(const CTimeValue& deltaT) { m_nReadBytesInLastSecond = (uint32)m_nTempBytesRead; m_nRequestCountInLastSecond = m_nTempRequestCount; m_nTotalReadBytes += (uint32)m_nTempBytesRead; m_nTotalRequestCount += m_nTempRequestCount; m_TotalReadTime += m_TempReadTime; if (m_TempReadTime.GetValue() != 0) { m_nActualReadBandwith = (uint32)(m_nTempBytesRead / m_TempReadTime.GetSeconds()); } else { m_nActualReadBandwith = 0; } m_nCurrentReadBandwith = (uint32)(m_nTempBytesRead / deltaT.GetSeconds()); m_fReadingDuringLastSecond = m_TempReadTime.GetSeconds() / deltaT.GetSeconds() * 100; if (m_nTempRequestCount > 0) { m_nReadOffsetInLastSecond = m_nTempReadOffset / m_nTempRequestCount; } else { m_nReadOffsetInLastSecond = 0; } m_TempReadTime.SetValue(0); m_nTempBytesRead = 0; m_nTempReadOffset = 0; m_nTempRequestCount = 0; } #endif ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::Cancel() { m_bCancelThreadRequest = true; m_awakeEvent.Set(); } ////////////////////////////////////////////////////////////////////////// struct SCompareAsyncFileRequest { bool operator()(CAsyncIOFileRequest* pFile1, CAsyncIOFileRequest* pFile2) const { return pFile1->m_nSortKey > pFile2->m_nSortKey; } }; ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::SortRequests() { FUNCTION_PROFILER(gEnv->pSystem, PROFILE_SYSTEM); std::sort(m_fileRequestQueue.begin(), m_fileRequestQueue.end(), SCompareAsyncFileRequest()); /* int nStartOfQueue = 0; int64 nDiskOffsetLimit = m_nLastReadDiskOffset - 32*1024; // 32KB less only int nCount = (int)m_fileRequestQueue.size(); for (int i = nCount-1; i >= 0; i--) { if (m_fileRequestQueue[i]->m_nDiskOffset > nDiskOffsetLimit) { nStartOfQueue = i+1; break; } } if (nStartOfQueue < nCount && nStartOfQueue > 0) { int nElements = nCount - nStartOfQueue; // Move all elements up to nStartOfQueue, from begining of the request array to the end. m_temporaryArray.resize(0); // Copy to temp array elements up to nStartOfQueue m_temporaryArray.insert( m_temporaryArray.end(),m_fileRequestQueue.begin()+nStartOfQueue,m_fileRequestQueue.end() ); // Remove elements up to nStartOfQueue from request list m_fileRequestQueue.erase( m_fileRequestQueue.begin()+nStartOfQueue,m_fileRequestQueue.end() ); // Add elemenets at the end from temp array. m_fileRequestQueue.insert( m_fileRequestQueue.begin(),m_temporaryArray.begin(),m_temporaryArray.end() ); } */ m_bNeedSorting = false; } void CStreamingIOThread::NeedSorting() { m_bNeedSorting = true; } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::ProcessNewRequests() { m_bNewRequests = false; std::vector temporaryArray; temporaryArray.reserve(m_newFileRequests.size()); m_newFileRequests.swap(temporaryArray); std::vector& newFiles = temporaryArray; if (!newFiles.empty()) { uint64 nCurrentKeyInProgress = m_fileRequestQueue.size() ? m_fileRequestQueue.back()->m_nSortKey : 0; // Compute sorting key for new file entries. int iWakeFallback(0); const TFallbackIOVecConstIt itEnd = m_FallbackIOThreads.end(); const size_t fallbackNum = m_FallbackIOThreads.size(); PREFAST_SUPPRESS_WARNING(6255) uint8 * pFallbackSignals = fallbackNum ? (uint8*)alloca(fallbackNum) : NULL; for (uint32 fb = 0; fb < fallbackNum; ++fb) { pFallbackSignals[fb] = 0; } for (size_t i = 0, num = newFiles.size(); i < num; i++) { CAsyncIOFileRequest* pFilepRequest = newFiles[i]; pFilepRequest->ComputeSortKey(nCurrentKeyInProgress); static_cast(&*pFilepRequest->m_pReadStream)->ComputedMediaType(pFilepRequest->m_eMediaType); #ifdef STREAMENGINE_ENABLE_LISTENER IStreamEngineListener* pListener = m_pStreamEngine->GetListener(); if (pListener) { pListener->OnStreamComputedSortKey(pFilepRequest, pFilepRequest->m_nSortKey); } #endif bool bFallback = false; int idx = -1; for (TFallbackIOVecConstIt it = m_FallbackIOThreads.begin(); it != itEnd && !bFallback; ++it) { ++idx; if (it->second == pFilepRequest->GetMediaType()) { if (pFilepRequest->IgnoreOutofTmpMem()) { CryInterlockedDecrement(&m_iUrgentRequests); } (it->first)->AddRequest(pFilepRequest, true); pFilepRequest->Release(); // Release local ownership of request (moved to fallback IO thread) iWakeFallback++; bFallback = true; pFallbackSignals[idx] = 1; } } if (!bFallback) { m_fileRequestQueue.push_back(pFilepRequest); } } for (uint32 fb = 0; fb < fallbackNum; ++fb) { if (pFallbackSignals[fb] != 0) { (m_FallbackIOThreads[fb].first)->SignalStartWork(false); } } SortRequests(); /* if (m_fileRequestQueue.back() != pRequest && pRequest != 0) { // Highest priority changed. if (m_fileRequestQueue.back()->m_nDiskOffset < (m_nLastReadDiskOffset-32*1024)) { //CryLog( "Bad Offset in Queue" ); } } */ } } void CStreamingIOThread::ProcessReset() { if (!m_fileRequestQueue.empty()) { for (std::vector::iterator it = m_fileRequestQueue.begin(), itEnd = m_fileRequestQueue.end(); it != itEnd; ++it) { (*it)->Release(); } } stl::free_container(m_fileRequestQueue); if (!m_temporaryArray.empty()) { for (std::vector::iterator it = m_temporaryArray.begin(), itEnd = m_temporaryArray.end(); it != itEnd; ++it) { (*it)->Release(); } } stl::free_container(m_temporaryArray); m_bNeedReset = false; m_resetDoneEvent.Set(); } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::CancelAll() { { CryMT::vector::AutoLock lock(m_newFileRequests.get_lock()); if (!m_newFileRequests.empty()) { CAsyncIOFileRequest* const* it = &m_newFileRequests.front(); CAsyncIOFileRequest* const* itEnd = it + m_newFileRequests.size(); for (; it != itEnd; ++it) { (*it)->Release(); } } } m_newFileRequests.free_memory(); m_iUrgentRequests = 0; } void CStreamingIOThread::AbortAll(bool bAbort) { m_bAbortReads = bAbort; } void CStreamingIOThread::BeginReset() { CancelAll(); m_resetDoneEvent.Reset(); m_bNeedReset = true; m_awakeEvent.Set(); } void CStreamingIOThread::EndReset() { m_resetDoneEvent.Wait(); } ////////////////////////////////////////////////////////////////////////// void CStreamingIOThread::RegisterFallbackIOThread(EStreamSourceMediaType mediaType, CStreamingIOThread* pIOThread) { //check if media has not yet been registered if (!pIOThread) { return;//no need for NULL register anymore } const TFallbackIOVecConstIt itEnd = m_FallbackIOThreads.end(); for (TFallbackIOVecConstIt it = m_FallbackIOThreads.begin(); it != itEnd; ++it) { if (it->second == mediaType) { return; } } m_FallbackIOThreads.push_back(std::make_pair(pIOThread, mediaType)); m_nFallbackMTs |= 1 << mediaType; } bool CStreamingIOThread::HasUrgentRequests() { bool ret = false; if (m_iUrgentRequests > 0) { //lock to prevent list modification whilst traversing m_newFileRequests.get_lock().Lock(); int nRequests = m_newFileRequests.size(); if (nRequests) { for (int i = 0; i < nRequests; i++) { if (m_newFileRequests[i]->m_ePriority == estpUrgent) { //printf("Urgent task pending: %s\n", m_newFileRequests[i]->m_strFileName.c_str()); ret = true; break; } } } m_newFileRequests.get_lock().Unlock(); } return ret; } bool CStreamingIOThread::IsMisscheduled(EStreamSourceMediaType mt) const { if (mt == m_eMediaType) { return false; } if (m_nFallbackMTs & (1 << mt)) { return true; } return false; } ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// CStreamingWorkerThread::CStreamingWorkerThread(CStreamEngine* pStreamEngine, const char* name, EWorkerType type, SStreamRequestQueue* pQueue) { m_type = type; m_name = name; m_pStreamEngine = pStreamEngine; m_pQueue = pQueue; m_bCancelThreadRequest = false; m_bNeedsReset = false; Start((unsigned)1 << g_cvars.sys_streaming_cpu_worker, name); } CStreamingWorkerThread::~CStreamingWorkerThread() { Cancel(); Stop(); WaitForThread(); } ////////////////////////////////////////////////////////////////////////// void CStreamingWorkerThread::Run() { SetName(m_name); #if defined(AZ_RESTRICTED_PLATFORM) #define AZ_RESTRICTED_SECTION STREAMIOTHREAD_CPP_SECTION_2 #if defined(AZ_PLATFORM_XENIA) #include "Xenia/StreamIOThread_cpp_xenia.inl" #elif defined(AZ_PLATFORM_PROVO) #include "Provo/StreamIOThread_cpp_provo.inl" #elif defined(AZ_PLATFORM_SALEM) #include "Salem/StreamIOThread_cpp_salem.inl" #endif #endif // Main thread loop while (!m_bCancelThreadRequest) { m_pQueue->m_awakeEvent.Wait(); m_pQueue->m_awakeEvent.Reset(); CAsyncIOFileRequest_AutoPtr pFileRequest; while (!m_bCancelThreadRequest && !m_bNeedsReset && m_pQueue->TryPopRequest(pFileRequest)) { switch (m_type) { case eWorkerAsyncCallback: { float fTime = gEnv->pTimer->GetAsyncCurTime(); m_pStreamEngine->ReportAsyncFileRequestComplete(pFileRequest); float fTime1 = gEnv->pTimer->GetAsyncCurTime(); #ifdef STREAMENGINE_ENABLE_STATS CryInterlockedDecrement(&m_pStreamEngine->GetStreamingStatistics().nCurrentAsyncCount); #endif #ifndef _RELEASE if ((fTime1 - fTime) > 1.f && !pFileRequest->m_strFileName.empty()) { string str; str.Format("[ACALL] %s time=%.5f\n", pFileRequest->m_strFileName.c_str(), (fTime1 - fTime)); if (gEnv && gEnv->pSystem && gEnv->pLog) { gEnv->pLog->Log(str.c_str()); } } #endif } break; } } if (m_bNeedsReset) { m_pQueue->Reset(); m_bNeedsReset = false; m_resetDoneEvent.Set(); } } } ////////////////////////////////////////////////////////////////////////// void CStreamingWorkerThread::Cancel() { m_bCancelThreadRequest = true; m_pQueue->m_awakeEvent.Set(); } ////////////////////////////////////////////////////////////////////////// void CStreamingWorkerThread::CancelAll() { m_pQueue->Reset(); } void CStreamingWorkerThread::BeginReset() { CancelAll(); m_resetDoneEvent.Reset(); m_bNeedsReset = true; m_pQueue->m_awakeEvent.Set(); } void CStreamingWorkerThread::EndReset() { m_resetDoneEvent.Wait(); }