On 01/03/2014 06:07 AM, Dylan Baker wrote: > This patch simplifies threading in piglit by removing the hand-rolled > threadpool, and instead using the Pool class from multiprocessing.dummy. > This provides a map interface, allowing for very clear, succinct code. > > The previous implementation ran all tests out of thread pools, a serial > pool and a multi-threaded pool. This patch does the same thing for a > couple of reasons. First, the obvious solution is to use the map() > builtin for serial tests. However, map in python3 returns an iterator > instead of a list, so calling map(f, x) will not actually run f(x) until > something tries to use those values. This would require considerable > restructuring to work around. Second, that could easily be split out > into another patch, and limits the number of changes in this patch. > > Multiprocessing.dummy is a wrapper around the Threading module, > providing the multiprocessing API, but with threads instead of > processes. > > Signed-off-by: Dylan Baker <[email protected]> > --- > framework/core.py | 47 +++++++++++++++++----------------- > framework/threadpool.py | 67 > ------------------------------------------------- > 2 files changed, 23 insertions(+), 91 deletions(-) > delete mode 100644 framework/threadpool.py > > diff --git a/framework/core.py b/framework/core.py > index 8bcda5b..1e06690 100644 > --- a/framework/core.py > +++ b/framework/core.py > @@ -36,14 +36,13 @@ from log import log > from cStringIO import StringIO > from textwrap import dedent > from threads import synchronized_self > -import threading > import multiprocessing > +import multiprocessing.dummy > try: > import simplejson as json > except ImportError: > import json > > -from threadpool import ThreadPool > import status > > __all__ = ['Environment', > @@ -566,31 +565,31 @@ class TestProfile: > > self.prepare_test_list(env) > > - # If concurrency is set to 'all' run all tests out of a concurrent > - # threadpool, if it's none, then run evey test 
serially. otherwise > mix > - # and match them > + def run(pair): > + """ Function to call test.execute from .map > + > + adds env and json_writer which are needed by Test.execute() > + > + """ > + name, test = pair > + test.execute(env, name, json_writer)
You just defined a function called run()... IN a function called run(). *mind blown* We probably shouldn't do that. That's even worse than doRun() and doRunRun() or whatever we used to have. Would love to see a v2 with this fixed. > + > + # Multiprocessing.dummy is a wrapper around Threading that provides a > + # multiprocessing compatible API > + single = multiprocessing.dummy.Pool(1) > + multi = multiprocessing.dummy.Pool(multiprocessing.cpu_count()) You don't need multiprocessing.cpu_count() - that's the default. So you can just do: multi = multiprocessing.dummy.Pool() > + > if env.concurrent == "all": > - pool = ThreadPool(multiprocessing.cpu_count()) > - for (path, test) in self.test_list.items(): > - pool.add(test.execute, (env, path, json_writer)) > - pool.join() > + multi.map(run, self.test_list.iteritems()) I'm not sure whether we want map, imap, or imap_unordered here. I guess it seems to work. Still, thoughts? > elif env.concurrent == "none": > - pool = ThreadPool(1) > - for (path, test) in self.test_list.items(): > - pool.add(test.execute, (env, path, json_writer)) > - pool.join() > + single.map(run, self.test_list.iteritems()) > else: > - pool = ThreadPool(multiprocessing.cpu_count()) > - for (path, test) in self.test_list.items(): > - if test.runConcurrent: > - pool.add(test.execute, (env, path, json_writer)) > - pool.join() > - > - pool = ThreadPool(1) > - for (path, test) in self.test_list.items(): > - if not test.runConcurrent: > - pool.add(test.execute, (env, path, json_writer)) > - pool.join() > + # Filter and return only thread safe tests to the threaded pool > + multi.map(run, (x for x in self.test_list.iteritems() if > + x[1].runConcurrent)) > + # Filter and return the non thread safe tests to the single pool > + single.map(run, (x for x in self.test_list.iteritems() if not > + x[1].runConcurrent)) > > def remove_test(self, test_path): > """Remove a fully qualified test from the profile. 
> diff --git a/framework/threadpool.py b/framework/threadpool.py > deleted file mode 100644 > index 5d1fc56..0000000 > --- a/framework/threadpool.py > +++ /dev/null > @@ -1,67 +0,0 @@ > -# Copyright (c) 2013 Intel Corporation > -# > -# Permission is hereby granted, free of charge, to any person obtaining a > -# copy of this software and associated documentation files (the "Software"), > -# to deal in the Software without restriction, including without limitation > -# the rights to use, copy, modify, merge, publish, distribute, sublicense, > -# and/or sell copies of the Software, and to permit persons to whom the > -# Software is furnished to do so, subject to the following conditions: > -# > -# The above copyright notice and this permission notice (including the next > -# paragraph) shall be included in all copies or substantial portions of the > -# Software. > -# > -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR > -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER > -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER > DEALINGS > -# IN THE SOFTWARE. > - > -# This code is based on the MIT licensed code by Emilio Monti found here: > -# http://code.activestate.com/recipes/577187-python-thread-pool/ > - > -from Queue import Queue > -from threading import Thread > - > - > -class Worker(Thread): > - """ > - Simple worker thread > - > - This worker simply consumes tasks off of the queue until it is empty and > - then waits for more tasks. 
> - """ > - > - def __init__(self, queue): > - Thread.__init__(self) > - self.queue = queue > - self.daemon = True > - self.start() > - > - def run(self): > - """ This method is called in the constructor by self.start() """ > - while True: > - func, args = self.queue.get() > - func(*args) # XXX: Does this need to be try/except-ed? > - self.queue.task_done() > - > - > -class ThreadPool(object): > - """ > - A simple ThreadPool class that maintains a Queue object and a set of > Worker > - threads. > - """ > - > - def __init__(self, thread_count): > - self.queue = Queue(thread_count) > - self.threads = [Worker(self.queue) for _ in xrange(thread_count)] > - > - def add(self, func, args): > - """ Add a function and it's arguments to the queue as a tuple """ > - self.queue.put((func, args)) > - > - def join(self): > - """ Block until self.queue is empty """ > - self.queue.join() > _______________________________________________ Piglit mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/piglit
