Hi,
A "return" statement was missing in MachinePool::setUpOnLocalHost(),
which prevented the file from compiling under Windows.
cheers
--
Jean-Christophe Lombardo
/* -*-c++-*- VirtualPlanetBuilder - Copyright (C) 1998-2007 Robert Osfield
*
* This library is open source and may be redistributed and/or modified under
* the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
* (at your option) any later version. The full license is in LICENSE file
* included with this distribution, and on the openscenegraph.org website.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* OpenSceneGraph Public License for more details.
*/
#include <vpb/MachinePool>
#include <vpb/Task>
#include <vpb/TaskManager>
#include <vpb/System>
#include <osg/GraphicsThread>
#include <osg/Timer>
#include <osgDB/Input>
#include <osgDB/Output>
#include <osgDB/FileUtils>
#include <signal.h>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
using namespace vpb;
/////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// MachineOperation
//
// Wraps a Task so it can be queued on the MachinePool's operation queue.
// As soon as it is wrapped, the task is marked PENDING and its state is
// persisted via Task::write(). The `false` passed to osg::Operation is its
// "keep" flag.
MachineOperation::MachineOperation(Task* task):
    osg::Operation(task->getFileName(), false),
    _task(task)
{
    Task* const queuedTask = _task.get();
    queuedTask->setStatus(Task::PENDING);
    queuedTask->write();
}
// Executes the wrapped task on the Machine that owns the calling thread.
//
// object - the thread's parent object; anything other than a Machine is ignored.
//
// The task's "application" property is run through Machine::exec(). Status
// transitions RUNNING -> COMPLETED/FAILED are written back to the task file.
// On failure the machine is told via taskFailed() so the pool's failure
// policy can be applied. Tasks without an "application" property are skipped.
void MachineOperation::operator () (osg::Object* object)
{
    Machine* machine = dynamic_cast<Machine*>(object);
    if (!machine) return;

    std::string application;
    if (!_task->getProperty("application",application)) return;

    osg::Timer_t startTick = osg::Timer::instance()->tick();

    _task->setProperty("hostname",machine->getHostName());
    _task->setStatus(Task::RUNNING);
    _task->setWithCurrentDate("date");
    _task->write();

    machine->startedTask(_task.get());

    int result = machine->exec(application);

    machine->endedTask(_task.get());

    // read any updates to the task written to file by the application.
    _task->read();

    // Prefer the duration reported by the application; otherwise fall back
    // to the wall-clock time measured here.
    double duration;
    if (!_task->getProperty("duration",duration))
    {
        duration = osg::Timer::instance()->delta_s(startTick, osg::Timer::instance()->tick());
    }

    if (result==0)
    {
        // success
        _task->setStatus(Task::COMPLETED);
        _task->write();
    }
    else
    {
        // failure
        _task->setStatus(Task::FAILED);
        _task->write();

        // tell the machine about this task failure.
        machine->taskFailed(_task.get(), result);
    }

    // Format string was split across two lines by mail wrapping, which made
    // the literal ill-formed; restored to a single literal.
    machine->log(osg::NOTICE,"%s : completed in %f seconds : %s result=%d",machine->getHostName().c_str(),duration,application.c_str(),result);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// BlockOperation
//
// Builds the synchronisation marker operation named "Block". The `false`
// argument is osg::Operation's "keep" flag.
BlockOperation::BlockOperation():
    osg::Operation("Block", false)
{
}
// Releases any thread waiting on this block by delegating to the Block base class.
void BlockOperation::release()
{
    Block::release();
}
void BlockOperation::operator () (osg::Object* object)
{
Block::release();
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// Machine
//
// Default constructor: the machine starts detached from any MachinePool and
// with no worker threads.
Machine::Machine():
    _machinePool(0)
{
}
// Copy constructor, as required by osg::Object cloning.
//
// Copies the pool pointer and the machine description: hostname, cache
// directory, and command prefix/postfix. Worker threads and running-task
// bookkeeping are not copied.
// Bug fix: the cache directory was previously left empty in the copy.
Machine::Machine(const Machine& m, const osg::CopyOp& copyop):
    osg::Object(m, copyop),
    _machinePool(m._machinePool),
    _cacheDirectory(m._cacheDirectory),
    _hostname(m._hostname),
    _commandPrefix(m._commandPrefix),
    _commandPostfix(m._commandPostfix)
{
}
// Describes a build machine and creates its worker threads. The threads are
// created but not started here.
//
// hostname       - host the tasks run on (compared against the local host name in exec()).
// cacheDirectory - cache directory recorded for this machine.
// commandPrefix  - optional command placed before each application command line.
// commandPostfix - optional text appended after each application command line.
// numThreads     - number of worker threads; a negative value requests
//                  "autodetect", which currently falls back to a single thread.
Machine::Machine(const std::string& hostname,const std::string& cacheDirectory,
                 const std::string& commandPrefix, const std::string& commandPostfix,
                 int numThreads):
    _machinePool(0),
    _cacheDirectory(cacheDirectory),
    _hostname(hostname),
    _commandPrefix(commandPrefix),
    _commandPostfix(commandPostfix)
{
    const int threadCount = (numThreads < 0) ? 1 : numThreads;

    for(int created = 0; created < threadCount; ++created)
    {
        osg::OperationThread* operationThread = new osg::OperationThread;
        operationThread->setParent(this);
        _threads.push_back(operationThread);
    }
}
// Destructor: only emits an INFO trace. The members release themselves.
Machine::~Machine()
{
    log(osg::INFO,"Machine::~Machine()");
}
int Machine::exec(const std::string& application)
{
bool runningRemotely = getHostName()!=getLocalHostName() &&
getHostName()!="localhost";
std::string executionString;
if (!getCommandPrefix().empty())
{
executionString = getCommandPrefix() + std::string(" ") + application;
}
else if (runningRemotely)
{
executionString = std::string("ssh ") +
getHostName() +
std::string(" \"") +
application +
std::string("\"");
}
else
{
executionString = application;
}
if (!getCommandPostfix().empty())
{
executionString += std::string(" ") + getCommandPostfix();
}
log(osg::NOTICE,"%s : running
%s",getHostName().c_str(),executionString.c_str());
return system(executionString.c_str());
}
void Machine::startThreads()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);
log(osg::INFO,"Machine::startThreads() hostname=%s,
threads=%d",_hostname.c_str(),_threads.size());
for(Threads::iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
log(osg::INFO," Started thread");
(*itr)->setDone(false);
(*itr)->startThread();
}
}
void Machine::cancelThreads()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);
log(osg::NOTICE,"Machine::cancelThreads() hostname=%s,
threads=%d",_hostname.c_str(),_threads.size());
for(Threads::iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
log(osg::NOTICE," Cancel thread");
(*itr)->cancel();
// assign a new thread as OpenThreads doesn't currently allow cancelled
threads to be restarted.
osg::OperationThread* thread = new osg::OperationThread;
thread->setParent(this);
thread->setOperationQueue(_machinePool->getOperationQueue());
(*itr) = thread;
}
log(osg::NOTICE,"Completed Machine::cancelThreads() hostname=%s,
threads=%d",_hostname.c_str(),_threads.size());
}
// Points every worker thread of this machine at `queue`, under _threadsMutex.
void Machine::setOperationQueue(osg::OperationQueue* queue)
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);

    Threads::iterator worker = _threads.begin();
    while (worker != _threads.end())
    {
        (*worker)->setOperationQueue(queue);
        ++worker;
    }
}
unsigned int Machine::getNumThreadsActive() const
{
#if 1
return _runningTasks.size();
#else
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);
unsigned int numThreadsActive = 0;
for(Threads::const_iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
if ((*itr)->getCurrentOperation().valid() ||
!(*itr)->getOperationQueue()->empty())
{
++numThreadsActive;
}
}
return numThreadsActive;
#endif
}
unsigned int Machine::getNumThreadsRunning() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);
unsigned int numThreadsRunning = 0;
for(Threads::const_iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
if ((*itr)->isRunning())
{
++numThreadsRunning;
}
}
return numThreadsRunning;
}
unsigned int Machine::getNumThreadsNotDone() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadsMutex);
unsigned int numThreadsNotDone = 0;
for(Threads::const_iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
if (!(*itr)->getDone())
{
++numThreadsNotDone;
}
}
return numThreadsNotDone;
}
// Records `task` as running on this machine, stamped with the current
// osg::Timer time in seconds, under _runningTasksMutex.
void Machine::startedTask(Task* task)
{
    const double startedAt = osg::Timer::instance()->time_s();

    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex);
    _runningTasks[task] = startedAt;
}
// Removes `task` from the running set and logs its elapsed time into the
// per-type statistics, keyed by the task's "type" property. Tasks that were
// never registered via startedTask() are ignored. Runs under _runningTasksMutex.
void Machine::endedTask(Task* task)
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex);

    RunningTasks::iterator entry = _runningTasks.find(task);
    if (entry == _runningTasks.end()) return;

    const double elapsed = osg::Timer::instance()->time_s() - entry->second;
    _runningTasks.erase(entry);

    // presumably left empty when the task has no "type" property — confirm in Task::getProperty
    std::string taskType;
    task->getProperty("type",taskType);
    _taskStatsMap[taskType].logTime(elapsed);
}
// Applies the owning pool's task-failure policy after `task` exited with the
// non-zero code `result`. Does nothing if the machine is not in a pool.
//
// Policies:
//   IGNORE_FAILED_TASK                  - log only.
//   BLACKLIST_MACHINE_AND_RESUBMIT_TASK - stop this machine's threads, re-queue
//                                         the task on the pool, release waiters.
//   COMPLETE_RUNNING_TASKS_THEN_EXIT    - switch the pool to IGNORE, mark the task
//                                         manager done, drop queued operations.
//   TERMINATE_RUNNING_TASKS_THEN_EXIT   - as above, and also send SIGTERM to
//                                         running tasks.
void Machine::taskFailed(Task* task, int result)
{
    log(osg::INFO,"%s : taskFailed(%d)", getHostName().c_str(), result);

    if (!_machinePool) return;

    switch(_machinePool->getTaskFailureOperation())
    {
        case(MachinePool::IGNORE_FAILED_TASK):
        {
            log(osg::INFO,"   IGNORE");
            break;
        }
        case(MachinePool::BLACKLIST_MACHINE_AND_RESUBMIT_TASK):
        {
            // Literal restored to one line (it had been split by mail wrapping).
            log(osg::NOTICE,"Task %s has failed, blacklisting machine %s and resubmitting task",task->getFileName().c_str(),getHostName().c_str());
            setDone(true);
            //setOperationQueue(0);
            _machinePool->run(task);
            _machinePool->release();
            break;
        }
        case(MachinePool::COMPLETE_RUNNING_TASKS_THEN_EXIT):
        {
            log(osg::NOTICE,"   COMPLETE_RUNNING_TASKS_THEN_EXIT");
            _machinePool->setTaskFailureOperation(MachinePool::IGNORE_FAILED_TASK);
            System::instance()->getTaskManager()->setDone(true);
            _machinePool->removeAllOperations();
            _machinePool->release();
            break;
        }
        case(MachinePool::TERMINATE_RUNNING_TASKS_THEN_EXIT):
        {
            log(osg::NOTICE,"   TERMINATE_RUNNING_TASKS_THEN_EXIT");
            _machinePool->setTaskFailureOperation(MachinePool::IGNORE_FAILED_TASK);
            System::instance()->getTaskManager()->setDone(true);
            _machinePool->removeAllOperations();
            _machinePool->signal(SIGTERM);
            _machinePool->release();
            break;
        }
    }
}
void Machine::signal(int signal)
{
log(osg::NOTICE,"Machine::signal(%d)",signal);
RunningTasks tasks;
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_runningTasksMutex);
tasks = _runningTasks;
}
for(RunningTasks::iterator itr = tasks.begin();
itr != tasks.end();
++itr)
{
Task* task = itr->first;
task->read();
std::string pid;
if (task->getProperty("pid", pid))
{
std::stringstream signalcommand;
signalcommand << "kill -" << signal<<" "<<pid;
exec(signalcommand.str());
}
}
}
void Machine::setDone(bool done)
{
for(Threads::const_iterator itr = _threads.begin();
itr != _threads.end();
++itr)
{
(*itr)->setDone(done);
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// MachinePool
//
// Creates an empty pool with its own operation queue and block operation.
// The active failure policy is BLACKLIST_MACHINE_AND_RESUBMIT_TASK. The other
// choices are IGNORE_FAILED_TASK, COMPLETE_RUNNING_TASKS_THEN_EXIT and
// TERMINATE_RUNNING_TASKS_THEN_EXIT.
MachinePool::MachinePool():
    _taskFailureOperation(BLACKLIST_MACHINE_AND_RESUBMIT_TASK),
    _done(false)
{
    _operationQueue = new osg::OperationQueue;
    _blockOp = new BlockOperation;
}
// Destructor: only emits an INFO trace.
MachinePool::~MachinePool()
{
    log(osg::INFO,"MachinePool::~MachinePool()");
}
// Installs `bl` as the pool's build log and forwards it to every machine,
// under _machinesMutex.
void MachinePool::setBuildLog(BuildLog* bl)
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);

    Logger::setBuildLog(bl);

    Machines::const_iterator entry = _machines.begin();
    while (entry != _machines.end())
    {
        (*entry)->setBuildLog(bl);
        ++entry;
    }
}
void MachinePool::addMachine(const std::string& hostname, const std::string&
cacheDirectory, const std::string& commandPrefix, const std::string&
commandPostfix, int numThreads)
{
log(osg::INFO,"addMachine(");
log(osg::INFO," hostname = %s",hostname.c_str());
log(osg::INFO," cacheDirectory = %s",cacheDirectory.c_str());
log(osg::INFO," commandPrefix = %s",commandPrefix.c_str());
log(osg::INFO," commandPostfix = %s",commandPostfix.c_str());
log(osg::INFO," numThreads = %d)",numThreads);
addMachine(new Machine(hostname, cacheDirectory, commandPrefix,
commandPostfix, numThreads));
}
// Adopts `machine` into the pool, under _machinesMutex. It gets the pool
// back-pointer, the pool's build log and the shared operation queue; its
// threads are then started before it is stored.
void MachinePool::addMachine(Machine* machine)
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);

    machine->_machinePool = this;
    machine->setBuildLog(getBuildLog());

    osg::OperationQueue* sharedQueue = _operationQueue.get();
    machine->setOperationQueue(sharedQueue);
    machine->startThreads();

    _machines.push_back(machine);
}
// Returns the first machine whose host name equals `hostname`, or 0 if none
// matches.
Machine* MachinePool::getMachine(const std::string& hostname)
{
    Machine* match = 0;
    for(Machines::iterator entry = _machines.begin();
        entry != _machines.end() && !match;
        ++entry)
    {
        if ((*entry)->getHostName()==hostname) match = entry->get();
    }
    return match;
}
// Queues `task` for execution by wrapping it in a MachineOperation. The
// wrapper marks the task PENDING.
void MachinePool::run(Task* task)
{
    // Literal restored to one line (it had been split by mail wrapping).
    log(osg::INFO, "Adding Task to MachinePool::OperationQueue %s",task->getFileName().c_str());

    _operationQueue->add(new MachineOperation(task));
}
// Blocks the caller until all queued operations have been dequeued and no
// machine reports active tasks, or until the pool is marked done.
//
// A BlockOperation is appended to the queue and waited on. Once a worker
// reaches it, everything before it has been taken off the queue. The active
// task count is then polled, once per second, because dequeued operations
// may still be executing.
void MachinePool::waitForCompletion()
{
    _blockOp->reset();
    _operationQueue->add(_blockOp.get());

    // wait till block is complete i.e. the operation queue has been cleared up to the block
    _blockOp->block();

    if (!done()) OpenThreads::Thread::YieldCurrentThread();

    // there can still be operations running though so need to double check.
    while(getNumThreadsActive()>0 && !done())
    {
        // log(osg::INFO, "MachinePool::waitForCompletion : Waiting for threads to complete = %d",getNumThreadsActive());
#if 1
        OpenThreads::Thread::microSleep(1000000);
#else
        OpenThreads::Thread::YieldCurrentThread();
#endif
    }

    // Literals below were split by mail wrapping; restored to single lines.
    log(osg::INFO, "MachinePool::waitForCompletion : done %d",int(done()));
    log(osg::INFO, "                               : getNumThreadsActive() %d",int(getNumThreadsActive()));
    log(osg::INFO, "                               : empty %d",int(_operationQueue->empty()));
}
unsigned int MachinePool::getNumThreads() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
unsigned int numThreads = 0;
for(Machines::const_iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
numThreads += (*itr)->getNumThreads();
}
return numThreads;
}
unsigned int MachinePool::getNumThreadsActive() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
unsigned int numThreadsActive = 0;
for(Machines::const_iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
numThreadsActive += (*itr)->getNumThreadsActive();
}
return numThreadsActive;
}
unsigned int MachinePool::getNumThreadsRunning() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
unsigned int numThreadsRunning = 0;
for(Machines::const_iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
numThreadsRunning += (*itr)->getNumThreadsRunning();
}
return numThreadsRunning;
}
unsigned int MachinePool::getNumThreadsNotDone() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
unsigned int numThreadsNotDone = 0;
for(Machines::const_iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
numThreadsNotDone += (*itr)->getNumThreadsNotDone();
}
return numThreadsNotDone;
}
void MachinePool::clear()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
_machines.clear();
}
bool MachinePool::read(const std::string& filename)
{
std::string foundFile = osgDB::findDataFile(filename);
if (foundFile.empty())
{
log(osg::WARN, "Error: could not find machine specification file
'%s'",filename.c_str());
return false;
}
std::ifstream fin(foundFile.c_str());
if (fin)
{
_machinePoolFileName = foundFile;
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
_machines.clear();
}
osgDB::Input fr;
fr.attach(&fin);
while(!fr.eof())
{
bool itrAdvanced = false;
std::string readFilename;
if (fr.read("file",readFilename))
{
read(readFilename);
++itrAdvanced;
}
if (fr.matchSequence("Machine {"))
{
int local_entry = fr[0].getNoNestedBrackets();
fr += 2;
std::string hostname;
std::string cacheDirectory;
std::string prefix;
std::string postfix;
int numThreads=-1;
while (!fr.eof() && fr[0].getNoNestedBrackets()>local_entry)
{
bool localAdvanced = false;
if (fr.read("hostname",hostname)) localAdvanced = true;
if (fr.read("cache",cacheDirectory)) localAdvanced = true;
if (fr.read("prefix",prefix)) localAdvanced = true;
if (fr.read("postfix",postfix)) localAdvanced = true;
if (fr.read("threads",numThreads)) localAdvanced = true;
if (fr.read("processes",numThreads)) localAdvanced = true;
if (!localAdvanced) ++fr;
}
addMachine(hostname,cacheDirectory,prefix,postfix,numThreads);
++fr;
itrAdvanced = true;
}
if (!itrAdvanced) ++fr;
}
}
return true;
}
// Writes the machine list to `filename` in the format read() accepts, under
// _machinesMutex. Empty fields are omitted, and the thread count is written
// as "processes" only when positive. Always returns true.
bool MachinePool::write(const std::string& filename) const
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);

    osgDB::Output fout(filename.c_str());

    for(Machines::const_iterator itr = _machines.begin();
        itr != _machines.end();
        ++itr)
    {
        const Machine* machine = itr->get();

        if (itr != _machines.begin()) fout.indent()<<std::endl;

        fout.indent()<<"Machine {"<<std::endl;
        fout.moveIn();

        // Keyword literals restored to single lines (they had been split by
        // mail wrapping); the keywords match those parsed by read().
        if (!machine->getHostName().empty()) fout.indent()<<"hostname "<<machine->getHostName()<<std::endl;
        if (!machine->getCacheDirectory().empty()) fout.indent()<<"cache "<<machine->getCacheDirectory()<<std::endl;
        if (!machine->getCommandPrefix().empty()) fout.indent()<<"prefix "<<machine->getCommandPrefix()<<std::endl;
        if (!machine->getCommandPostfix().empty()) fout.indent()<<"postfix "<<machine->getCommandPostfix()<<std::endl;
        if (machine->getNumThreads()>0) fout.indent()<<"processes "<<machine->getNumThreads()<<std::endl;

        fout.moveOut();
        fout.indent()<<"}"<<std::endl;
    }

    return true;
}
bool MachinePool::setUpOnLocalHost()
{
log(osg::NOTICE,"Setting up MachinePool to use all %i cores on this
machine.",OpenThreads::GetNumberOfProcessors());
addMachine(vpb::getLocalHostName(),vpb::getCacheFileName(),std::string(),std::string(),OpenThreads::GetNumberOfProcessors());
return true;
}
void MachinePool::removeAllOperations()
{
_operationQueue->removeAllOperations();
}
void MachinePool::signal(int signal)
{
log(osg::NOTICE,"MachinePool::signal(%d)",signal);
for(Machines::iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
(*itr)->signal(signal);
}
}
void MachinePool::setDone(bool done)
{
log(osg::NOTICE,"MachinePool::setDone(%d)",(int)done);
_done = done;
if (_done) removeAllOperations();
for(Machines::iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
(*itr)->setDone(done);
}
}
// Releases the block operation that waitForCompletion() waits on, if one exists.
void MachinePool::release()
{
    if (!_blockOp.valid()) return;
    _blockOp->release();
}
void MachinePool::startThreads()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
for(Machines::iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
(*itr)->startThreads();
}
}
void MachinePool::cancelThreads()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_machinesMutex);
for(Machines::iterator itr = _machines.begin();
itr != _machines.end();
++itr)
{
(*itr)->cancelThreads();
}
}
// Resets the pool in place. Pending work is dropped, every worker thread is
// stopped and cancelled, and the fresh threads are then restarted. The
// machine list is unchanged.
void MachinePool::resetMachinePool()
{
    log(osg::NOTICE,"MachinePool::resetMachinePool()");

    // Drop queued tasks that have not started yet.
    removeAllOperations();

    // Flag threads as done, then cancel (and replace) them.
    setDone(true);
    cancelThreads();

    // Clear the done flag and start the replacement threads.
    setDone(false);
    startThreads();
}
// Re-reads the machine specification file last loaded by read() and restarts
// the pool. Pending work is dropped, all threads are stopped and cancelled,
// the machine list is re-read, and the new threads are started.
//
// Fix: removed a copy-pasted log line that reported
// "MachinePool::resetMachinePool()" from this function, which misattributed
// the call in the log.
void MachinePool::updateMachinePool()
{
    log(osg::NOTICE,"MachinePool::updateMachinePool()");

    // remove all pending tasks
    removeAllOperations();

    // stop threads.
    setDone(true);
    cancelThreads();

    // NOTE(review): the result of read() is ignored; on failure the pool is
    // left with whatever machines read() retained.
    read(_machinePoolFileName);

    setDone(false);

    // restart any stopped threads.
    startThreads();
}
// Logs, for every machine and every task type, the timing statistics
// gathered by Machine::endedTask(): min/max/average/total time and task count.
void MachinePool::reportTimingStats()
{
    log(osg::NOTICE,"MachinePool::reportTimingStats()");

    for(Machines::iterator itr = _machines.begin();
        itr != _machines.end();
        ++itr)
    {
        Machine* machine = itr->get();
        TaskStatsMap& taskStatsMap = machine->getTaskStatsMap();

        log(osg::NOTICE,"    Machine : %s",machine->getHostName().c_str());

        for(TaskStatsMap::iterator titr = taskStatsMap.begin();
            titr != taskStatsMap.end();
            ++titr)
        {
            TaskStats& stats = titr->second;

            // Format string restored to a single literal (it had been split by
            // mail wrapping, which left an unterminated string).
            log(osg::NOTICE,"        Task::type='%s'\tminTime=%f\tmaxTime=%f\taverageTime=%f\ttotalComputeTime=%f\tnumTasks=%d",
                titr->first.c_str(),
                stats.minTime(),
                stats.maxTime(),
                stats.averageTime(),
                stats.totalTime(),
                stats.numTasks());
        }
    }
}
_______________________________________________
osg-submissions mailing list
[email protected]
http://lists.openscenegraph.org/listinfo.cgi/osg-submissions-openscenegraph.org