I discovered ZeroMQ through a friend, after being disappointed to find that Intel's "Threading Building Blocks" doesn't mean "threads" in the sense you or I would expect, but is rather a "get every last ounce out of your Intel CPU with hyper-parallelism" system.
What I was looking for was a convenient C++ way of creating work-splitting and work-delegating tasks. Having cut my teeth on ZeroMQ, I immediately set about making a worker pool system. The result is Async::Worker.

Download: http://www.kfs.org/async/worker/
Documentation: http://www.kfs.org/async/worker/manual/

It combines concepts from OpenMP and Intel's TBB. One important point about this implementation: the messages I pass are pointers to objects rather than the objects themselves, so as it stands it is not intended for inter-process or inter-machine communication.

The main classes are:

    Async::FireAndForget - a base class for workloads that can be executed
        by a worker thread and then destroyed, such as writing to a log file.

    Async::RunAndReturn - a base class for workloads that do reductions or
        otherwise need completion back in the master thread.

RunAndReturn is somewhat along the lines of #pragma omp task. The Work() member is executed by an available worker thread, and the pointer is then sent back to the master thread, which executes the Result() member when you get around to calling Async::GetResults().

So, consider the following:

    #include <algorithm>
    #include <cstdio>
    #include <cstdlib>
    #include <vector>
    // ... plus the Async::Worker header.

    typedef std::vector<int> Numbers ;

    struct NumberCruncher : public Async::RunAndReturn
    {
        NumberCruncher(Numbers::iterator startRange,
                       Numbers::iterator endRange,
                       int* finalResultHere)
            : mStartRange(startRange)
            , mEndRange(endRange)
            , mFinalResultHere(finalResultHere)
            , mPrivateSum(0)
        {}

        virtual void Work() const
        {
            // This gets executed in parallel by a worker thread.
            for ( Numbers::iterator it = mStartRange ; it != mEndRange ; ++it )
                mPrivateSum += *it ;
        }

        virtual void Result()
        {
            // This gets executed serially by whoever calls Async::GetResults().
            *mFinalResultHere += mPrivateSum ;
        }

    private:
        Numbers::iterator mStartRange ;
        Numbers::iterator mEndRange ;
        int*              mFinalResultHere ;
        mutable int       mPrivateSum ;   // mutable because Work() is const.
    } ;

    int main(int argc, char* argv[])
    {
        Numbers numbers(200000000) ;
        for ( size_t i = 0 ; i < numbers.size() ; ++i )
            numbers[i] = rand() ;

        int total = 0 ;
        Numbers::iterator endChunk = numbers.begin() ;
        do
        {
            // Each task we dispatch will add a block of up to 8192 numbers.
            Numbers::iterator startChunk = endChunk ;
            endChunk += std::min<Numbers::difference_type>(8192, numbers.end() - endChunk) ;
            Async::Queue(new NumberCruncher(startChunk, endChunk, &total)) ;
        }
        while ( endChunk != numbers.end() ) ;

        // Start receiving and adding the results.
        Async::GetResults() ;

        printf("Sum of our %lu numbers = %d\n", (unsigned long) numbers.size(), total) ;
    }

This is a somewhat weak example because the work done by each worker is so trivial, but even so, on a virtual quad-core machine building with -O0 I see a 35-40% reduction in processing time. I ran a second test with a ridiculously complicated formula involving int-to-float, float-to-double, double-to-bool, sin, cos, abs, modulo etc. operations, and benchmarked running the array in serial vs. parallel:

    Serial:   6784 milliseconds
    Parallel: 1401 milliseconds

And that's without any effort to take things like cache alignment and cache lines into account, or any effort to tweak the number of tasks per workload.

I hope some of you will find this useful. If you want to provide feedback or ask questions, either follow up here, to my email address, or on the blog post linked from the manual page.

Happy threading :)

- Oliver
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
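P.S. For anyone who wants to try the chunked-reduction pattern above without downloading the library, here is a minimal stand-in sketch using only std::thread from the standard library. This is not Async::Worker (there is no worker pool or queue here, just one thread per chunk); `parallel_sum` and `chunkSize` are names I made up for this sketch. Each thread plays the role of Work() on its own chunk, and the serial join-and-add loop plays the role of GetResults()/Result().

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <thread>
#include <vector>

// Sum `numbers` by splitting it into chunks of at most `chunkSize`
// elements (chunkSize must be > 0), spawning one thread per chunk,
// then reducing the per-chunk partial sums serially in the caller.
long long parallel_sum(const std::vector<int>& numbers, std::size_t chunkSize)
{
    const std::size_t nChunks = (numbers.size() + chunkSize - 1) / chunkSize;

    // Allocate all partial-sum slots up front so the pointers handed
    // to the threads stay valid for the threads' whole lifetime.
    std::vector<long long> partials(nChunks, 0);
    std::vector<std::thread> workers;
    workers.reserve(nChunks);

    for (std::size_t c = 0; c < nChunks; ++c) {
        std::vector<int>::const_iterator begin = numbers.begin() + c * chunkSize;
        std::vector<int>::const_iterator end =
            numbers.begin() + std::min(numbers.size(), (c + 1) * chunkSize);
        long long* out = &partials[c];
        // The "Work()" step: each thread reduces its own chunk into its
        // own private slot, so no locking is needed.
        workers.emplace_back([begin, end, out] {
            *out = std::accumulate(begin, end, 0LL);
        });
    }

    // The "GetResults()" step: join each worker and fold its partial
    // sum into the final total, serially, in this thread.
    long long total = 0;
    for (std::size_t c = 0; c < nChunks; ++c) {
        workers[c].join();
        total += partials[c];
    }
    return total;
}
```

The point of the per-chunk private sum (like mPrivateSum above) is that workers never touch shared state while running; all sharing happens in the serial reduction step.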
