Revision: 2443
          http://rigsofrods.svn.sourceforge.net/rigsofrods/?rev=2443&view=rev
Author:   rorthomas
Date:     2012-02-04 13:58:18 +0000 (Sat, 04 Feb 2012)
Log Message:
-----------
threading improvements
Modified Paths:
--------------
    trunk/source/main/physics/threading/BeamWorker.cpp
    trunk/source/main/physics/threading/BeamWorkerManager.cpp
    trunk/source/main/physics/threading/BeamWorkerManager.h

Modified: trunk/source/main/physics/threading/BeamWorker.cpp
===================================================================
--- trunk/source/main/physics/threading/BeamWorker.cpp	2012-02-04 13:41:44 UTC (rev 2442)
+++ trunk/source/main/physics/threading/BeamWorker.cpp	2012-02-04 13:58:18 UTC (rev 2443)
@@ -26,6 +26,7 @@
 #include "BeamThread.h"
 #include "errorutils.h"
 #include "InputEngine.h"
+#include "utils.h"

 using namespace Ogre;

@@ -61,6 +62,7 @@
 	}
 #endif //USE_CRASHRPT

+	LOG("TR| BeamWorker created: " + TOSTRING(ThreadID::getID()));
 	try
 	{
 		// additional exception handler required, otherwise RoR just crashes upon exception
@@ -74,20 +76,19 @@
 		}
 	} catch(Ogre::Exception& e)
 	{
-		// try to shutdown input system upon an error
-		if(InputEngine::singletonExists()) // this prevents the creating of it, if not existing
-			INPUTENGINE.prepareShutdown();
-
-		String url = "http://wiki.rigsofrods.com/index.php?title=Error_" + TOSTRING(e.getNumber())+"#"+e.getSource();
-		showOgreWebError("An exception has occurred!", e.getFullDescription(), url);
+		LOG("TR| BeamWorker exception: " + TOSTRING(ThreadID::getID()) + ": " + e.getFullDescription());
+		// TODO: error handling
 	}
+	LOG("TR| BeamWorker exiting: " + TOSTRING(ThreadID::getID()));
 }

 void BeamWorker::_doWork()
 {
-	LOG("BeamWorker doing work: " + TOSTRING((unsigned int)pthread_self().p));
+	int seconds = Ogre::Math::RangeRandom(1, 10);
+	LOG("TR| BeamWorker doing work: " + TOSTRING(ThreadID::getID()) + " sleeping " + TOSTRING(seconds) + " seconds");

-	Sleep(rand() * 5000);
+	sleep(seconds);
+
 #if 0
 	float dtperstep=dt/(Real)steps;

@@ -124,4 +125,4 @@
 	ffhydro=affhydro/steps;
 	if (free_hydro) ffhydro=ffhydro/free_hydro;
 #endif // 0
-}
\ No newline at end of file
+}

Modified: trunk/source/main/physics/threading/BeamWorkerManager.cpp
===================================================================
--- trunk/source/main/physics/threading/BeamWorkerManager.cpp	2012-02-04 13:41:44 UTC (rev 2442)
+++ trunk/source/main/physics/threading/BeamWorkerManager.cpp	2012-02-04 13:58:18 UTC (rev 2443)
@@ -25,6 +25,7 @@
 #include <Ogre.h>

 #include "Settings.h"
+#include "utils.h"

 using namespace Ogre;

@@ -41,16 +42,16 @@
 	threads()
 	, threadsSize(0)
 	, done_count(0)
-	, work_mutex()
-	, work_cv()
 	, done_count_mutex()
 	, done_count_cv()
 {
-	pthread_mutex_init(&work_mutex, NULL);
-	pthread_cond_init(&work_cv, NULL);
+	pthread_mutex_init(&api_mutex, NULL);
 	pthread_mutex_init(&done_count_mutex, NULL);
 	pthread_cond_init(&done_count_cv, NULL);

+
+	threads.clear();
+
 	// start worker thread:
 	pthread_create(&workerThread, NULL, threadWorkerManagerEntry, (void*)this);
 }
