On Tue, Jan 7, 2014 at 5:01 PM, Kenneth Graunke <[email protected]> wrote:
> On 01/03/2014 06:07 AM, Dylan Baker wrote: > > This patch simplifies threading in piglit by removing the hand-rolled > > threadpool, and instead using the Pool class from multiprocessing.dummy. > > This provides a map interface, allowing for very clear succinct code. > > > > The previous implementation ran all tests out of thread pools, a serial > > pool and a multi-threaded pool. This patch does the same thing for a > > couple of reasons. First, the obvious solution is to use the map() > > builtin for serial tests. However, map in python3 returns an iterator > > instead of a list so calling map(f, x) will not actually run f(x) until > > something tries to use those values. This would require considerable > > restructuring to work around. Second, that could easily be split out > > into another patch, and limits the number of changes in this patch. > > > > Multiprocessing.dummy is a wrapper around the Threading module, > > providing the multiprocessing API, but with threads instead of > > processes. 
> > > > Signed-off-by: Dylan Baker <[email protected]> > > --- > > framework/core.py | 47 +++++++++++++++++----------------- > > framework/threadpool.py | 67 > ------------------------------------------------- > > 2 files changed, 23 insertions(+), 91 deletions(-) > > delete mode 100644 framework/threadpool.py > > > > diff --git a/framework/core.py b/framework/core.py > > index 8bcda5b..1e06690 100644 > > --- a/framework/core.py > > +++ b/framework/core.py > > @@ -36,14 +36,13 @@ from log import log > > from cStringIO import StringIO > > from textwrap import dedent > > from threads import synchronized_self > > -import threading > > import multiprocessing > > +import multiprocessing.dummy > > try: > > import simplejson as json > > except ImportError: > > import json > > > > -from threadpool import ThreadPool > > import status > > > > __all__ = ['Environment', > > @@ -566,31 +565,31 @@ class TestProfile: > > > > self.prepare_test_list(env) > > > > - # If concurrency is set to 'all' run all tests out of a > concurrent > > - # threadpool, if it's none, then run evey test serially. > otherwise mix > > - # and match them > > + def run(pair): > > + """ Function to call test.execute from .map > > + > > + adds env and json_writer which are needed by Test.execute() > > + > > + """ > > + name, test = pair > > + test.execute(env, name, json_writer) > > You just defined a function called run()...IN a function called run(). > > *mind blown* > > probably shouldn't do that. That's even worse than doRun() and > doRunRun() or whatever we used to have. > > Would love to see a v2 with this fixed. > haha, ok. > > > + > > + # Multiprocessing.dummy is a wrapper around Threading that > provides a > > + # multiprocessing compatible API > > + single = multiprocessing.dummy.Pool(1) > > + multi = multiprocessing.dummy.Pool(multiprocessing.cpu_count()) > > You don't need multiprocessing.cpu_count() - that's the default. 
So you > can just do: > > multi = multiprocessing.dummy.Pool() > Okay, I'll add a comment to make it clear that that is the default since I assumed that there was no default > > > + > > if env.concurrent == "all": > > - pool = ThreadPool(multiprocessing.cpu_count()) > > - for (path, test) in self.test_list.items(): > > - pool.add(test.execute, (env, path, json_writer)) > > - pool.join() > > + multi.map(run, self.test_list.iteritems()) > > I'm unclear whether we want map, imap, or imap_unordered here. I guess > it seems to work. Still, thoughts? > I'm not sure either, we're running into the limits of my understanding of python's threads. Without digging into SO and reading up on it I suspect that imap_unordered would be the best. I'll do some experimentation and see what happens. > > > elif env.concurrent == "none": > > - pool = ThreadPool(1) > > - for (path, test) in self.test_list.items(): > > - pool.add(test.execute, (env, path, json_writer)) > > - pool.join() > > + single.map(run, self.test_list.iteritems()) > > else: > > - pool = ThreadPool(multiprocessing.cpu_count()) > > - for (path, test) in self.test_list.items(): > > - if test.runConcurrent: > > - pool.add(test.execute, (env, path, json_writer)) > > - pool.join() > > - > > - pool = ThreadPool(1) > > - for (path, test) in self.test_list.items(): > > - if not test.runConcurrent: > > - pool.add(test.execute, (env, path, json_writer)) > > - pool.join() > > + # Filter and return only thread safe tests to the threaded > pool > > + multi.map(run, (x for x in self.test_list.iteritems() if > > + x[1].runConcurrent)) > > + # Filter and return the non thread safe tests to the single > pool > > + single.map(run, (x for x in self.test_list.iteritems() if > not > > + x[1].runConcurrent)) > > > > def remove_test(self, test_path): > > """Remove a fully qualified test from the profile. 
> > diff --git a/framework/threadpool.py b/framework/threadpool.py > > deleted file mode 100644 > > index 5d1fc56..0000000 > > --- a/framework/threadpool.py > > +++ /dev/null > > @@ -1,67 +0,0 @@ > > -# Copyright (c) 2013 Intel Corporation > > -# > > -# Permission is hereby granted, free of charge, to any person obtaining > a > > -# copy of this software and associated documentation files (the > "Software"), > > -# to deal in the Software without restriction, including without > limitation > > -# the rights to use, copy, modify, merge, publish, distribute, > sublicense, > > -# and/or sell copies of the Software, and to permit persons to whom the > > -# Software is furnished to do so, subject to the following conditions: > > -# > > -# The above copyright notice and this permission notice (including the > next > > -# paragraph) shall be included in all copies or substantial portions of > the > > -# Software. > > -# > > -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, > EXPRESS OR > > -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF > MERCHANTABILITY, > > -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT > SHALL > > -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, > ARISING > > -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER > DEALINGS > > -# IN THE SOFTWARE. > > - > > -# This code is based on the MIT licensed code by Emilio Monti found > here: > > -# http://code.activestate.com/recipes/577187-python-thread-pool/ > > - > > -from Queue import Queue > > -from threading import Thread > > - > > - > > -class Worker(Thread): > > - """ > > - Simple worker thread > > - > > - This worker simply consumes tasks off of the queue until it is > empty and > > - then waits for more tasks. 
> > - """ > > - > > - def __init__(self, queue): > > - Thread.__init__(self) > > - self.queue = queue > > - self.daemon = True > > - self.start() > > - > > - def run(self): > > - """ This method is called in the constructor by self.start() """ > > - while True: > > - func, args = self.queue.get() > > - func(*args) # XXX: Does this need to be try/except-ed? > > - self.queue.task_done() > > - > > - > > -class ThreadPool(object): > > - """ > > - A simple ThreadPool class that maintains a Queue object and a set > of Worker > > - threads. > > - """ > > - > > - def __init__(self, thread_count): > > - self.queue = Queue(thread_count) > > - self.threads = [Worker(self.queue) for _ in > xrange(thread_count)] > > - > > - def add(self, func, args): > > - """ Add a function and it's arguments to the queue as a tuple > """ > > - self.queue.put((func, args)) > > - > > - def join(self): > > - """ Block until self.queue is empty """ > > - self.queue.join() > > > >
_______________________________________________ Piglit mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/piglit
