Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py (122496 => 122497)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py 2012-07-12 20:19:04 UTC (rev 122496)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py 2012-07-12 20:22:46 UTC (rev 122497)
@@ -31,8 +31,6 @@
import cPickle
import logging
import multiprocessing
-import optparse
-import os
import Queue
import sys
import time
@@ -47,13 +45,6 @@
_log = logging.getLogger(__name__)
-#
-# Topic names for Manager <-> Worker messaging
-#
-MANAGER_TOPIC = 'managers'
-ANY_WORKER_TOPIC = 'workers'
-
-
def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
"""Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
@@ -65,9 +56,17 @@
self._worker_factory = worker_factory
self._num_workers = num_workers
self._worker_startup_delay_secs = worker_startup_delay_secs
- self._worker_states = {}
+ self._workers = []
+ self._workers_stopped = set()
self._host = host
- self.name = 'manager'
+ self._name = 'manager'
+ self._running_inline = (self._num_workers == 1)
+ if self._running_inline:
+ self._messages_to_worker = Queue.Queue()
+ self._messages_to_manager = Queue.Queue()
+ else:
+ self._messages_to_worker = multiprocessing.Queue()
+ self._messages_to_manager = multiprocessing.Queue()
def __enter__(self):
return self
@@ -77,63 +76,61 @@
return False
def run(self, shards):
- manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host)
- for worker_number in xrange(self._num_workers):
- worker_connection = manager_connection.start_worker(worker_number)
- worker_state = _WorkerState(worker_number, worker_connection)
- self._worker_states[worker_connection.name] = worker_state
- time.sleep(self._worker_startup_delay_secs)
+ """Posts a list of messages to the pool and waits for them to complete."""
+ for message in shards:
+ self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
- messages = list(shards)
- for message in messages:
- manager_connection.post_message(*message)
-
for _ in xrange(self._num_workers):
- manager_connection.post_message('stop')
+ self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
- self.wait(manager_connection)
+ self.wait()
- def wait(self, manager_connection):
+ def _start_workers(self):
+ assert not self._workers
+ self._workers_stopped = set()
+ host = None
+ if self._running_inline or self._can_pickle(self._host):
+ host = self._host
+
+ for worker_number in xrange(self._num_workers):
+ worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None)
+ self._workers.append(worker)
+ worker.start()
+ if self._worker_startup_delay_secs:
+ time.sleep(self._worker_startup_delay_secs)
+
+ def wait(self):
try:
- while not self.is_done():
- manager_connection.run_message_loop(delay_secs=1.0)
+ self._start_workers()
+ if self._running_inline:
+ self._workers[0].run()
+ self._loop(block=False)
+ else:
+ self._loop(block=True)
finally:
self._close()
- def is_done(self):
- worker_states = self._worker_states.values()
- return worker_states and all(worker_state.done for worker_state in worker_states)
-
def _close(self):
- for worker_state in self._worker_states.values():
- if worker_state.worker_connection.is_alive():
- worker_state.worker_connection.cancel()
- _log.debug('Waiting for worker %d to exit' % worker_state.number)
- worker_state.worker_connection.join(5.0)
- if worker_state.worker_connection.is_alive():
- _log.error('Worker %d did not exit in time.' % worker_state.number)
+ for worker in self._workers:
+ if worker.is_alive():
+ worker.terminate()
+ worker.join()
+ self._workers = []
+ if not self._running_inline:
+ if self._messages_to_worker:
+ self._messages_to_worker.close()
+ self._messages_to_worker = None
+ if self._messages_to_manager:
+ self._messages_to_manager.close()
+ self._messages_to_manager = None
- def handle_done(self, source, log_messages=None):
- worker_state = self._worker_states[source]
- worker_state.done = True
- self._log_messages(log_messages)
-
- def handle_started_test(self, *args):
- self._caller.handle('started_test', *args)
-
- def handle_finished_test(self, *args):
- self._caller.handle('finished_test', *args)
-
- def handle_finished_test_list(self, *args):
- self._caller.handle('finished_test_list', *args)
-
- def handle_exception(self, *args):
- self._handle_worker_exception(*args)
-
def _log_messages(self, messages):
for message in messages:
logging.root.handle(message)
+ def _handle_done(self, source):
+ self._workers_stopped.add(source)
+
@staticmethod
def _handle_worker_exception(source, exception_type, exception_value, stack):
if exception_type == KeyboardInterrupt:
@@ -142,433 +139,161 @@
source, exception_value.__class__.__name__, str(exception_value)))
raise WorkerException(str(exception_value))
+ def _can_pickle(self, host):
+ try:
+ cPickle.dumps(host)
+ return True
+ except TypeError:
+ return False
-class _WorkerState(object):
- def __init__(self, number, worker_connection):
- self.worker_connection = worker_connection
- self.number = number
- self.done = False
+ def _loop(self, block):
+ try:
+ while True:
+ if len(self._workers_stopped) == len(self._workers):
+ block = False
+ message = self._messages_to_manager.get(block)
+ self._log_messages(message.logs)
+ if message.from_user:
+ self._caller.handle(message.name, message.src, *message.args)
+ continue
+ method = getattr(self, '_handle_' + message.name)
+ assert method, 'bad message %s' % repr(message)
+ method(message.src, *message.args)
+ except Queue.Empty:
+ pass
- def __repr__(self):
- return "_WorkerState(" + str(self.__dict__) + ")"
-
-def _get_broker(max_workers, client, worker_factory, host=None):
- """Return a connection to a manager/worker message_broker
-
- Args:
- max_workers - max # of workers to run concurrently.
- client - BrokerClient implementation to dispatch
- replies to.
- worker_factory: factory method for creating objects that implement the Worker interface.
- host: optional picklable host object that can be passed to workers for testing.
- Returns:
- A handle to an object that will talk to a message broker configured
- for the normal manager/worker communication."""
- if max_workers == 1:
- queue_class = Queue.Queue
- manager_class = _InlineManager
- else:
- queue_class = multiprocessing.Queue
- manager_class = _MultiProcessManager
-
- broker = _Broker(queue_class)
- return manager_class(broker, client, worker_factory, host)
-
-
class WorkerException(Exception):
"""Raised when we receive an unexpected/unknown exception from a worker."""
pass
-class BrokerClient(object):
- """Abstract base class / interface that all message broker clients must
- implement. In addition to the methods below, by convention clients
- implement routines of the signature type
-
- handle_MESSAGE_NAME(self, src, ...):
-
- where MESSAGE_NAME matches the string passed to post_message(), and
- src indicates the name of the sender. If the message contains values in
- the message body, those will be provided as optparams."""
-
- def is_done(self):
- """Called from inside run_message_loop() to indicate whether to exit."""
- raise NotImplementedError
-
- def name(self):
- """Return a name that identifies the client."""
- raise NotImplementedError
-
-
-class _Broker(object):
- """Brokers provide the basic model of a set of topics. Clients can post a
- message to any topic using post_message(), and can process messages on one
- topic at a time using run_message_loop()."""
-
- def __init__(self, queue_maker):
- """Args:
- queue_maker: a factory method that returns objects implementing a
- Queue interface (put()/get()).
- """
- self._queue_maker = queue_maker
- self._topics = {}
-
- def __del__(self):
- self.cleanup()
-
- def cleanup(self):
- for queue in self._topics.values():
- if hasattr(queue, 'close'):
- queue.close()
- self._topics = {}
-
- def add_topic(self, topic_name):
- if topic_name not in self._topics:
- self._topics[topic_name] = self._queue_maker()
-
- def _get_queue_for_topic(self, topic_name):
- return self._topics[topic_name]
-
- def post_message(self, client, topic_name, message_name, *message_args):
- """Post a message to the appropriate topic name.
-
- Messages have a name and a tuple of optional arguments. Both must be picklable."""
- message = _Message(client.name, topic_name, message_name, message_args)
- queue = self._get_queue_for_topic(topic_name)
- queue.put(_Message.dumps(message))
-
- def run_message_loop(self, topic_name, client, delay_secs=None):
- """Loop processing messages until client.is_done() or delay passes.
-
- To run indefinitely, set delay_secs to None."""
- assert delay_secs is None or delay_secs > 0
- self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)
-
- def run_all_pending(self, topic_name, client):
- """Process messages until client.is_done() or caller would block."""
- self._run_loop(topic_name, client, block=False, delay_secs=None)
-
- def _run_loop(self, topic_name, client, block, delay_secs):
- queue = self._get_queue_for_topic(topic_name)
- while not client.is_done():
- try:
- s = queue.get(block, delay_secs)
- except Queue.Empty:
- return
- msg = _Message.loads(s)
- self._dispatch_message(msg, client)
-
- def _dispatch_message(self, message, client):
- if not hasattr(client, 'handle_' + message.name):
- raise ValueError(
- "%s: received message '%s' it couldn't handle" %
- (client.name, message.name))
- optargs = message.args
- message_handler = getattr(client, 'handle_' + message.name)
- message_handler(message.src, *optargs)
-
-
class _Message(object):
- @staticmethod
- def loads(string_value):
- obj = cPickle.loads(string_value)
- assert(isinstance(obj, _Message))
- return obj
-
- def __init__(self, src, topic_name, message_name, message_args):
+ def __init__(self, src, message_name, message_args, from_user, logs):
self.src = src
- self.topic_name = topic_name
self.name = message_name
self.args = message_args
+ self.from_user = from_user
+ self.logs = logs
- def dumps(self):
- return cPickle.dumps(self)
-
def __repr__(self):
- return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
- (self.src, self.topic_name, self.name))
+ return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
-class _BrokerConnection(object):
- """_BrokerConnection provides a connection-oriented facade on top of a
- Broker, so that callers don't have to repeatedly pass the same topic
- names over and over."""
+class _Worker(multiprocessing.Process):
+ def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager):
+ super(_Worker, self).__init__()
+ self.host = host
+ self.worker_number = worker_number
+ self.name = 'worker/%d' % worker_number
+ self.log_messages = []
+ self._running_inline = running_inline
+ self._manager = manager
- def __init__(self, broker, client, run_topic, post_topic):
- """Create a _BrokerConnection on top of a _Broker. Note that the _Broker
- is passed in rather than created so that a single _Broker can be used
- by multiple _BrokerConnections."""
- self._broker = broker
- self._client = client
- self._post_topic = post_topic
- self._run_topic = run_topic
- broker.add_topic(run_topic)
- broker.add_topic(post_topic)
+ self._messages_to_manager = messages_to_manager
+ self._messages_to_worker = messages_to_worker
+ self._worker = worker_factory(self)
+ self._logger = None
+ self._log_handler = None
- def cleanup(self):
- self._broker.cleanup()
- self._broker = None
+ def terminate(self):
+ if self._worker:
+ self._worker.stop()
+ self._worker = None
+ if self.is_alive():
+ super(_Worker, self).terminate()
- def run_message_loop(self, delay_secs=None):
- self._broker.run_message_loop(self._run_topic, self._client, delay_secs)
+ def _close(self):
+ if self._log_handler and self._logger:
+ self._logger.removeHandler(self._log_handler)
+ self._log_handler = None
+ self._logger = None
- def post_message(self, message_name, *message_args):
- self._broker.post_message(self._client, self._post_topic,
- message_name, *message_args)
+ def start(self):
+ if not self._running_inline:
+ super(_Worker, self).start()
- def raise_exception(self, exc_info):
- # Since tracebacks aren't picklable, send the extracted stack instead,
- # but at least log the full traceback.
- exception_type, exception_value, exception_traceback = sys.exc_info()
- stack_utils.log_traceback(_log.error, exception_traceback)
- stack = traceback.extract_tb(exception_traceback)
- self._broker.post_message(self._client, self._post_topic, 'exception', exception_type, exception_value, stack)
+ def run(self):
+ if not self.host:
+ self.host = Host()
+ if not self._running_inline:
+ self._set_up_logging()
-
-class AbstractWorker(BrokerClient):
- def __init__(self, worker_connection, worker_number):
- BrokerClient.__init__(self)
- self.worker = None
- self._worker_connection = worker_connection
- self.worker_number = worker_number
- self.name = 'worker/%d' % worker_number
- self._done = False
- self._canceled = False
- self._options = optparse.Values({'verbose': False})
- self.host = None
-
- def is_done(self):
- return self._done or self._canceled
-
- def stop_handling_messages(self):
- self._done = True
-
- def run(self, host):
- """Callback for the worker to start executing. Typically does any
- remaining initialization and then calls broker_connection.run_message_loop()."""
+ worker = self._worker
exception_msg = ""
- self.host = host
+ _log.debug("%s starting" % self.name)
- self.worker.start()
- _log.debug('%s starting' % self.name)
-
try:
- self._worker_connection.run_message_loop()
- if not self.is_done():
- raise AssertionError("%s: ran out of messages in worker queue."
- % self.name)
- except KeyboardInterrupt:
+ worker.start()
+ while True:
+ message = self._messages_to_worker.get()
+ if message.from_user:
+ worker.handle(message.name, message.src, *message.args)
+ self.yield_to_caller()
+ else:
+ assert message.name == 'stop', 'bad message %s' % repr(message)
+ break
+
+ except Queue.Empty:
+ assert False, '%s: ran out of messages in worker queue.' % self.name
+ except KeyboardInterrupt, e:
exception_msg = ", interrupted"
- self._worker_connection.raise_exception(sys.exc_info())
- except:
- exception_msg = ", exception raised"
- self._worker_connection.raise_exception(sys.exc_info())
+ if not self._running_inline:
+ _log.warning('worker exception')
+ self._raise(sys.exc_info())
+ raise
+ except Exception, e:
+ exception_msg = ", exception raised: %s" % str(e)
+ if not self._running_inline:
+ self._raise(sys.exc_info())
+ raise
finally:
- _log.debug("%s done with message loop%s" % (self.name, exception_msg))
+ _log.debug("%s exiting%s" % (self.name, exception_msg))
try:
- self.worker.stop()
+ worker.stop()
finally:
- # Make sure we post a done so that we can flush the log messages
- # and clean up properly even if we raise an exception in worker.cleanup().
- self._worker_connection.post_message('done')
+ self._post(name='done', args=(), from_user=False)
+ self._close()
- def handle_stop(self, source):
- self._done = True
+ def post(self, name, *args):
+ self._post(name, args, from_user=True)
- def handle_test_list(self, source, list_name, test_list):
- self.worker.handle('test_list', source, list_name, test_list)
-
- def cancel(self):
- """Called when possible to indicate to the worker to stop processing
- messages and shut down. Note that workers may be stopped without this
- method being called, so clients should not rely solely on this."""
- self._canceled = True
-
def yield_to_caller(self):
- self._worker_connection.yield_to_broker()
+ if self._running_inline:
+ self._manager._loop(block=False)
- def post(self, *args):
- self._worker_connection.post_message(*args)
+ def _post(self, name, args, from_user):
+ log_messages = self.log_messages
+ self.log_messages = []
+ self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
+ def _raise(self, exc_info):
+ # Since tracebacks aren't picklable, send the extracted stack instead.
+ exception_type, exception_value, exception_traceback = exc_info
+ stack_utils.log_traceback(_log.error, exception_traceback)
+ stack = traceback.extract_tb(exception_traceback)
+ self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
-class _ManagerConnection(_BrokerConnection):
- def __init__(self, broker, client, worker_factory, host):
- _BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
- self._worker_factory = worker_factory
- self._host = host
+ def _set_up_logging(self):
+ self._logger = logging.getLogger()
- def start_worker(self, worker_number):
- raise NotImplementedError
-
-
-class _InlineManager(_ManagerConnection):
- def __init__(self, broker, client, worker_factory, host):
- _ManagerConnection.__init__(self, broker, client, worker_factory, host)
- self._inline_worker = None
-
- def start_worker(self, worker_number):
- host = self._host
- self._inline_worker = _InlineWorkerConnection(host, self._broker, self._client, self._worker_factory, worker_number)
- return self._inline_worker
-
- def run_message_loop(self, delay_secs=None):
- # Note that delay_secs is ignored in this case since we can't easily
- # implement it.
- self._inline_worker.run()
- self._broker.run_all_pending(MANAGER_TOPIC, self._client)
-
-
-class _MultiProcessManager(_ManagerConnection):
- def _can_pickle_host(self):
- try:
- cPickle.dumps(self._host)
- return True
- except TypeError:
- return False
-
- def start_worker(self, worker_number):
- host = None
- if self._can_pickle_host():
- host = self._host
- worker_connection = _MultiProcessWorkerConnection(host, self._broker, self._worker_factory, worker_number)
- worker_connection.start()
- return worker_connection
-
-
-class _WorkerConnection(_BrokerConnection):
- def __init__(self, host, broker, worker_factory, worker_number):
- # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
- # actual Worker (created by worker_factory) is very confusing, but this all gets better when
- # _WorkerConnection and AbstractWorker get merged.
- self._client = AbstractWorker(self, worker_number)
- self._worker = worker_factory(self._client)
- self._client.worker = self._worker
- self._host = host
- self._log_messages = []
- self._logger = None
- self._log_handler = None
- self.name = self._client.name
- _BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC)
-
- def cancel(self):
- raise NotImplementedError
-
- def is_alive(self):
- raise NotImplementedError
-
- def join(self, timeout):
- raise NotImplementedError
-
- # FIXME: rename to yield_to_caller().
- def yield_to_broker(self):
- pass
-
- def post_message(self, *args):
- # FIXME: This is a hack until we can remove the log_messages arg from the manager.
- if args[0] in ('finished_test', 'done'):
- log_messages = self._log_messages
- self._log_messages = []
- args = args + tuple([log_messages])
- super(_WorkerConnection, self).post_message(*args)
-
- def set_up_logging(self):
- self._logger = logging.root
# The unix multiprocessing implementation clones the MeteredStream log handler
# into the child process, so we need to remove it to avoid duplicate logging.
for h in self._logger.handlers:
# log handlers don't have names until python 2.7.
- if getattr(h, 'name', '') == metered_stream.LOG_HANDLER_NAME:
+ # FIXME: get webkitpy.test.printer from a constant as well.
+ if getattr(h, 'name', '') in (metered_stream.LOG_HANDLER_NAME, 'webkitpy.test.printer'):
self._logger.removeHandler(h)
break
- self._logger.setLevel(logging.DEBUG if self._client._options.verbose else logging.INFO)
+
self._log_handler = _WorkerLogHandler(self)
self._logger.addHandler(self._log_handler)
- def clean_up_logging(self):
- if self._log_handler and self._logger:
- self._logger.removeHandler(self._log_handler)
- self._log_handler = None
- self._logger = None
-
-class _InlineWorkerConnection(_WorkerConnection):
- def __init__(self, host, broker, manager_client, worker_factory, worker_number):
- _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
- self._alive = False
- self._manager_client = manager_client
-
- def cancel(self):
- self._client.cancel()
-
- def is_alive(self):
- return self._alive
-
- def join(self, timeout):
- assert not self._alive
-
- def run(self):
- self._alive = True
- try:
- self._client.run(self._host)
- finally:
- self._alive = False
-
- def yield_to_broker(self):
- self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
-
- def raise_exception(self, exc_info):
- # Since the worker is in the same process as the manager, we can
- # raise the exception directly, rather than having to send it through
- # the queue. This allows us to preserve the traceback, but we log
- # it anyway for consistency with the multiprocess case.
- exception_type, exception_value, exception_traceback = sys.exc_info()
- stack_utils.log_traceback(_log.error, exception_traceback)
- raise exception_type, exception_value, exception_traceback
-
-
-class _Process(multiprocessing.Process):
- def __init__(self, worker_connection, client):
- multiprocessing.Process.__init__(self)
- self._worker_connection = worker_connection
- self._client = client
-
- def run(self):
- if not self._worker_connection._host:
- self._worker_connection._host = Host()
- self._worker_connection.run()
-
-
-class _MultiProcessWorkerConnection(_WorkerConnection):
- def __init__(self, host, broker, worker_factory, worker_number):
- _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
- self._proc = _Process(self, self._client)
-
- def cancel(self):
- return self._proc.terminate()
-
- def is_alive(self):
- return self._proc.is_alive()
-
- def join(self, timeout):
- return self._proc.join(timeout)
-
- def start(self):
- self._proc.start()
-
- def run(self):
- self.set_up_logging()
- try:
- self._client.run(self._host)
- finally:
- self.clean_up_logging()
-
-
class _WorkerLogHandler(logging.Handler):
def __init__(self, worker):
logging.Handler.__init__(self)
self._worker = worker
- self._pid = os.getpid()
def emit(self, record):
- self._worker._log_messages.append(record)
+ self._worker.log_messages.append(record)
Deleted: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py (122496 => 122497)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py 2012-07-12 20:19:04 UTC (rev 122496)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py 2012-07-12 20:22:46 UTC (rev 122497)
@@ -1,206 +0,0 @@
-# Copyright (C) 2010 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import optparse
-import Queue
-import sys
-import unittest
-
-from webkitpy.common.system import outputcapture
-from webkitpy.layout_tests.controllers import manager_worker_broker
-
-
-# In order to reliably control when child workers are starting and stopping,
-# we use a pair of global variables to hold queues used for messaging. Ideally
-# we wouldn't need globals, but we can't pass these through a lexical closure
-# because those can't be Pickled and sent to a subprocess, and we'd prefer not
-# to have to pass extra arguments to the worker in the start_worker() call.
-starting_queue = None
-stopping_queue = None
-
-
-WORKER_NAME = 'worker/1'
-
-def make_broker(manager, max_workers, start_queue=None, stop_queue=None):
- global starting_queue
- global stopping_queue
- starting_queue = start_queue
- stopping_queue = stop_queue
- return manager_worker_broker._get_broker(max_workers, manager, _TestWorker)
-
-
-class _TestWorker(object):
- def __init__(self, caller):
- self._caller = caller
- self._thing_to_greet = 'everybody'
- self._starting_queue = starting_queue
- self._stopping_queue = stopping_queue
- self._options = optparse.Values({'verbose': False})
-
- def start(self):
- if self._starting_queue:
- self._starting_queue.put('')
-
- if self._stopping_queue:
- self._stopping_queue.get()
-
- def handle(self, message, src, an_int, a_str):
- assert an_int == 1
- assert a_str == "hello, world"
- self._caller.post('finished_test', 2)
-
- def stop(self):
- pass
-
-
-class FunctionTests(unittest.TestCase):
- def test_get__inline(self):
- self.assertTrue(make_broker(self, 1) is not None)
-
- def test_get__processes(self):
- # This test sometimes fails on Windows. See <http://webkit.org/b/55087>.
- if sys.platform in ('cygwin', 'win32'):
- return
- self.assertTrue(make_broker(self, 2) is not None)
-
-
-class _TestsMixin(object):
- """Mixin class that implements a series of tests to enforce the
- contract all implementations must follow."""
-
- name = 'TesterManager'
-
- def is_done(self):
- return self._done
-
- def handle_done(self, src, log_messages):
- self._done = True
-
- def handle_finished_test(self, src, an_int, log_messages):
- self._an_int = an_int
-
- def handle_exception(self, src, exception_type, exception_value, stack):
- raise exception_type(exception_value)
-
- def setUp(self):
- self._an_int = None
- self._broker = None
- self._done = False
- self._exception = None
- self._max_workers = None
-
- def make_broker(self, starting_queue=None, stopping_queue=None):
- self._broker = make_broker(self, self._max_workers, starting_queue,
- stopping_queue)
-
- def test_name(self):
- self.make_broker()
- worker = self._broker.start_worker(1)
- self.assertEquals(worker.name, WORKER_NAME)
- worker.cancel()
- worker.join(0.1)
- self.assertFalse(worker.is_alive())
- self._broker.cleanup()
-
- def test_cancel(self):
- self.make_broker()
- worker = self._broker.start_worker(1)
- self._broker.post_message('test_list', 1, 'hello, world')
- worker.cancel()
- worker.join(0.1)
- self.assertFalse(worker.is_alive())
- self._broker.cleanup()
-
- def test_done(self):
- self.make_broker()
- worker = self._broker.start_worker(1)
- self._broker.post_message('test_list', 1, 'hello, world')
- self._broker.post_message('stop')
- self._broker.run_message_loop()
- worker.join(0.5)
- self.assertFalse(worker.is_alive())
- self.assertTrue(self.is_done())
- self.assertEqual(self._an_int, 2)
- self._broker.cleanup()
-
- def test_unknown_message(self):
- self.make_broker()
- worker = self._broker.start_worker(1)
- self._broker.post_message('unknown')
- try:
- self._broker.run_message_loop()
- self.fail()
- except ValueError, e:
- self.assertEquals(str(e),
- "%s: received message 'unknown' it couldn't handle" % WORKER_NAME)
- finally:
- worker.join(0.5)
- self.assertFalse(worker.is_alive())
- self._broker.cleanup()
-
-
-class InlineBrokerTests(_TestsMixin, unittest.TestCase):
- def setUp(self):
- _TestsMixin.setUp(self)
- self._max_workers = 1
-
-
-# FIXME: https://bugs.webkit.org/show_bug.cgi?id=54520.
-if sys.platform not in ('cygwin', 'win32'):
-
- class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
- def setUp(self):
- _TestsMixin.setUp(self)
- self._max_workers = 2
-
-
-class MessageTest(unittest.TestCase):
- def test__no_body(self):
- msg = manager_worker_broker._Message('src', 'topic_name', 'message_name', None)
- self.assertTrue(repr(msg))
- s = msg.dumps()
- new_msg = manager_worker_broker._Message.loads(s)
- self.assertEqual(new_msg.name, 'message_name')
- self.assertEqual(new_msg.args, None)
- self.assertEqual(new_msg.topic_name, 'topic_name')
- self.assertEqual(new_msg.src, 'src')
-
- def test__body(self):
- msg = manager_worker_broker._Message('src', 'topic_name', 'message_name', ('body', 0))
- self.assertTrue(repr(msg))
- s = msg.dumps()
- new_msg = manager_worker_broker._Message.loads(s)
- self.assertEqual(new_msg.name, 'message_name')
- self.assertEqual(new_msg.args, ('body', 0))
- self.assertEqual(new_msg.topic_name, 'topic_name')
- self.assertEqual(new_msg.src, 'src')
-
-
-
-if __name__ == '__main__':
- unittest.main()