Title: [122497] trunk/Tools
Revision
122497
Author
[email protected]
Date
2012-07-12 13:22:46 -0700 (Thu, 12 Jul 2012)

Log Message

nrwt: reimplement manager_worker_broker in a much simpler form
https://bugs.webkit.org/show_bug.cgi?id=90513

Reviewed by Ojan Vafai.

This is a wholesale replacement of the MessagePool() implementation
and the other classes in manager_worker_broker.py. All of the
BrokerConnection*, Broker*, etc. classes are gone, and there are now
just a MessagePool class and a _Worker class. Happiness ensues.

I'm removing manager_worker_broker_unittest.py as well; we get
nearly complete coverage from the integration tests, and will
get more coverage when test-webkitpy moves to use this as well,
so having unit tests seems like unnecessary overhead. (running
coverage numbers with test-webkitpy shows that pretty much the only
uncovered lines are lines that are only run in the child processes,
which coverage doesn't handle at the moment).

* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
(_MessagePool.__init__):
(_MessagePool.run):
(_MessagePool._start_workers):
(_MessagePool):
(_MessagePool.wait):
(_MessagePool._close):
(_MessagePool._handle_done):
(_MessagePool._can_pickle):
(_MessagePool._loop):
(WorkerException):
(_Message.__init__):
(_Message.__repr__):
(_Worker):
(_Worker.__init__):
(_Worker.terminate):
(_Worker._close):
(_Worker.run):
(_Worker.post):
(_Worker.yield_to_caller):
(_Worker._post):
(_Worker._raise):
(_Worker._set_up_logging):
(_WorkerLogHandler.__init__):
(_WorkerLogHandler.emit):
* Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: Removed.

Modified Paths

Removed Paths

Diff

Modified: trunk/Tools/ChangeLog (122496 => 122497)


--- trunk/Tools/ChangeLog	2012-07-12 20:19:04 UTC (rev 122496)
+++ trunk/Tools/ChangeLog	2012-07-12 20:22:46 UTC (rev 122497)
@@ -1,3 +1,50 @@
+2012-07-12  Dirk Pranke  <[email protected]>
+
+        nrwt: reimplement manager_worker_broker in a much simpler form
+        https://bugs.webkit.org/show_bug.cgi?id=90513
+
+        Reviewed by Ojan Vafai.
+
+        This is a wholesale replacement of the MessagePool() implementation
+        and the other classes in manager_worker_broker.py. All of the
+        BrokerConnection*, Broker*, etc. classes are gone, and there are now
+        just a MessagePool class and a _Worker class. Happiness ensues.
+ 
+        I'm removing manager_worker_broker_unittest.py as well; we get
+        nearly complete coverage from the integration tests, and will
+        get more coverage when test-webkitpy moves to use this as well,
+        so having unit tests seems like unnecessary overhead. (running
+        coverage numbers with test-webkitpy shows that pretty much the only
+        uncovered lines are lines that are only run in the child processes,
+        which coverage doesn't handle at the moment).
+ 
+        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
+        (_MessagePool.__init__):
+        (_MessagePool.run):
+        (_MessagePool._start_workers):
+        (_MessagePool):
+        (_MessagePool.wait):
+        (_MessagePool._close):
+        (_MessagePool._handle_done):
+        (_MessagePool._can_pickle):
+        (_MessagePool._loop):
+        (WorkerException):
+        (_Message.__init__):
+        (_Message.__repr__):
+        (_Worker):
+        (_Worker.__init__):
+        (_Worker.terminate):
+        (_Worker._close):
+        (_Worker.run):
+        (_Worker.post):
+        (_Worker.yield_to_caller):
+        (_Worker._post):
+        (_Worker._raise):
+        (_Worker._set_up_logging):
+        (_WorkerLogHandler.__init__):
+        (_WorkerLogHandler.emit):
+        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: Removed.
+
 2012-07-12  Tony Chang  <[email protected]>
 
         [chromium] Remove drag and drop API methods that are no longer used

Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py (122496 => 122497)


--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py	2012-07-12 20:19:04 UTC (rev 122496)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py	2012-07-12 20:22:46 UTC (rev 122497)
@@ -31,8 +31,6 @@
 import cPickle
 import logging
 import multiprocessing
-import optparse
-import os
 import Queue
 import sys
 import time
@@ -47,13 +45,6 @@
 _log = logging.getLogger(__name__)
 
 
-#
-# Topic names for Manager <-> Worker messaging
-#
-MANAGER_TOPIC = 'managers'
-ANY_WORKER_TOPIC = 'workers'
-
-
 def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
     """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
     return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
@@ -65,9 +56,17 @@
         self._worker_factory = worker_factory
         self._num_workers = num_workers
         self._worker_startup_delay_secs = worker_startup_delay_secs
-        self._worker_states = {}
+        self._workers = []
+        self._workers_stopped = set()
         self._host = host
-        self.name = 'manager'
+        self._name = 'manager'
+        self._running_inline = (self._num_workers == 1)
+        if self._running_inline:
+            self._messages_to_worker = Queue.Queue()
+            self._messages_to_manager = Queue.Queue()
+        else:
+            self._messages_to_worker = multiprocessing.Queue()
+            self._messages_to_manager = multiprocessing.Queue()
 
     def __enter__(self):
         return self
@@ -77,63 +76,61 @@
         return False
 
     def run(self, shards):
-        manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host)
-        for worker_number in xrange(self._num_workers):
-            worker_connection = manager_connection.start_worker(worker_number)
-            worker_state = _WorkerState(worker_number, worker_connection)
-            self._worker_states[worker_connection.name] = worker_state
-            time.sleep(self._worker_startup_delay_secs)
+        """Posts a list of messages to the pool and waits for them to complete."""
+        for message in shards:
+            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
 
-        messages = list(shards)
-        for message in messages:
-            manager_connection.post_message(*message)
-
         for _ in xrange(self._num_workers):
-            manager_connection.post_message('stop')
+            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
 
-        self.wait(manager_connection)
+        self.wait()
 
-    def wait(self, manager_connection):
+    def _start_workers(self):
+        assert not self._workers
+        self._workers_stopped = set()
+        host = None
+        if self._running_inline or self._can_pickle(self._host):
+            host = self._host
+
+        for worker_number in xrange(self._num_workers):
+            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None)
+            self._workers.append(worker)
+            worker.start()
+            if self._worker_startup_delay_secs:
+                time.sleep(self._worker_startup_delay_secs)
+
+    def wait(self):
         try:
-            while not self.is_done():
-                manager_connection.run_message_loop(delay_secs=1.0)
+            self._start_workers()
+            if self._running_inline:
+                self._workers[0].run()
+                self._loop(block=False)
+            else:
+                self._loop(block=True)
         finally:
             self._close()
 
-    def is_done(self):
-        worker_states = self._worker_states.values()
-        return worker_states and all(worker_state.done for worker_state in worker_states)
-
     def _close(self):
-        for worker_state in self._worker_states.values():
-            if worker_state.worker_connection.is_alive():
-                worker_state.worker_connection.cancel()
-                _log.debug('Waiting for worker %d to exit' % worker_state.number)
-                worker_state.worker_connection.join(5.0)
-                if worker_state.worker_connection.is_alive():
-                    _log.error('Worker %d did not exit in time.' % worker_state.number)
+        for worker in self._workers:
+            if worker.is_alive():
+                worker.terminate()
+                worker.join()
+        self._workers = []
+        if not self._running_inline:
+            if self._messages_to_worker:
+                self._messages_to_worker.close()
+                self._messages_to_worker = None
+            if self._messages_to_manager:
+                self._messages_to_manager.close()
+                self._messages_to_manager = None
 
-    def handle_done(self, source, log_messages=None):
-        worker_state = self._worker_states[source]
-        worker_state.done = True
-        self._log_messages(log_messages)
-
-    def handle_started_test(self, *args):
-        self._caller.handle('started_test', *args)
-
-    def handle_finished_test(self, *args):
-        self._caller.handle('finished_test', *args)
-
-    def handle_finished_test_list(self, *args):
-        self._caller.handle('finished_test_list', *args)
-
-    def handle_exception(self, *args):
-        self._handle_worker_exception(*args)
-
     def _log_messages(self, messages):
         for message in messages:
             logging.root.handle(message)
 
+    def _handle_done(self, source):
+        self._workers_stopped.add(source)
+
     @staticmethod
     def _handle_worker_exception(source, exception_type, exception_value, stack):
         if exception_type == KeyboardInterrupt:
@@ -142,433 +139,161 @@
                    source, exception_value.__class__.__name__, str(exception_value)))
         raise WorkerException(str(exception_value))
 
+    def _can_pickle(self, host):
+        try:
+            cPickle.dumps(host)
+            return True
+        except TypeError:
+            return False
 
-class _WorkerState(object):
-    def __init__(self, number, worker_connection):
-        self.worker_connection = worker_connection
-        self.number = number
-        self.done = False
+    def _loop(self, block):
+        try:
+            while True:
+                if len(self._workers_stopped) == len(self._workers):
+                    block = False
+                message = self._messages_to_manager.get(block)
+                self._log_messages(message.logs)
+                if message.from_user:
+                    self._caller.handle(message.name, message.src, *message.args)
+                    continue
+                method = getattr(self, '_handle_' + message.name)
+                assert method, 'bad message %s' % repr(message)
+                method(message.src, *message.args)
+        except Queue.Empty:
+            pass
 
-    def __repr__(self):
-        return "_WorkerState(" + str(self.__dict__) + ")"
 
-
-def _get_broker(max_workers, client, worker_factory, host=None):
-    """Return a connection to a manager/worker message_broker
-
-    Args:
-        max_workers - max # of workers to run concurrently.
-        client - BrokerClient implementation to dispatch
-            replies to.
-        worker_factory: factory method for creating objects that implement the Worker interface.
-        host: optional picklable host object that can be passed to workers for testing.
-    Returns:
-        A handle to an object that will talk to a message broker configured
-        for the normal manager/worker communication."""
-    if max_workers == 1:
-        queue_class = Queue.Queue
-        manager_class = _InlineManager
-    else:
-        queue_class = multiprocessing.Queue
-        manager_class = _MultiProcessManager
-
-    broker = _Broker(queue_class)
-    return manager_class(broker, client, worker_factory, host)
-
-
 class WorkerException(Exception):
     """Raised when we receive an unexpected/unknown exception from a worker."""
     pass
 
 
-class BrokerClient(object):
-    """Abstract base class / interface that all message broker clients must
-    implement. In addition to the methods below, by convention clients
-    implement routines of the signature type
-
-        handle_MESSAGE_NAME(self, src, ...):
-
-    where MESSAGE_NAME matches the string passed to post_message(), and
-    src indicates the name of the sender. If the message contains values in
-    the message body, those will be provided as optparams."""
-
-    def is_done(self):
-        """Called from inside run_message_loop() to indicate whether to exit."""
-        raise NotImplementedError
-
-    def name(self):
-        """Return a name that identifies the client."""
-        raise NotImplementedError
-
-
-class _Broker(object):
-    """Brokers provide the basic model of a set of topics. Clients can post a
-    message to any topic using post_message(), and can process messages on one
-    topic at a time using run_message_loop()."""
-
-    def __init__(self, queue_maker):
-        """Args:
-            queue_maker: a factory method that returns objects implementing a
-                Queue interface (put()/get()).
-        """
-        self._queue_maker = queue_maker
-        self._topics = {}
-
-    def __del__(self):
-        self.cleanup()
-
-    def cleanup(self):
-        for queue in self._topics.values():
-            if hasattr(queue, 'close'):
-                queue.close()
-        self._topics = {}
-
-    def add_topic(self, topic_name):
-        if topic_name not in self._topics:
-            self._topics[topic_name] = self._queue_maker()
-
-    def _get_queue_for_topic(self, topic_name):
-        return self._topics[topic_name]
-
-    def post_message(self, client, topic_name, message_name, *message_args):
-        """Post a message to the appropriate topic name.
-
-        Messages have a name and a tuple of optional arguments. Both must be picklable."""
-        message = _Message(client.name, topic_name, message_name, message_args)
-        queue = self._get_queue_for_topic(topic_name)
-        queue.put(_Message.dumps(message))
-
-    def run_message_loop(self, topic_name, client, delay_secs=None):
-        """Loop processing messages until client.is_done() or delay passes.
-
-        To run indefinitely, set delay_secs to None."""
-        assert delay_secs is None or delay_secs > 0
-        self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)
-
-    def run_all_pending(self, topic_name, client):
-        """Process messages until client.is_done() or caller would block."""
-        self._run_loop(topic_name, client, block=False, delay_secs=None)
-
-    def _run_loop(self, topic_name, client, block, delay_secs):
-        queue = self._get_queue_for_topic(topic_name)
-        while not client.is_done():
-            try:
-                s = queue.get(block, delay_secs)
-            except Queue.Empty:
-                return
-            msg = _Message.loads(s)
-            self._dispatch_message(msg, client)
-
-    def _dispatch_message(self, message, client):
-        if not hasattr(client, 'handle_' + message.name):
-            raise ValueError(
-               "%s: received message '%s' it couldn't handle" %
-               (client.name, message.name))
-        optargs = message.args
-        message_handler = getattr(client, 'handle_' + message.name)
-        message_handler(message.src, *optargs)
-
-
 class _Message(object):
-    @staticmethod
-    def loads(string_value):
-        obj = cPickle.loads(string_value)
-        assert(isinstance(obj, _Message))
-        return obj
-
-    def __init__(self, src, topic_name, message_name, message_args):
+    def __init__(self, src, message_name, message_args, from_user, logs):
         self.src = ""
-        self.topic_name = topic_name
         self.name = message_name
         self.args = message_args
+        self.from_user = from_user
+        self.logs = logs
 
-    def dumps(self):
-        return cPickle.dumps(self)
-
     def __repr__(self):
-        return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
-                (self.src, self.topic_name, self.name))
+        return '_Message(src="" name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
 
 
-class _BrokerConnection(object):
-    """_BrokerConnection provides a connection-oriented facade on top of a
-    Broker, so that callers don't have to repeatedly pass the same topic
-    names over and over."""
+class _Worker(multiprocessing.Process):
+    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager):
+        super(_Worker, self).__init__()
+        self.host = host
+        self.worker_number = worker_number
+        self.name = 'worker/%d' % worker_number
+        self.log_messages = []
+        self._running_inline = running_inline
+        self._manager = manager
 
-    def __init__(self, broker, client, run_topic, post_topic):
-        """Create a _BrokerConnection on top of a _Broker. Note that the _Broker
-        is passed in rather than created so that a single _Broker can be used
-        by multiple _BrokerConnections."""
-        self._broker = broker
-        self._client = client
-        self._post_topic = post_topic
-        self._run_topic = run_topic
-        broker.add_topic(run_topic)
-        broker.add_topic(post_topic)
+        self._messages_to_manager = messages_to_manager
+        self._messages_to_worker = messages_to_worker
+        self._worker = worker_factory(self)
+        self._logger = None
+        self._log_handler = None
 
-    def cleanup(self):
-        self._broker.cleanup()
-        self._broker = None
+    def terminate(self):
+        if self._worker:
+            self._worker.stop()
+            self._worker = None
+        if self.is_alive():
+            super(_Worker, self).terminate()
 
-    def run_message_loop(self, delay_secs=None):
-        self._broker.run_message_loop(self._run_topic, self._client, delay_secs)
+    def _close(self):
+        if self._log_handler and self._logger:
+            self._logger.removeHandler(self._log_handler)
+        self._log_handler = None
+        self._logger = None
 
-    def post_message(self, message_name, *message_args):
-        self._broker.post_message(self._client, self._post_topic,
-                                  message_name, *message_args)
+    def start(self):
+        if not self._running_inline:
+            super(_Worker, self).start()
 
-    def raise_exception(self, exc_info):
-        # Since tracebacks aren't picklable, send the extracted stack instead,
-        # but at least log the full traceback.
-        exception_type, exception_value, exception_traceback = sys.exc_info()
-        stack_utils.log_traceback(_log.error, exception_traceback)
-        stack = traceback.extract_tb(exception_traceback)
-        self._broker.post_message(self._client, self._post_topic, 'exception', exception_type, exception_value, stack)
+    def run(self):
+        if not self.host:
+            self.host = Host()
+        if not self._running_inline:
+            self._set_up_logging()
 
-
-class AbstractWorker(BrokerClient):
-    def __init__(self, worker_connection, worker_number):
-        BrokerClient.__init__(self)
-        self.worker = None
-        self._worker_connection = worker_connection
-        self.worker_number = worker_number
-        self.name = 'worker/%d' % worker_number
-        self._done = False
-        self._canceled = False
-        self._options = optparse.Values({'verbose': False})
-        self.host = None
-
-    def is_done(self):
-        return self._done or self._canceled
-
-    def stop_handling_messages(self):
-        self._done = True
-
-    def run(self, host):
-        """Callback for the worker to start executing. Typically does any
-        remaining initialization and then calls broker_connection.run_message_loop()."""
+        worker = self._worker
         exception_msg = ""
-        self.host = host
+        _log.debug("%s starting" % self.name)
 
-        self.worker.start()
-        _log.debug('%s starting' % self.name)
-
         try:
-            self._worker_connection.run_message_loop()
-            if not self.is_done():
-                raise AssertionError("%s: ran out of messages in worker queue."
-                                     % self.name)
-        except KeyboardInterrupt:
+            worker.start()
+            while True:
+                message = self._messages_to_worker.get()
+                if message.from_user:
+                    worker.handle(message.name, message.src, *message.args)
+                    self.yield_to_caller()
+                else:
+                    assert message.name == 'stop', 'bad message %s' % repr(message)
+                    break
+
+        except Queue.Empty:
+            assert False, '%s: ran out of messages in worker queue.' % self.name
+        except KeyboardInterrupt, e:
             exception_msg = ", interrupted"
-            self._worker_connection.raise_exception(sys.exc_info())
-        except:
-            exception_msg = ", exception raised"
-            self._worker_connection.raise_exception(sys.exc_info())
+            if not self._running_inline:
+                _log.warning('worker exception')
+                self._raise(sys.exc_info())
+            raise
+        except Exception, e:
+            exception_msg = ", exception raised: %s" % str(e)
+            if not self._running_inline:
+                self._raise(sys.exc_info())
+            raise
         finally:
-            _log.debug("%s done with message loop%s" % (self.name, exception_msg))
+            _log.debug("%s exiting%s" % (self.name, exception_msg))
             try:
-                self.worker.stop()
+                worker.stop()
             finally:
-                # Make sure we post a done so that we can flush the log messages
-                # and clean up properly even if we raise an exception in worker.cleanup().
-                self._worker_connection.post_message('done')
+                self._post(name='done', args=(), from_user=False)
+            self._close()
 
-    def handle_stop(self, source):
-        self._done = True
+    def post(self, name, *args):
+        self._post(name, args, from_user=True)
 
-    def handle_test_list(self, source, list_name, test_list):
-        self.worker.handle('test_list', source, list_name, test_list)
-
-    def cancel(self):
-        """Called when possible to indicate to the worker to stop processing
-        messages and shut down. Note that workers may be stopped without this
-        method being called, so clients should not rely solely on this."""
-        self._canceled = True
-
     def yield_to_caller(self):
-        self._worker_connection.yield_to_broker()
+        if self._running_inline:
+            self._manager._loop(block=False)
 
-    def post(self, *args):
-        self._worker_connection.post_message(*args)
+    def _post(self, name, args, from_user):
+        log_messages = self.log_messages
+        self.log_messages = []
+        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
 
+    def _raise(self, exc_info):
+        # Since tracebacks aren't picklable, send the extracted stack instead.
+        exception_type, exception_value, exception_traceback = exc_info
+        stack_utils.log_traceback(_log.error, exception_traceback)
+        stack = traceback.extract_tb(exception_traceback)
+        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
 
-class _ManagerConnection(_BrokerConnection):
-    def __init__(self, broker, client, worker_factory, host):
-        _BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
-        self._worker_factory = worker_factory
-        self._host = host
+    def _set_up_logging(self):
+        self._logger = logging.getLogger()
 
-    def start_worker(self, worker_number):
-        raise NotImplementedError
-
-
-class _InlineManager(_ManagerConnection):
-    def __init__(self, broker, client, worker_factory, host):
-        _ManagerConnection.__init__(self, broker, client, worker_factory, host)
-        self._inline_worker = None
-
-    def start_worker(self, worker_number):
-        host = self._host
-        self._inline_worker = _InlineWorkerConnection(host, self._broker, self._client, self._worker_factory, worker_number)
-        return self._inline_worker
-
-    def run_message_loop(self, delay_secs=None):
-        # Note that delay_secs is ignored in this case since we can't easily
-        # implement it.
-        self._inline_worker.run()
-        self._broker.run_all_pending(MANAGER_TOPIC, self._client)
-
-
-class _MultiProcessManager(_ManagerConnection):
-    def _can_pickle_host(self):
-        try:
-            cPickle.dumps(self._host)
-            return True
-        except TypeError:
-            return False
-
-    def start_worker(self, worker_number):
-        host = None
-        if self._can_pickle_host():
-            host = self._host
-        worker_connection = _MultiProcessWorkerConnection(host, self._broker, self._worker_factory, worker_number)
-        worker_connection.start()
-        return worker_connection
-
-
-class _WorkerConnection(_BrokerConnection):
-    def __init__(self, host, broker, worker_factory, worker_number):
-        # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
-        # actual Worker (created by worker_factory) is very confusing, but this all gets better when
-        # _WorkerConnection and AbstractWorker get merged.
-        self._client = AbstractWorker(self, worker_number)
-        self._worker = worker_factory(self._client)
-        self._client.worker = self._worker
-        self._host = host
-        self._log_messages = []
-        self._logger = None
-        self._log_handler = None
-        self.name = self._client.name
-        _BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC)
-
-    def cancel(self):
-        raise NotImplementedError
-
-    def is_alive(self):
-        raise NotImplementedError
-
-    def join(self, timeout):
-        raise NotImplementedError
-
-    # FIXME: rename to yield_to_caller().
-    def yield_to_broker(self):
-        pass
-
-    def post_message(self, *args):
-        # FIXME: This is a hack until we can remove the log_messages arg from the manager.
-        if args[0] in ('finished_test', 'done'):
-            log_messages = self._log_messages
-            self._log_messages = []
-            args = args + tuple([log_messages])
-        super(_WorkerConnection, self).post_message(*args)
-
-    def set_up_logging(self):
-        self._logger = logging.root
         # The unix multiprocessing implementation clones the MeteredStream log handler
         # into the child process, so we need to remove it to avoid duplicate logging.
         for h in self._logger.handlers:
             # log handlers don't have names until python 2.7.
-            if getattr(h, 'name', '') == metered_stream.LOG_HANDLER_NAME:
+            # FIXME: get webkitpy.test.printer from a constant as well.
+            if getattr(h, 'name', '') in (metered_stream.LOG_HANDLER_NAME, 'webkitpy.test.printer'):
                 self._logger.removeHandler(h)
                 break
-        self._logger.setLevel(logging.DEBUG if self._client._options.verbose else logging.INFO)
+
         self._log_handler = _WorkerLogHandler(self)
         self._logger.addHandler(self._log_handler)
 
-    def clean_up_logging(self):
-        if self._log_handler and self._logger:
-            self._logger.removeHandler(self._log_handler)
-        self._log_handler = None
-        self._logger = None
 
-
-class _InlineWorkerConnection(_WorkerConnection):
-    def __init__(self, host, broker, manager_client, worker_factory, worker_number):
-        _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
-        self._alive = False
-        self._manager_client = manager_client
-
-    def cancel(self):
-        self._client.cancel()
-
-    def is_alive(self):
-        return self._alive
-
-    def join(self, timeout):
-        assert not self._alive
-
-    def run(self):
-        self._alive = True
-        try:
-            self._client.run(self._host)
-        finally:
-            self._alive = False
-
-    def yield_to_broker(self):
-        self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
-
-    def raise_exception(self, exc_info):
-        # Since the worker is in the same process as the manager, we can
-        # raise the exception directly, rather than having to send it through
-        # the queue. This allows us to preserve the traceback, but we log
-        # it anyway for consistency with the multiprocess case.
-        exception_type, exception_value, exception_traceback = sys.exc_info()
-        stack_utils.log_traceback(_log.error, exception_traceback)
-        raise exception_type, exception_value, exception_traceback
-
-
-class _Process(multiprocessing.Process):
-    def __init__(self, worker_connection, client):
-        multiprocessing.Process.__init__(self)
-        self._worker_connection = worker_connection
-        self._client = client
-
-    def run(self):
-        if not self._worker_connection._host:
-            self._worker_connection._host = Host()
-        self._worker_connection.run()
-
-
-class _MultiProcessWorkerConnection(_WorkerConnection):
-    def __init__(self, host, broker, worker_factory, worker_number):
-        _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
-        self._proc = _Process(self, self._client)
-
-    def cancel(self):
-        return self._proc.terminate()
-
-    def is_alive(self):
-        return self._proc.is_alive()
-
-    def join(self, timeout):
-        return self._proc.join(timeout)
-
-    def start(self):
-        self._proc.start()
-
-    def run(self):
-        self.set_up_logging()
-        try:
-            self._client.run(self._host)
-        finally:
-            self.clean_up_logging()
-
-
 class _WorkerLogHandler(logging.Handler):
     def __init__(self, worker):
         logging.Handler.__init__(self)
         self._worker = worker
-        self._pid = os.getpid()
 
     def emit(self, record):
-        self._worker._log_messages.append(record)
+        self._worker.log_messages.append(record)

Deleted: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py (122496 => 122497)


--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py	2012-07-12 20:19:04 UTC (rev 122496)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py	2012-07-12 20:22:46 UTC (rev 122497)
@@ -1,206 +0,0 @@
-# Copyright (C) 2010 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import optparse
-import Queue
-import sys
-import unittest
-
-from webkitpy.common.system import outputcapture
-from webkitpy.layout_tests.controllers import manager_worker_broker
-
-
-# In order to reliably control when child workers are starting and stopping,
-# we use a pair of global variables to hold queues used for messaging. Ideally
-# we wouldn't need globals, but we can't pass these through a lexical closure
-# because those can't be Pickled and sent to a subprocess, and we'd prefer not
-# to have to pass extra arguments to the worker in the start_worker() call.
-starting_queue = None
-stopping_queue = None
-
-
-WORKER_NAME = 'worker/1'
-
-def make_broker(manager, max_workers, start_queue=None, stop_queue=None):
-    global starting_queue
-    global stopping_queue
-    starting_queue = start_queue
-    stopping_queue = stop_queue
-    return manager_worker_broker._get_broker(max_workers, manager, _TestWorker)
-
-
-class _TestWorker(object):
-    def __init__(self, caller):
-        self._caller = caller
-        self._thing_to_greet = 'everybody'
-        self._starting_queue = starting_queue
-        self._stopping_queue = stopping_queue
-        self._options = optparse.Values({'verbose': False})
-
-    def start(self):
-        if self._starting_queue:
-            self._starting_queue.put('')
-
-        if self._stopping_queue:
-            self._stopping_queue.get()
-
-    def handle(self, message, src, an_int, a_str):
-        assert an_int == 1
-        assert a_str == "hello, world"
-        self._caller.post('finished_test', 2)
-
-    def stop(self):
-        pass
-
-
-class FunctionTests(unittest.TestCase):
-    def test_get__inline(self):
-        self.assertTrue(make_broker(self, 1) is not None)
-
-    def test_get__processes(self):
-        # This test sometimes fails on Windows. See <http://webkit.org/b/55087>.
-        if sys.platform in ('cygwin', 'win32'):
-            return
-        self.assertTrue(make_broker(self, 2) is not None)
-
-
-class _TestsMixin(object):
-    """Mixin class that implements a series of tests to enforce the
-    contract all implementations must follow."""
-
-    name = 'TesterManager'
-
-    def is_done(self):
-        return self._done
-
-    def handle_done(self, src, log_messages):
-        self._done = True
-
-    def handle_finished_test(self, src, an_int, log_messages):
-        self._an_int = an_int
-
-    def handle_exception(self, src, exception_type, exception_value, stack):
-        raise exception_type(exception_value)
-
-    def setUp(self):
-        self._an_int = None
-        self._broker = None
-        self._done = False
-        self._exception = None
-        self._max_workers = None
-
-    def make_broker(self, starting_queue=None, stopping_queue=None):
-        self._broker = make_broker(self, self._max_workers, starting_queue,
-                                   stopping_queue)
-
-    def test_name(self):
-        self.make_broker()
-        worker = self._broker.start_worker(1)
-        self.assertEquals(worker.name, WORKER_NAME)
-        worker.cancel()
-        worker.join(0.1)
-        self.assertFalse(worker.is_alive())
-        self._broker.cleanup()
-
-    def test_cancel(self):
-        self.make_broker()
-        worker = self._broker.start_worker(1)
-        self._broker.post_message('test_list', 1, 'hello, world')
-        worker.cancel()
-        worker.join(0.1)
-        self.assertFalse(worker.is_alive())
-        self._broker.cleanup()
-
-    def test_done(self):
-        self.make_broker()
-        worker = self._broker.start_worker(1)
-        self._broker.post_message('test_list', 1, 'hello, world')
-        self._broker.post_message('stop')
-        self._broker.run_message_loop()
-        worker.join(0.5)
-        self.assertFalse(worker.is_alive())
-        self.assertTrue(self.is_done())
-        self.assertEqual(self._an_int, 2)
-        self._broker.cleanup()
-
-    def test_unknown_message(self):
-        self.make_broker()
-        worker = self._broker.start_worker(1)
-        self._broker.post_message('unknown')
-        try:
-            self._broker.run_message_loop()
-            self.fail()
-        except ValueError, e:
-            self.assertEquals(str(e),
-                              "%s: received message 'unknown' it couldn't handle" % WORKER_NAME)
-        finally:
-            worker.join(0.5)
-        self.assertFalse(worker.is_alive())
-        self._broker.cleanup()
-
-
-class InlineBrokerTests(_TestsMixin, unittest.TestCase):
-    def setUp(self):
-        _TestsMixin.setUp(self)
-        self._max_workers = 1
-
-
-# FIXME: https://bugs.webkit.org/show_bug.cgi?id=54520.
-if sys.platform not in ('cygwin', 'win32'):
-
-    class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
-        def setUp(self):
-            _TestsMixin.setUp(self)
-            self._max_workers = 2
-
-
-class MessageTest(unittest.TestCase):
-    def test__no_body(self):
-        msg = manager_worker_broker._Message('src', 'topic_name', 'message_name', None)
-        self.assertTrue(repr(msg))
-        s = msg.dumps()
-        new_msg = manager_worker_broker._Message.loads(s)
-        self.assertEqual(new_msg.name, 'message_name')
-        self.assertEqual(new_msg.args, None)
-        self.assertEqual(new_msg.topic_name, 'topic_name')
-        self.assertEqual(new_msg.src, 'src')
-
-    def test__body(self):
-        msg = manager_worker_broker._Message('src', 'topic_name', 'message_name', ('body', 0))
-        self.assertTrue(repr(msg))
-        s = msg.dumps()
-        new_msg = manager_worker_broker._Message.loads(s)
-        self.assertEqual(new_msg.name, 'message_name')
-        self.assertEqual(new_msg.args, ('body', 0))
-        self.assertEqual(new_msg.topic_name, 'topic_name')
-        self.assertEqual(new_msg.src, 'src')
-
-
-
-if __name__ == '__main__':
-    unittest.main()
_______________________________________________
webkit-changes mailing list
[email protected]
http://lists.webkit.org/mailman/listinfo/webkit-changes

Reply via email to