On 09/16/2013 09:09 AM, Dylan Baker wrote: [snip] > +# Copyright (c) 2013 Intel Corporation > +# > +# Permission is hereby granted, free of charge, to any person obtaining a > +# copy of this software and associated documentation files (the "Software"), > +# to deal in the Software without restriction, including without limitation > +# the rights to use, copy, modify, merge, publish, distribute, sublicense, > +# and/or sell copies of the Software, and to permit persons to whom the > +# Software is furnished to do so, subject to the following conditions: > +# > +# The above copyright notice and this permission notice (including the next > +# paragraph) shall be included in all copies or substantial portions of the > +# Software. > +# > +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR > +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER > +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER > DEALINGS > +# IN THE SOFTWARE. > + > +# This code is based on the MIT licensed code here: > +# http://code.activestate.com/recipes/577187-python-thread-pool/
I might say: # This code is based on Emilio Monti's MIT licensed snippet from: # http://code.activestate.com/recipes/577187-python-thread-pool/ I was going to say you should add his copyright, but you've rewritten most of the code anyway, so I don't think you really need to. Either way, this looks great. Reviewed-and-tested-by: Kenneth Graunke <[email protected]> > +from Queue import Queue > +from threading import Thread > + > + > +class Worker(Thread): > """ > - traceback.print_exception(*exc_info) > - > - > -# utility functions > -def makeRequests(callable_, args_list, callback=None, > - exc_callback=_handle_thread_exception): > - """Create several work requests for same callable with different > arguments. > - > - Convenience function for creating several work requests for the same > - callable where each invocation of the callable receives different values > - for its arguments. > - > - ``args_list`` contains the parameters for each invocation of callable. > - Each item in ``args_list`` should be either a 2-item tuple of the list of > - positional arguments and a dictionary of keyword arguments or a single, > - non-tuple argument. > - > - See docstring for ``WorkRequest`` for info on ``callback`` and > - ``exc_callback``. > + Simple worker thread > > + This worker simply consumes tasks off of the queue until it is empty and > + then waits for more tasks. > """ > - requests = [] > - for item in args_list: > - if isinstance(item, tuple): > - requests.append( > - WorkRequest(callable_, item[0], item[1], callback=callback, > - exc_callback=exc_callback) > - ) > - else: > - requests.append( > - WorkRequest(callable_, [item], None, callback=callback, > - exc_callback=exc_callback) > - ) > - return requests > - > - > -# classes > -class WorkerThread(threading.Thread): > - """Background thread connected to the requests/results queues. 
> - > - A worker thread sits in the background and picks up work requests from > - one queue and puts the results in another until it is dismissed. > > - """ > - > - def __init__(self, requests_queue, results_queue, poll_timeout=5, > **kwds): > - """Set up thread in daemonic mode and start it immediatedly. > - > - ``requests_queue`` and ``results_queue`` are instances of > - ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a > new > - worker thread. > - > - """ > - threading.Thread.__init__(self, **kwds) > - self.setDaemon(1) > - self._requests_queue = requests_queue > - self._results_queue = results_queue > - self._poll_timeout = poll_timeout > - self._dismissed = threading.Event() > + def __init__(self, queue): > + Thread.__init__(self) > + self.queue = queue > + self.daemon = True > self.start() > > def run(self): > - """Repeatedly process the job queue until told to exit.""" > + """ This method is called in the constructor by self.start() """ > while True: > - if self._dismissed.isSet(): > - # we are dismissed, break out of loop > - break > - # get next work request. If we don't get a new request from the > - # queue after self._poll_timout seconds, we jump to the start of > - # the while loop again, to give the thread a chance to exit. > - try: > - request = self._requests_queue.get(True, self._poll_timeout) > - except Queue.Empty: > - continue > - else: > - if self._dismissed.isSet(): > - # we are dismissed, put back request in queue and exit > loop > - self._requests_queue.put(request) > - break > - try: > - result = request.callable(*request.args, **request.kwds) > - self._results_queue.put((request, result)) > - except: > - request.exception = True > - self._results_queue.put((request, sys.exc_info())) > + func, args = self.queue.get() > + func(*args) # XXX: Does this need to be try/except-ed? 
> + self.queue.task_done() > > - def dismiss(self): > - """Sets a flag to tell the thread to exit when done with current > job.""" > - self._dismissed.set() > - > - > -class WorkRequest: > - """A request to execute a callable for putting in the request queue > later. > - > - See the module function ``makeRequests`` for the common case > - where you want to build several ``WorkRequest`` objects for the same > - callable but with different arguments for each call. > > +class ThreadPool(object): > """ > - > - def __init__(self, callable_, args=None, kwds=None, requestID=None, > - callback=None, exc_callback=_handle_thread_exception): > - """Create a work request for a callable and attach callbacks. > - > - A work request consists of the a callable to be executed by a > - worker thread, a list of positional arguments, a dictionary > - of keyword arguments. > - > - A ``callback`` function can be specified, that is called when the > - results of the request are picked up from the result queue. It must > - accept two anonymous arguments, the ``WorkRequest`` object and the > - results of the callable, in that order. If you want to pass > additional > - information to the callback, just stick it on the request object. > - > - You can also give custom callback for when an exception occurs with > - the ``exc_callback`` keyword parameter. It should also accept two > - anonymous arguments, the ``WorkRequest`` and a tuple with the > exception > - details as returned by ``sys.exc_info()``. The default implementation > - of this callback just prints the exception info via > - ``traceback.print_exception``. If you want no exception handler > - callback, just pass in ``None``. > - > - ``requestID``, if given, must be hashable since it is used by > - ``ThreadPool`` object to store the results of that work request in a > - dictionary. It defaults to the return value of ``id(self)``. 
> - > - """ > - if requestID is None: > - self.requestID = id(self) > - else: > - try: > - self.requestID = hash(requestID) > - except TypeError: > - raise TypeError("requestID must be hashable.") > - self.exception = False > - self.callback = callback > - self.exc_callback = exc_callback > - self.callable = callable_ > - self.args = args or [] > - self.kwds = kwds or {} > - > - def __str__(self): > - return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ > - (self.requestID, self.args, self.kwds, self.exception) > - > -class ThreadPool: > - """A thread pool, distributing work requests and collecting results. > - > - See the module docstring for more information. > - > + A simple ThreadPool class that maintains a Queue object and a set of > Worker > + threads. > """ > > - def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): > - """Set up the thread pool and start num_workers worker threads. > - > - ``num_workers`` is the number of worker threads to start initially. > - > - If ``q_size > 0`` the size of the work *request queue* is limited and > - the thread pool blocks when the queue is full and it tries to put > - more work requests in it (see ``putRequest`` method), unless you also > - use a positive ``timeout`` value for ``putRequest``. > - > - If ``resq_size > 0`` the size of the *results queue* is limited and > the > - worker threads will block when the queue is full and they try to put > - new results in it. > - > - .. warning: > - If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is > - the possibilty of a deadlock, when the results queue is not > pulled > - regularly and too many jobs are put in the work requests queue. > - To prevent this, always set ``timeout > 0`` when calling > - ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 
> - > - """ > - self._requests_queue = Queue.Queue(q_size) > - self._results_queue = Queue.Queue(resq_size) > - self.workers = [] > - self.dismissedWorkers = [] > - self.workRequests = {} > - self.createWorkers(num_workers, poll_timeout) > - > - def createWorkers(self, num_workers, poll_timeout=5): > - """Add num_workers worker threads to the pool. > - > - ``poll_timout`` sets the interval in seconds (int or float) for how > - ofte threads should check whether they are dismissed, while waiting > for > - requests. > - > - """ > - for i in range(num_workers): > - self.workers.append(WorkerThread(self._requests_queue, > - self._results_queue, poll_timeout=poll_timeout)) > - > - def dismissWorkers(self, num_workers, do_join=False): > - """Tell num_workers worker threads to quit after their current > task.""" > - dismiss_list = [] > - for i in range(min(num_workers, len(self.workers))): > - worker = self.workers.pop() > - worker.dismiss() > - dismiss_list.append(worker) > - > - if do_join: > - for worker in dismiss_list: > - worker.join() > - else: > - self.dismissedWorkers.extend(dismiss_list) > - > - def joinAllDismissedWorkers(self): > - """Perform Thread.join() on all worker threads that have been > dismissed. > - """ > - for worker in self.dismissedWorkers: > - worker.join() > - self.dismissedWorkers = [] > - > - def putRequest(self, request, block=True, timeout=None): > - """Put work request into work queue and save its id for later.""" > - assert isinstance(request, WorkRequest) > - # don't reuse old work requests > - assert not getattr(request, 'exception', None) > - self._requests_queue.put(request, block, timeout) > - self.workRequests[request.requestID] = request > - > - def poll(self, block=False): > - """Process any new results in the queue.""" > - while True: > - # still results pending? > - if not self.workRequests: > - raise NoResultsPending > - # are there still workers to process remaining requests? 
> - elif block and not self.workers: > - raise NoWorkersAvailable > - try: > - # get back next results > - request, result = self._results_queue.get(block=block) > - # has an exception occured? > - if request.exception and request.exc_callback: > - request.exc_callback(request, result) > - # hand results to callback, if any > - if request.callback and not \ > - (request.exception and request.exc_callback): > - request.callback(request, result) > - del self.workRequests[request.requestID] > - except Queue.Empty: > - break > - > - def wait(self): > - """Wait for results, blocking until all have arrived.""" > - while 1: > - try: > - self.poll(True) > - except NoResultsPending: > - break > - > - > -################ > -# USAGE EXAMPLE > -################ > - > -if __name__ == '__main__': > - import random > - import time > - > - # the work the threads will have to do (rather trivial in our example) > - def do_something(data): > - time.sleep(random.randint(1,5)) > - result = round(random.random() * data, 5) > - # just to show off, we throw an exception once in a while > - if result > 5: > - raise RuntimeError("Something extraordinary happened!") > - return result > - > - # this will be called each time a result is available > - def print_result(request, result): > - print "**** Result from request #%s: %r" % (request.requestID, > result) > - > - # this will be called when an exception occurs within a thread > - # this example exception handler does little more than the default > handler > - def handle_exception(request, exc_info): > - if not isinstance(exc_info, tuple): > - # Something is seriously wrong... > - print request > - print exc_info > - raise SystemExit > - print "**** Exception occured in request #%s: %s" % \ > - (request.requestID, exc_info) > - > - # assemble the arguments for each job to a list... > - data = [random.randint(1,10) for i in range(20)] > - # ... 
and build a WorkRequest object for each item in data > - requests = makeRequests(do_something, data, print_result, > handle_exception) > - # to use the default exception handler, uncomment next line and comment > out > - # the preceding one. > - #requests = makeRequests(do_something, data, print_result) > - > - # or the other form of args_lists accepted by makeRequests: ((,), {}) > - data = [((random.randint(1,10),), {}) for i in range(20)] > - requests.extend( > - makeRequests(do_something, data, print_result, handle_exception) > - #makeRequests(do_something, data, print_result) > - # to use the default exception handler, uncomment next line and > comment > - # out the preceding one. > - ) > - > - # we create a pool of 3 worker threads > - print "Creating thread pool with 3 worker threads." > - main = ThreadPool(3) > - > - # then we put the work requests in the queue... > - for req in requests: > - main.putRequest(req) > - print "Work request #%s added." % req.requestID > - # or shorter: > - # [main.putRequest(req) for req in requests] > + def __init__(self, thread_count): > + self.queue = Queue(thread_count) > + self.threads = [Worker(self.queue) for _ in xrange(thread_count)] > > - # ...and wait for the results to arrive in the result queue > - # by using ThreadPool.wait(). This would block until results for > - # all work requests have arrived: > - # main.wait() > + def add(self, func, args): > + """ Add a function and its arguments to the queue as a tuple """ > + self.queue.put((func, args)) > > - # instead we can poll for results while doing something else: > - i = 0 > - while True: > - try: > - time.sleep(0.5) > - main.poll() > - print "Main thread working...", > - print "(active worker threads: %i)" % > (threading.activeCount()-1, ) > - if i == 10: > - print "**** Adding 3 more worker threads..." > - main.createWorkers(3) > - if i == 20: > - print "**** Dismissing 2 worker threads..." 
> - main.dismissWorkers(2) > - i += 1 > - except KeyboardInterrupt: > - print "**** Interrupted!" > - break > - except NoResultsPending: > - print "**** No pending results." > - break > - if main.dismissedWorkers: > - print "Joining all dismissed worker threads..." > - main.joinAllDismissedWorkers() > + def join(self): > + """ Block until self.queue is empty """ > + self.queue.join() > diff --git a/framework/threads.py b/framework/threads.py > index fcc266e..ef037d1 100644 > --- a/framework/threads.py > +++ b/framework/threads.py > @@ -24,7 +24,7 @@ > from weakref import WeakKeyDictionary > import multiprocessing > > -from threadpool import ThreadPool, WorkRequest > +from threadpool import ThreadPool > from patterns import Singleton > from threading import RLock > > @@ -53,8 +53,8 @@ class ConcurrentTestPool(Singleton): > self.pool = ThreadPool(multiprocessing.cpu_count()) > > @synchronized_self > - def put(self, callable_, args=None, kwds=None): > - self.pool.putRequest(WorkRequest(callable_, args=args, kwds=kwds)) > + def put(self, callable, args=None): > + self.pool.add(callable, args) > > def join(self): > - self.pool.wait() > + self.pool.join() > _______________________________________________ Piglit mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/piglit
