On Wednesday, January 08, 2014 02:16:39 PM Dylan Baker wrote:
> On Tue, Jan 7, 2014 at 5:01 PM, Kenneth Graunke <[email protected]> wrote:
> > On 01/03/2014 06:07 AM, Dylan Baker wrote:
> > > This patch simplifies threading in piglit by removing the hand-rolled
> > > threadpool and instead using the Pool class from multiprocessing.dummy.
> > > This provides a map interface, allowing for very clear, succinct code.
> > >
> > > The previous implementation ran all tests out of two thread pools, a
> > > serial pool and a multi-threaded pool. This patch does the same thing,
> > > for a couple of reasons. First, the obvious alternative is to use the
> > > map() builtin for serial tests. However, map() in Python 3 returns an
> > > iterator instead of a list, so calling map(f, x) will not actually run
> > > f(x) until something tries to use those values. Working around that
> > > would require considerable restructuring. Second, that change could
> > > easily be split out into another patch, which limits the number of
> > > changes in this one.
> > >
> > > multiprocessing.dummy is a wrapper around the threading module,
> > > providing the multiprocessing API but with threads instead of
> > > processes.
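The laziness of Python 3's builtin map() that the commit message refers to can be shown in a few lines. This sketch is not part of the patch; it just contrasts the lazy builtin with the eager, list-returning map() of a multiprocessing.dummy pool:

```python
import multiprocessing.dummy

calls = []

def record(n):
    calls.append(n)
    return n * 2

# Python 3's builtin map() is lazy: record() does not run until the
# iterator is consumed.
lazy = map(record, [1, 2, 3])
assert calls == []            # nothing has executed yet
assert list(lazy) == [2, 4, 6]

# multiprocessing.dummy.Pool.map() is eager: it blocks until every
# call has finished and hands back a plain list, in input order.
calls = []
pool = multiprocessing.dummy.Pool(2)
assert pool.map(record, [1, 2, 3]) == [2, 4, 6]
assert sorted(calls) == [1, 2, 3]
pool.close()
pool.join()
print("ok")
```

This is why switching the serial path to the builtin map() would have required restructuring: nothing would run until the return value was consumed.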
> > >
> > > Signed-off-by: Dylan Baker <[email protected]>
> > > ---
> > >  framework/core.py       | 47 +++++++++++++++++-----------------
> > >  framework/threadpool.py | 67 -------------------------------------------------
> > >  2 files changed, 23 insertions(+), 91 deletions(-)
> > >  delete mode 100644 framework/threadpool.py
> > >
> > > diff --git a/framework/core.py b/framework/core.py
> > > index 8bcda5b..1e06690 100644
> > > --- a/framework/core.py
> > > +++ b/framework/core.py
> > > @@ -36,14 +36,13 @@ from log import log
> > >  from cStringIO import StringIO
> > >  from textwrap import dedent
> > >  from threads import synchronized_self
> > > -import threading
> > >  import multiprocessing
> > > +import multiprocessing.dummy
> > >  try:
> > >      import simplejson as json
> > >  except ImportError:
> > >      import json
> > > -from threadpool import ThreadPool
> > >  import status
> > >
> > >  __all__ = ['Environment',
> > > @@ -566,31 +565,31 @@ class TestProfile:
> > >          self.prepare_test_list(env)
> > >
> > > -        # If concurrency is set to 'all' run all tests out of a concurrent
> > > -        # threadpool, if it's none, then run evey test serially. otherwise mix
> > > -        # and match them
> > > +        def run(pair):
> > > +            """ Function to call test.execute from .map
> > > +
> > > +            adds env and json_writer which are needed by Test.execute()
> > > +            """
> > > +            name, test = pair
> > > +            test.execute(env, name, json_writer)
> >
> > You just defined a function called run()... IN a function called run().
> >
> > *mind blown*
> >
> > Probably shouldn't do that. That's even worse than doRun() and
> > doRunRun() or whatever we used to have.
> >
> > Would love to see a v2 with this fixed.
>
> haha, ok.
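One way a v2 could avoid defining run() inside run() is to hoist the helper to module level and bind env and json_writer with functools.partial. This is only a hedged sketch: FakeTest, execute_test, and the argument values below are hypothetical stand-ins, not piglit's actual API.

```python
import functools
import multiprocessing.dummy

class FakeTest:
    """Hypothetical stand-in for a piglit Test object."""
    def execute(self, env, name, json_writer):
        return (env, name)

def execute_test(pair, env, json_writer):
    """Module-level helper with a distinct name, so the run() method
    no longer shadows itself with a nested function also called run()."""
    name, test = pair
    return test.execute(env, name, json_writer)

pool = multiprocessing.dummy.Pool(2)
# partial() bakes in the two extra arguments that .map() cannot pass.
runner = functools.partial(execute_test, env="fake-env", json_writer=None)
results = pool.map(runner, {"t1": FakeTest(), "t2": FakeTest()}.items())
pool.close()
pool.join()
print(sorted(name for _, name in results))  # ['t1', 't2']
```

With multiprocessing.dummy (threads) a closure works just as well; a module-level function mainly buys clarity here, though it would also matter for picklability if the code ever moved to real processes.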
> > > +
> > > +        # Multiprocessing.dummy is a wrapper around Threading that provides a
> > > +        # multiprocessing compatible API
> > > +        single = multiprocessing.dummy.Pool(1)
> > > +        multi = multiprocessing.dummy.Pool(multiprocessing.cpu_count())
> >
> > You don't need multiprocessing.cpu_count() - that's the default. So you
> > can just do:
> >
> >     multi = multiprocessing.dummy.Pool()
>
> Okay, I'll add a comment to make it clear that that is the default, since I
> assumed that there was no default.
>
> > > +
> > >          if env.concurrent == "all":
> > > -            pool = ThreadPool(multiprocessing.cpu_count())
> > > -            for (path, test) in self.test_list.items():
> > > -                pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            multi.map(run, self.test_list.iteritems())
> >
> > I'm unclear whether we want map, imap, or imap_unordered here. I guess
> > it seems to work. Still, thoughts?
>
> I'm not sure either; we're running into the limits of my understanding of
> Python's threads. Without digging into SO and reading up on it, I suspect
> that imap_unordered would be the best. I'll do some experimentation and see
> what happens.
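Ken's point that Pool() already defaults to the CPU count can be checked quickly. Note this sketch peeks at _processes, a private CPython attribute, purely for illustration:

```python
import multiprocessing
import multiprocessing.dummy

# Pool() with no argument defaults to the machine's CPU count, so the
# explicit multiprocessing.cpu_count() call in the patch is redundant.
pool = multiprocessing.dummy.Pool()

# _processes is a private, CPython-specific attribute; it is used here
# only to show that the default worker count matches cpu_count().
print(pool._processes == multiprocessing.cpu_count())  # True

pool.close()
pool.join()
```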
My poking at imap and friends leaves me believing that what we want is
map(). imap and imap_unordered don't seem to run any faster, and they
require additional Pool.close() and Pool.join() calls (map joins by
default).

> > >          elif env.concurrent == "none":
> > > -            pool = ThreadPool(1)
> > > -            for (path, test) in self.test_list.items():
> > > -                pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            single.map(run, self.test_list.iteritems())
> > >          else:
> > > -            pool = ThreadPool(multiprocessing.cpu_count())
> > > -            for (path, test) in self.test_list.items():
> > > -                if test.runConcurrent:
> > > -                    pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > -
> > > -            pool = ThreadPool(1)
> > > -            for (path, test) in self.test_list.items():
> > > -                if not test.runConcurrent:
> > > -                    pool.add(test.execute, (env, path, json_writer))
> > > -            pool.join()
> > > +            # Filter and return only thread safe tests to the threaded pool
> > > +            multi.map(run, (x for x in self.test_list.iteritems() if
> > > +                            x[1].runConcurrent))
> > > +            # Filter and return the non thread safe tests to the single pool
> > > +            single.map(run, (x for x in self.test_list.iteritems() if not
> > > +                             x[1].runConcurrent))
> > >
> > >      def remove_test(self, test_path):
> > >          """Remove a fully qualified test from the profile.
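The difference being weighed above can be sketched in a few lines: map() blocks until every task is done and returns an ordered list, while imap_unordered() yields results as they finish, in arbitrary order, and leaves the draining and cleanup to the caller.

```python
import multiprocessing.dummy

def square(n):
    return n * n

pool = multiprocessing.dummy.Pool(4)

# map() blocks until all tasks complete and preserves input order.
ordered = pool.map(square, range(5))

# imap_unordered() returns an iterator yielding results as they
# complete, in arbitrary order; the caller must drain it, and the
# pool still needs close()/join() to shut down cleanly.
unordered = sorted(pool.imap_unordered(square, range(5)))

pool.close()
pool.join()
print(ordered)    # [0, 1, 4, 9, 16]
print(unordered)  # [0, 1, 4, 9, 16]
```

For a test runner that only needs the side effects and waits for everything anyway, the extra bookkeeping of imap_unordered buys nothing, which matches the conclusion above.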
> > >
> > > diff --git a/framework/threadpool.py b/framework/threadpool.py
> > > deleted file mode 100644
> > > index 5d1fc56..0000000
> > > --- a/framework/threadpool.py
> > > +++ /dev/null
> > > @@ -1,67 +0,0 @@
> > > -# Copyright (c) 2013 Intel Corporation
> > > -#
> > > -# Permission is hereby granted, free of charge, to any person obtaining a
> > > -# copy of this software and associated documentation files (the "Software"),
> > > -# to deal in the Software without restriction, including without limitation
> > > -# the rights to use, copy, modify, merge, publish, distribute, sublicense,
> > > -# and/or sell copies of the Software, and to permit persons to whom the
> > > -# Software is furnished to do so, subject to the following conditions:
> > > -#
> > > -# The above copyright notice and this permission notice (including the next
> > > -# paragraph) shall be included in all copies or substantial portions of the
> > > -# Software.
> > > -#
> > > -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> > > -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> > > -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> > > -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> > > -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> > > -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> > > -# IN THE SOFTWARE.
> > > -
> > > -# This code is based on the MIT licensed code by Emilio Monti found here:
> > > -# http://code.activestate.com/recipes/577187-python-thread-pool/
> > > -
> > > -from Queue import Queue
> > > -from threading import Thread
> > > -
> > > -
> > > -class Worker(Thread):
> > > -    """
> > > -    Simple worker thread
> > > -
> > > -    This worker simply consumes tasks off of the queue until it is empty and
> > > -    then waits for more tasks.
> > > -    """
> > > -
> > > -    def __init__(self, queue):
> > > -        Thread.__init__(self)
> > > -        self.queue = queue
> > > -        self.daemon = True
> > > -        self.start()
> > > -
> > > -    def run(self):
> > > -        """ This method is called in the constructor by self.start() """
> > > -        while True:
> > > -            func, args = self.queue.get()
> > > -            func(*args)  # XXX: Does this need to be try/except-ed?
> > > -            self.queue.task_done()
> > > -
> > > -
> > > -class ThreadPool(object):
> > > -    """
> > > -    A simple ThreadPool class that maintains a Queue object and a set of Worker
> > > -    threads.
> > > -    """
> > > -
> > > -    def __init__(self, thread_count):
> > > -        self.queue = Queue(thread_count)
> > > -        self.threads = [Worker(self.queue) for _ in xrange(thread_count)]
> > > -
> > > -    def add(self, func, args):
> > > -        """ Add a function and it's arguments to the queue as a tuple """
> > > -        self.queue.put((func, args))
> > > -
> > > -    def join(self):
> > > -        """ Block until self.queue is empty """
> > > -        self.queue.join()

_______________________________________________
Piglit mailing list
[email protected]
http://lists.freedesktop.org/mailman/listinfo/piglit