@@ -58,60 +59,85 @@

 BeamWorkerManager::~BeamWorkerManager()
 {
+	pthread_cond_destroy(&done_count_cv);
+	pthread_mutex_destroy(&api_mutex);
+	pthread_mutex_destroy(&done_count_mutex);
 }

 void BeamWorkerManager::_startWorkerLoop()
 {
+	LOG("worker manager started in thread " + TOSTRING(ThreadID::getID()));
 	while (1)
 	{
 		MUTEX_LOCK(&done_count_mutex);
 		while (done_count < threadsSize)
 		{
-			LOG("worker loop continued, waiting for more threads | threads waiting: " + TOSTRING(done_count) + " / " + TOSTRING(threadsSize));
+			LOG("TR| worker loop continued, waiting for more threads | threads waiting: " + TOSTRING(done_count) + " / " + TOSTRING(threadsSize));
 			pthread_cond_wait(&done_count_cv, &done_count_mutex);
 		}
 		MUTEX_UNLOCK(&done_count_mutex);

-		LOG("worker loop lock aquired, all threads sleeping...");
+		LOG("TR| worker loop lock acquired, all threads sleeping...");
 		// we got through, all threads should be stopped by now

 		// TODO: make modifications on truck array in here
-		Sleep(5000);
+		sleep(3);

 		// reset counter, continue all threads via signal
-		LOG("worker loop: unpausing all threads");
+		LOG("TR| worker loop: unpausing all threads");
 		MUTEX_LOCK(&done_count_mutex);
 		done_count=0;
 		// send signals to the threads
-		pthread_cond_signal(&work_cv);
+		threadMap::iterator it;
+		for(it = threads.begin(); it != threads.end(); ++it)
+		{
+			pthread_cond_signal(&it->second.work_cv);
+		}
 		MUTEX_UNLOCK(&done_count_mutex);
 	}
 }

 void BeamWorkerManager::addThread(BeamThread *bthread)
 {
-	threads.push_back(bthread);
-	threadsSize=threads.size();
+	// threadID is not valid in this context!
+	MUTEX_LOCK(&api_mutex);
+	workerData_t wd;
+	pthread_mutex_init(&wd.work_mutex, NULL);
+	pthread_cond_init(&wd.work_cv, NULL);
+	wd.bthread = bthread;
+	wd.threadID = -1;
+	LOG("TR| add Thread: " + TOSTRING(bthread));
+	threads[bthread] = wd;
+	threadsSize++;
+	MUTEX_UNLOCK(&api_mutex);
 }

 void BeamWorkerManager::removeThread(BeamThread *bthread)
 {
-	for(std::vector<BeamThread*>::iterator it = threads.begin(); it != threads.end(); ++it)
-	{
-		if(*it == bthread)
-		{
-			threads.erase(it);
-			threadsSize=threads.size();
-			return;
-		}
-	}
+	// threadID is not valid in this context!
+	MUTEX_LOCK(&api_mutex);
+	pthread_cond_destroy(&threads[bthread].work_cv);
+	pthread_mutex_destroy(&threads[bthread].work_mutex);
+	LOG("TR| remove Thread: " + TOSTRING(bthread));
+	threads[bthread].bthread = NULL;
+	threads[bthread].threadID = 0;
+	threadsSize--;
+	MUTEX_UNLOCK(&api_mutex);
 }

 void BeamWorkerManager::syncThreads(BeamThread *bthread)
 {
-	LOG("Thread visiting syncing point| class: " + TOSTRING((unsigned long)bthread) + " / thread: " + TOSTRING((unsigned long)pthread_self().p));
+	unsigned int tid = ThreadID::getID();
+	LOG("TR| Thread visiting syncing point| class: " + TOSTRING((unsigned long)bthread) + " / thread: " + TOSTRING(tid));
+	threadMap::iterator it = threads.find(bthread);
+	if(it == threads.end())
+	{
+		LOG("unknown thread calling: " + TOSTRING(tid) + " / class " + TOSTRING(bthread));
+		return;
+	}

-	MUTEX_LOCK(&work_mutex);
+	workerData_t &wd = it->second;
+	MUTEX_LOCK(&wd.work_mutex);

 	// count thread counter up:
 	MUTEX_LOCK(&done_count_mutex);
@@ -119,8 +145,9 @@
 	pthread_cond_signal(&done_count_cv);
 	MUTEX_UNLOCK(&done_count_mutex);

+
 	// then wait for the signal that we can continue
-	pthread_cond_wait(&work_cv, &work_mutex);
-	MUTEX_UNLOCK(&work_mutex);
+	pthread_cond_wait(&wd.work_cv, &wd.work_mutex);
+	MUTEX_UNLOCK(&wd.work_mutex);
 	// return to continue to do work
 }

Modified: trunk/source/main/physics/threading/BeamWorkerManager.h
===================================================================
--- trunk/source/main/physics/threading/BeamWorkerManager.h	2012-02-04 13:41:44 UTC (rev 2442)
+++ trunk/source/main/physics/threading/BeamWorkerManager.h	2012-02-04 13:58:18 UTC (rev 2443)
@@ -33,12 +33,25 @@
 {
 	friend class BeamThread;
 	friend class RoRSingleton<BeamWorkerManager>;
+	static const int MAX_THREADS = 256;
 protected:
-	std::vector<BeamThread*> threads;
+	typedef struct workerData_t
+	{
+		pthread_mutex_t work_mutex;
+		pthread_cond_t work_cv;
+		int threadID;
+		BeamThread *bthread;
+	} workerData_t;
+
+	typedef std::map <BeamThread*, workerData_t> threadMap;
+	threadMap threads;
 	int threadsSize;
+
+	void *classInstanceData[MAX_THREADS];
+	int instanceSize;
+
 	int done_count;
-	pthread_mutex_t work_mutex;
-	pthread_cond_t work_cv;
+	pthread_mutex_t api_mutex;
 	pthread_mutex_t done_count_mutex;
 	pthread_cond_t done_count_cv;
 	pthread_t workerThread;

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.

------------------------------------------------------------------------------
Try before you buy = See our experts in action!
The most comprehensive online learning library for Microsoft developers
is just $99.99! Visual Studio, SharePoint, SQL - plus HTML5, CSS3, MVC3,
Metro Style Apps, more. Free future releases when you subscribe now!
http://p.sf.net/sfu/learndevnow-dev2
_______________________________________________
Rigsofrods-devel mailing list
Rigsofrods-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/rigsofrods-devel