On Monday, September 16, 2013 09:09:34 AM Dylan Baker wrote: > This patch completely rewrites the threadpool module. This new > implementation is less complicated than the previous version. It also is > not an external project pulled in, but an original implementation. > > This tries to be largely API compatible with the previous ThreadPool > implementation. And for piglit's use case it is, however, it does not > implement the full API supported by the previous implementation. > > Signed-off-by: Dylan Baker <[email protected]> > --- > framework/core.py | 3 +- > framework/threadpool.py | 455 > ++++++------------------------------------------ framework/threads.py | > 8 +- > 3 files changed, 57 insertions(+), 409 deletions(-) > > diff --git a/framework/core.py b/framework/core.py > index 150a70c..bebe1b8 100644 > --- a/framework/core.py > +++ b/framework/core.py > @@ -428,9 +428,8 @@ class Test: > > See ``Test.doRun`` for a description of the parameters. > ''' > - args = (env, path, json_writer) > if self.runConcurrent: > - ConcurrentTestPool().put(self.doRun, args=args) > + ConcurrentTestPool().put(self.doRun, args=(env, path, > json_writer)) > > def doRun(self, env, path, json_writer): > ''' > diff --git a/framework/threadpool.py b/framework/threadpool.py > index 1b4c12c..ffe8123 100644 > --- a/framework/threadpool.py > +++ b/framework/threadpool.py > @@ -1,418 +1,67 @@ > -# -*- coding: UTF-8 -*- > -"""Easy to use object-oriented thread pool framework. > - > -A thread pool is an object that maintains a pool of worker threads to > perform -time consuming operations in parallel. It assigns jobs to the > threads -by putting them in a work request queue, where they are picked up > by the -next available thread. This then performs the requested operation > in the -background and puts the results in another queue. 
> - > -The thread pool object can then collect the results from all threads from > -this queue as soon as they become available or after all threads have > -finished their work. It's also possible, to define callbacks to handle > -each result as it comes in. > - > -The basic concept and some code was taken from the book "Python in a > Nutshell, -2nd edition" by Alex Martelli, O'Reilly 2006, ISBN > 0-596-10046-9, from section -14.5 "Threaded Program Architecture". I > wrapped the main program logic in the -ThreadPool class, added the > WorkRequest class and the callback system and -tweaked the code here and > there. Kudos also to Florent Aide for the exception -handling mechanism. > - > -Basic usage:: > - > - >>> pool = ThreadPool(poolsize) > - >>> requests = makeRequests(some_callable, list_of_args, callback) > - >>> [pool.putRequest(req) for req in requests] > - >>> pool.wait() > - > -See the end of the module code for a brief, annotated usage example. > - > -Website : http://chrisarndt.de/projects/threadpool/ > - > -""" > -__docformat__ = "restructuredtext en" > - > -__all__ = [ > - 'makeRequests', > - 'NoResultsPending', > - 'NoWorkersAvailable', > - 'ThreadPool', > - 'WorkRequest', > - 'WorkerThread' > -] > - > -__author__ = "Christopher Arndt" > -__version__ = '1.2.7' > -__revision__ = "$Revision: 416 $" > -__date__ = "$Date: 2009-10-07 05:41:27 +0200 (Wed, 07 Oct 2009) $" > -__license__ = "MIT license" > - > - > -# standard library modules > -import sys > -import threading > -import Queue > -import traceback > - > - > -# exceptions > -class NoResultsPending(Exception): > - """All work requests have been processed.""" > - pass > - > -class NoWorkersAvailable(Exception): > - """No worker threads available to process remaining requests.""" > - pass > - > - > -# internal module helper functions > -def _handle_thread_exception(request, exc_info): > - """Default exception handler callback function. 
> - > - This just prints the exception info via ``traceback.print_exception``. > - > +# Copyright (c) 2013 Intel Corporation > +# > +# Permission is hereby granted, free of charge, to any person obtaining a > +# copy of this software and associated documentation files (the > "Software"), +# to deal in the Software without restriction, including > without limitation +# the rights to use, copy, modify, merge, publish, > distribute, sublicense, +# and/or sell copies of the Software, and to > permit persons to whom the +# Software is furnished to do so, subject to > the following conditions: +# > +# The above copyright notice and this permission notice (including the next > +# paragraph) shall be included in all copies or substantial portions of > the +# Software. > +# > +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF > MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. > IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY > CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, > TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE > SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. > + > +# This code is based on the MIT licensed code here: > +# http://code.activestate.com/recipes/577187-python-thread-pool/ > + > +from Queue import Queue > +from threading import Thread > + > + > +class Worker(Thread): > """ > - traceback.print_exception(*exc_info) > - > - > -# utility functions > -def makeRequests(callable_, args_list, callback=None, > - exc_callback=_handle_thread_exception): > - """Create several work requests for same callable with different > arguments. - > - Convenience function for creating several work requests for the same > - callable where each invocation of the callable receives different > values - for its arguments. 
> - > - ``args_list`` contains the parameters for each invocation of callable. > - Each item in ``args_list`` should be either a 2-item tuple of the list > of - positional arguments and a dictionary of keyword arguments or a > single, - non-tuple argument. > - > - See docstring for ``WorkRequest`` for info on ``callback`` and > - ``exc_callback``. > + Simple worker thread > > + This worker simply consumes tasks off of the queue until it is empty > and + then waits for more tasks. > """ > - requests = [] > - for item in args_list: > - if isinstance(item, tuple): > - requests.append( > - WorkRequest(callable_, item[0], item[1], callback=callback, > - exc_callback=exc_callback) > - ) > - else: > - requests.append( > - WorkRequest(callable_, [item], None, callback=callback, > - exc_callback=exc_callback) > - ) > - return requests > - > - > -# classes > -class WorkerThread(threading.Thread): > - """Background thread connected to the requests/results queues. > - > - A worker thread sits in the background and picks up work requests from > - one queue and puts the results in another until it is dismissed. > > - """ > - > - def __init__(self, requests_queue, results_queue, poll_timeout=5, > **kwds): - """Set up thread in daemonic mode and start it > immediatedly. - > - ``requests_queue`` and ``results_queue`` are instances of > - ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates > a new - worker thread. 
> - > - """ > - threading.Thread.__init__(self, **kwds) > - self.setDaemon(1) > - self._requests_queue = requests_queue > - self._results_queue = results_queue > - self._poll_timeout = poll_timeout > - self._dismissed = threading.Event() > + def __init__(self, queue): > + Thread.__init__(self) > + self.queue = queue > + self.daemon = True > self.start() > > def run(self): > - """Repeatedly process the job queue until told to exit.""" > + """ This method is called in the constructor by self.start() """ > while True: > - if self._dismissed.isSet(): > - # we are dismissed, break out of loop > - break > - # get next work request. If we don't get a new request from the > - # queue after self._poll_timout seconds, we jump to the start > of - # the while loop again, to give the thread a chance to > exit. - try: > - request = self._requests_queue.get(True, > self._poll_timeout) - except Queue.Empty: > - continue > - else: > - if self._dismissed.isSet(): > - # we are dismissed, put back request in queue and exit > loop - self._requests_queue.put(request) > - break > - try: > - result = request.callable(*request.args, > **request.kwds) - self._results_queue.put((request, > result)) > - except: > - request.exception = True > - self._results_queue.put((request, sys.exc_info())) > + func, args = self.queue.get() > + func(*args) # XXX: Does this need to be try/except-ed? > + self.queue.task_done() > > - def dismiss(self): > - """Sets a flag to tell the thread to exit when done with current > job.""" - self._dismissed.set() > - > - > -class WorkRequest: > - """A request to execute a callable for putting in the request queue > later. - > - See the module function ``makeRequests`` for the common case > - where you want to build several ``WorkRequest`` objects for the same > - callable but with different arguments for each call. 
> > +class ThreadPool(object): > """ > - > - def __init__(self, callable_, args=None, kwds=None, requestID=None, > - callback=None, exc_callback=_handle_thread_exception): > - """Create a work request for a callable and attach callbacks. > - > - A work request consists of the a callable to be executed by a > - worker thread, a list of positional arguments, a dictionary > - of keyword arguments. > - > - A ``callback`` function can be specified, that is called when the > - results of the request are picked up from the result queue. It must > - accept two anonymous arguments, the ``WorkRequest`` object and the > - results of the callable, in that order. If you want to pass > additional - information to the callback, just stick it on the > request object. - > - You can also give custom callback for when an exception occurs with > - the ``exc_callback`` keyword parameter. It should also accept two > - anonymous arguments, the ``WorkRequest`` and a tuple with the > exception - details as returned by ``sys.exc_info()``. The default > implementation - of this callback just prints the exception info via > - ``traceback.print_exception``. If you want no exception handler > - callback, just pass in ``None``. > - > - ``requestID``, if given, must be hashable since it is used by > - ``ThreadPool`` object to store the results of that work request in > a - dictionary. It defaults to the return value of ``id(self)``. 
- > - """ > - if requestID is None: > - self.requestID = id(self) > - else: > - try: > - self.requestID = hash(requestID) > - except TypeError: > - raise TypeError("requestID must be hashable.") > - self.exception = False > - self.callback = callback > - self.exc_callback = exc_callback > - self.callable = callable_ > - self.args = args or [] > - self.kwds = kwds or {} > - > - def __str__(self): > - return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ > - (self.requestID, self.args, self.kwds, self.exception) > - > -class ThreadPool: > - """A thread pool, distributing work requests and collecting results. > - > - See the module docstring for more information. > - > + A simple ThreadPool class that maintains a Queue object and a set of > Worker + threads. > """ > > - def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): > - """Set up the thread pool and start num_workers worker threads. - > - ``num_workers`` is the number of worker threads to start initially. > - > - If ``q_size > 0`` the size of the work *request queue* is limited > and - the thread pool blocks when the queue is full and it tries to > put - more work requests in it (see ``putRequest`` method), unless > you also - use a positive ``timeout`` value for ``putRequest``. > - > - If ``resq_size > 0`` the size of the *results queue* is limited and > the - worker threads will block when the queue is full and they try > to put - new results in it. > - > - .. warning: > - If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there > is - the possibilty of a deadlock, when the results queue is not > pulled - regularly and too many jobs are put in the work > requests queue. - To prevent this, always set ``timeout > 0`` > when calling - ``ThreadPool.putRequest()`` and catch > ``Queue.Full`` exceptions. 
- > - """ > - self._requests_queue = Queue.Queue(q_size) > - self._results_queue = Queue.Queue(resq_size) > - self.workers = [] > - self.dismissedWorkers = [] > - self.workRequests = {} > - self.createWorkers(num_workers, poll_timeout) > - > - def createWorkers(self, num_workers, poll_timeout=5): > - """Add num_workers worker threads to the pool. > - > - ``poll_timout`` sets the interval in seconds (int or float) for how > - ofte threads should check whether they are dismissed, while > waiting for - requests. > - > - """ > - for i in range(num_workers): > - self.workers.append(WorkerThread(self._requests_queue, > - self._results_queue, poll_timeout=poll_timeout)) > - > - def dismissWorkers(self, num_workers, do_join=False): > - """Tell num_workers worker threads to quit after their current > task.""" - dismiss_list = [] > - for i in range(min(num_workers, len(self.workers))): > - worker = self.workers.pop() > - worker.dismiss() > - dismiss_list.append(worker) > - > - if do_join: > - for worker in dismiss_list: > - worker.join() > - else: > - self.dismissedWorkers.extend(dismiss_list) > - > - def joinAllDismissedWorkers(self): > - """Perform Thread.join() on all worker threads that have been > dismissed. - """ > - for worker in self.dismissedWorkers: > - worker.join() > - self.dismissedWorkers = [] > - > - def putRequest(self, request, block=True, timeout=None): > - """Put work request into work queue and save its id for later.""" > - assert isinstance(request, WorkRequest) > - # don't reuse old work requests > - assert not getattr(request, 'exception', None) > - self._requests_queue.put(request, block, timeout) > - self.workRequests[request.requestID] = request > - > - def poll(self, block=False): > - """Process any new results in the queue.""" > - while True: > - # still results pending? > - if not self.workRequests: > - raise NoResultsPending > - # are there still workers to process remaining requests? 
> - elif block and not self.workers: > - raise NoWorkersAvailable > - try: > - # get back next results > - request, result = self._results_queue.get(block=block) > - # has an exception occured? > - if request.exception and request.exc_callback: > - request.exc_callback(request, result) > - # hand results to callback, if any > - if request.callback and not \ > - (request.exception and request.exc_callback): > - request.callback(request, result) > - del self.workRequests[request.requestID] > - except Queue.Empty: > - break > - > - def wait(self): > - """Wait for results, blocking until all have arrived.""" > - while 1: > - try: > - self.poll(True) > - except NoResultsPending: > - break > - > - > -################ > -# USAGE EXAMPLE > -################ > - > -if __name__ == '__main__': > - import random > - import time > - > - # the work the threads will have to do (rather trivial in our example) > - def do_something(data): > - time.sleep(random.randint(1,5)) > - result = round(random.random() * data, 5) > - # just to show off, we throw an exception once in a while > - if result > 5: > - raise RuntimeError("Something extraordinary happened!") > - return result > - > - # this will be called each time a result is available > - def print_result(request, result): > - print "**** Result from request #%s: %r" % (request.requestID, > result) - > - # this will be called when an exception occurs within a thread > - # this example exception handler does little more than the default > handler - def handle_exception(request, exc_info): > - if not isinstance(exc_info, tuple): > - # Something is seriously wrong... > - print request > - print exc_info > - raise SystemExit > - print "**** Exception occured in request #%s: %s" % \ > - (request.requestID, exc_info) > - > - # assemble the arguments for each job to a list... > - data = [random.randint(1,10) for i in range(20)] > - # ... 
and build a WorkRequest object for each item in data > - requests = makeRequests(do_something, data, print_result, > handle_exception) - # to use the default exception handler, uncomment > next line and comment out - # the preceding one. > - #requests = makeRequests(do_something, data, print_result) > - > - # or the other form of args_lists accepted by makeRequests: ((,), {}) > - data = [((random.randint(1,10),), {}) for i in range(20)] > - requests.extend( > - makeRequests(do_something, data, print_result, handle_exception) > - #makeRequests(do_something, data, print_result) > - # to use the default exception handler, uncomment next line and > comment - # out the preceding one. > - ) > - > - # we create a pool of 3 worker threads > - print "Creating thread pool with 3 worker threads." > - main = ThreadPool(3) > - > - # then we put the work requests in the queue... > - for req in requests: > - main.putRequest(req) > - print "Work request #%s added." % req.requestID > - # or shorter: > - # [main.putRequest(req) for req in requests] > + def __init__(self, thread_count): > + self.queue = Queue(thread_count) > + self.threads = [Worker(self.queue) for _ in xrange(thread_count)] > > - # ...and wait for the results to arrive in the result queue > - # by using ThreadPool.wait(). This would block until results for > - # all work requests have arrived: > - # main.wait() > + def add(self, func, args): > + """ Add a function and it's arguments to the queue as a tuple """ > + self.queue.put((func, args)) > > - # instead we can poll for results while doing something else: > - i = 0 > - while True: > - try: > - time.sleep(0.5) > - main.poll() > - print "Main thread working...", > - print "(active worker threads: %i)" % > (threading.activeCount()-1, ) - if i == 10: > - print "**** Adding 3 more worker threads..." > - main.createWorkers(3) > - if i == 20: > - print "**** Dismissing 2 worker threads..." 
> - main.dismissWorkers(2) > - i += 1 > - except KeyboardInterrupt: > - print "**** Interrupted!" > - break > - except NoResultsPending: > - print "**** No pending results." > - break > - if main.dismissedWorkers: > - print "Joining all dismissed worker threads..." > - main.joinAllDismissedWorkers() > + def join(self): > + """ Block until self.queue is empty """ > + self.queue.join() > diff --git a/framework/threads.py b/framework/threads.py > index fcc266e..ef037d1 100644 > --- a/framework/threads.py > +++ b/framework/threads.py > @@ -24,7 +24,7 @@ > from weakref import WeakKeyDictionary > import multiprocessing > > -from threadpool import ThreadPool, WorkRequest > +from threadpool import ThreadPool > from patterns import Singleton > from threading import RLock > > @@ -53,8 +53,8 @@ class ConcurrentTestPool(Singleton): > self.pool = ThreadPool(multiprocessing.cpu_count()) > > @synchronized_self > - def put(self, callable_, args=None, kwds=None): > - self.pool.putRequest(WorkRequest(callable_, args=args, kwds=kwds)) > + def put(self, callable, args=None): > + self.pool.add(callable, args) > > def join(self): > - self.pool.wait() > + self.pool.join()
I'm hoping to get someone to at least test this before it lands.
signature.asc
Description: This is a digitally signed message part.
_______________________________________________ Piglit mailing list piglit@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/piglit
