Title: [122513] trunk/Tools
Revision
122513
Author
[email protected]
Date
2012-07-12 14:55:37 -0700 (Thu, 12 Jul 2012)

Log Message

webkitpy: rename manager_worker_broker to message_pool
https://bugs.webkit.org/show_bug.cgi?id=91145

Reviewed by Ojan Vafai.

Since the MessagePool interface is more generic (and simpler)
now and will be reused by test-webkitpy, I'm renaming it and
moving it to webkitpy.common.

* Scripts/webkitpy/common/message_pool.py: Renamed from Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py.
* Scripts/webkitpy/layout_tests/controllers/manager.py:
(TestRunInterruptedException.__reduce__):
(Manager._run_tests.worker_factory):
(Manager._run_tests):
* Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py:

Modified Paths

Added Paths

Removed Paths

Diff

Modified: trunk/Tools/ChangeLog (122512 => 122513)


--- trunk/Tools/ChangeLog	2012-07-12 21:46:04 UTC (rev 122512)
+++ trunk/Tools/ChangeLog	2012-07-12 21:55:37 UTC (rev 122513)
@@ -1,5 +1,23 @@
 2012-07-12  Dirk Pranke  <[email protected]>
 
+        webkitpy: rename manager_worker_broker to message_pool
+        https://bugs.webkit.org/show_bug.cgi?id=91145
+
+        Reviewed by Ojan Vafai.
+
+        Since the MessagePool interface is more generic (and simpler)
+        now and will be reused by test-webkitpy, I'm renaming it and
+        moving it to webkitpy.common.
+
+        * Scripts/webkitpy/common/message_pool.py: Renamed from Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py.
+        * Scripts/webkitpy/layout_tests/controllers/manager.py:
+        (TestRunInterruptedException.__reduce__):
+        (Manager._run_tests.worker_factory):
+        (Manager._run_tests):
+        * Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py:
+
+2012-07-12  Dirk Pranke  <[email protected]>
+
         nrwt crashes saving the output for a platform-specific expected test reference
         https://bugs.webkit.org/show_bug.cgi?id=90872
 

Copied: trunk/Tools/Scripts/webkitpy/common/message_pool.py (from rev 122511, trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py) (0 => 122513)


--- trunk/Tools/Scripts/webkitpy/common/message_pool.py	                        (rev 0)
+++ trunk/Tools/Scripts/webkitpy/common/message_pool.py	2012-07-12 21:55:37 UTC (rev 122513)
@@ -0,0 +1,310 @@
+# Copyright (C) 2011 Google Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Module for handling messages and concurrency for run-webkit-tests
+and test-webkitpy. This module follows the design for multiprocessing.Pool
+and concurrency.futures.ProcessPoolExecutor, with the following differences:
+
+* Tasks are executed in stateful subprocesses via objects that implement the
+  Worker interface - this allows the workers to share state across tasks.
+* The pool provides an asynchronous event-handling interface so the caller
+  may receive events as tasks are processed.
+
+If you don't need these features, use multiprocessing.Pool or concurrency.futures
+intead.
+
+"""
+
+import cPickle
+import logging
+import multiprocessing
+import Queue
+import sys
+import time
+import traceback
+
+
+from webkitpy.common.host import Host
+from webkitpy.common.system import stack_utils
+
+
+_log = logging.getLogger(__name__)
+
+
+def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
+    """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
+    return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
+
+
+class _MessagePool(object):
+    def __init__(self, caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
+        self._caller = caller
+        self._worker_factory = worker_factory
+        self._num_workers = num_workers
+        self._worker_startup_delay_secs = worker_startup_delay_secs
+        self._workers = []
+        self._workers_stopped = set()
+        self._host = host
+        self._name = 'manager'
+        self._running_inline = (self._num_workers == 1)
+        if self._running_inline:
+            self._messages_to_worker = Queue.Queue()
+            self._messages_to_manager = Queue.Queue()
+        else:
+            self._messages_to_worker = multiprocessing.Queue()
+            self._messages_to_manager = multiprocessing.Queue()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self._close()
+        return False
+
+    def run(self, shards):
+        """Posts a list of messages to the pool and waits for them to complete."""
+        for message in shards:
+            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
+
+        for _ in xrange(self._num_workers):
+            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
+
+        self.wait()
+
+    def _start_workers(self):
+        assert not self._workers
+        self._workers_stopped = set()
+        host = None
+        if self._running_inline or self._can_pickle(self._host):
+            host = self._host
+
+        for worker_number in xrange(self._num_workers):
+            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None)
+            self._workers.append(worker)
+            worker.start()
+            if self._worker_startup_delay_secs:
+                time.sleep(self._worker_startup_delay_secs)
+
+    def wait(self):
+        try:
+            self._start_workers()
+            if self._running_inline:
+                self._workers[0].run()
+                self._loop(block=False)
+            else:
+                self._loop(block=True)
+        finally:
+            self._close()
+
+    def _close(self):
+        for worker in self._workers:
+            if worker.is_alive():
+                worker.terminate()
+                worker.join()
+        self._workers = []
+        if not self._running_inline:
+            if self._messages_to_worker:
+                self._messages_to_worker.close()
+                self._messages_to_worker = None
+            if self._messages_to_manager:
+                self._messages_to_manager.close()
+                self._messages_to_manager = None
+
+    def _log_messages(self, messages):
+        for message in messages:
+            logging.root.handle(message)
+
+    def _handle_done(self, source):
+        self._workers_stopped.add(source)
+
+    @staticmethod
+    def _handle_worker_exception(source, exception_type, exception_value, stack):
+        if exception_type == KeyboardInterrupt:
+            raise exception_type(exception_value)
+        _log.error("%s raised %s('%s'):" % (
+                   source, exception_value.__class__.__name__, str(exception_value)))
+        raise WorkerException(str(exception_value))
+
+    def _can_pickle(self, host):
+        try:
+            cPickle.dumps(host)
+            return True
+        except TypeError:
+            return False
+
+    def _loop(self, block):
+        try:
+            while True:
+                if len(self._workers_stopped) == len(self._workers):
+                    block = False
+                message = self._messages_to_manager.get(block)
+                self._log_messages(message.logs)
+                if message.from_user:
+                    self._caller.handle(message.name, message.src, *message.args)
+                    continue
+                method = getattr(self, '_handle_' + message.name)
+                assert method, 'bad message %s' % repr(message)
+                method(message.src, *message.args)
+        except Queue.Empty:
+            pass
+
+
+class WorkerException(Exception):
+    """Raised when we receive an unexpected/unknown exception from a worker."""
+    pass
+
+
+class _Message(object):
+    def __init__(self, src, message_name, message_args, from_user, logs):
+        self.src = ""
+        self.name = message_name
+        self.args = message_args
+        self.from_user = from_user
+        self.logs = logs
+
+    def __repr__(self):
+        return '_Message(src="" name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
+
+
+class _Worker(multiprocessing.Process):
+    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager):
+        super(_Worker, self).__init__()
+        self.host = host
+        self.worker_number = worker_number
+        self.name = 'worker/%d' % worker_number
+        self.log_messages = []
+        self._running_inline = running_inline
+        self._manager = manager
+
+        self._messages_to_manager = messages_to_manager
+        self._messages_to_worker = messages_to_worker
+        self._worker = worker_factory(self)
+        self._logger = None
+        self._log_handler = None
+
+    def terminate(self):
+        if self._worker:
+            self._worker.stop()
+            self._worker = None
+        if self.is_alive():
+            super(_Worker, self).terminate()
+
+    def _close(self):
+        if self._log_handler and self._logger:
+            self._logger.removeHandler(self._log_handler)
+        self._log_handler = None
+        self._logger = None
+
+    def start(self):
+        if not self._running_inline:
+            super(_Worker, self).start()
+
+    def run(self):
+        if not self.host:
+            self.host = Host()
+        if not self._running_inline:
+            self._set_up_logging()
+
+        worker = self._worker
+        exception_msg = ""
+        _log.debug("%s starting" % self.name)
+
+        try:
+            worker.start()
+            while True:
+                message = self._messages_to_worker.get()
+                if message.from_user:
+                    worker.handle(message.name, message.src, *message.args)
+                    self.yield_to_caller()
+                else:
+                    assert message.name == 'stop', 'bad message %s' % repr(message)
+                    break
+
+        except Queue.Empty:
+            assert False, '%s: ran out of messages in worker queue.' % self.name
+        except KeyboardInterrupt, e:
+            exception_msg = ", interrupted"
+            if not self._running_inline:
+                _log.warning('worker exception')
+                self._raise(sys.exc_info())
+            raise
+        except Exception, e:
+            exception_msg = ", exception raised: %s" % str(e)
+            if not self._running_inline:
+                self._raise(sys.exc_info())
+            raise
+        finally:
+            _log.debug("%s exiting%s" % (self.name, exception_msg))
+            try:
+                worker.stop()
+            finally:
+                self._post(name='done', args=(), from_user=False)
+            self._close()
+
+    def post(self, name, *args):
+        self._post(name, args, from_user=True)
+
+    def yield_to_caller(self):
+        if self._running_inline:
+            self._manager._loop(block=False)
+
+    def _post(self, name, args, from_user):
+        log_messages = self.log_messages
+        self.log_messages = []
+        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
+
+    def _raise(self, exc_info):
+        # Since tracebacks aren't picklable, send the extracted stack instead.
+        exception_type, exception_value, exception_traceback = exc_info
+        stack_utils.log_traceback(_log.error, exception_traceback)
+        stack = traceback.extract_tb(exception_traceback)
+        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
+
+    def _set_up_logging(self):
+        self._logger = logging.getLogger()
+
+        # The unix multiprocessing implementation clones the MeteredStream log handler
+        # into the child process, so we need to remove it to avoid duplicate logging.
+        for h in self._logger.handlers:
+            # log handlers don't have names until python 2.7.
+            # FIXME: log handler names should be passed in.
+            if getattr(h, 'name', '') in ('MeteredStreamLogHandler', 'webkitpy.test.printer'):
+                self._logger.removeHandler(h)
+                break
+
+        self._log_handler = _WorkerLogHandler(self)
+        self._logger.addHandler(self._log_handler)
+
+
+class _WorkerLogHandler(logging.Handler):
+    def __init__(self, worker):
+        logging.Handler.__init__(self)
+        self._worker = worker
+
+    def emit(self, record):
+        self._worker.log_messages.append(record)

Property changes: trunk/Tools/Scripts/webkitpy/common/message_pool.py


Added: svn:eol-style

Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py (122512 => 122513)


--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py	2012-07-12 21:46:04 UTC (rev 122512)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py	2012-07-12 21:55:37 UTC (rev 122513)
@@ -44,7 +44,7 @@
 import sys
 import time
 
-from webkitpy.layout_tests.controllers import manager_worker_broker
+from webkitpy.common import message_pool
 from webkitpy.layout_tests.controllers import worker
 from webkitpy.layout_tests.controllers.test_result_writer import TestResultWriter
 from webkitpy.layout_tests.layout_package import json_layout_results_generator
@@ -264,8 +264,8 @@
         return self.__class__, (self.reason,)
 
 
-# Export this so callers don't need to know about manager_worker_broker.
-WorkerException = manager_worker_broker.WorkerException
+# Export this so callers don't need to know about message pools.
+WorkerException = message_pool.WorkerException
 
 
 class TestShard(object):
@@ -767,15 +767,13 @@
         def worker_factory(worker_connection):
             return worker.Worker(worker_connection, self.results_directory(), self._options)
 
-        manager_connection = manager_worker_broker.get(num_workers, self, worker_factory, self._port.host)
-
         if self._options.dry_run:
             return (keyboard_interrupted, interrupted, self._worker_stats.values(), self._group_stats, self._all_results)
 
         self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
 
         try:
-            with manager_worker_broker.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
+            with message_pool.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                 pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
         except KeyboardInterrupt:
             self._printer.flush()

Deleted: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py (122512 => 122513)


--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py	2012-07-12 21:46:04 UTC (rev 122512)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py	2012-07-12 21:55:37 UTC (rev 122513)
@@ -1,299 +0,0 @@
-# Copyright (C) 2011 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Module for handling messages and concurrency for run-webkit-tests."""
-
-import cPickle
-import logging
-import multiprocessing
-import Queue
-import sys
-import time
-import traceback
-
-
-from webkitpy.common.host import Host
-from webkitpy.common.system import stack_utils
-from webkitpy.layout_tests.views import metered_stream
-
-
-_log = logging.getLogger(__name__)
-
-
-def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
-    """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
-    return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
-
-
-class _MessagePool(object):
-    def __init__(self, caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
-        self._caller = caller
-        self._worker_factory = worker_factory
-        self._num_workers = num_workers
-        self._worker_startup_delay_secs = worker_startup_delay_secs
-        self._workers = []
-        self._workers_stopped = set()
-        self._host = host
-        self._name = 'manager'
-        self._running_inline = (self._num_workers == 1)
-        if self._running_inline:
-            self._messages_to_worker = Queue.Queue()
-            self._messages_to_manager = Queue.Queue()
-        else:
-            self._messages_to_worker = multiprocessing.Queue()
-            self._messages_to_manager = multiprocessing.Queue()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self._close()
-        return False
-
-    def run(self, shards):
-        """Posts a list of messages to the pool and waits for them to complete."""
-        for message in shards:
-            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
-
-        for _ in xrange(self._num_workers):
-            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
-
-        self.wait()
-
-    def _start_workers(self):
-        assert not self._workers
-        self._workers_stopped = set()
-        host = None
-        if self._running_inline or self._can_pickle(self._host):
-            host = self._host
-
-        for worker_number in xrange(self._num_workers):
-            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None)
-            self._workers.append(worker)
-            worker.start()
-            if self._worker_startup_delay_secs:
-                time.sleep(self._worker_startup_delay_secs)
-
-    def wait(self):
-        try:
-            self._start_workers()
-            if self._running_inline:
-                self._workers[0].run()
-                self._loop(block=False)
-            else:
-                self._loop(block=True)
-        finally:
-            self._close()
-
-    def _close(self):
-        for worker in self._workers:
-            if worker.is_alive():
-                worker.terminate()
-                worker.join()
-        self._workers = []
-        if not self._running_inline:
-            if self._messages_to_worker:
-                self._messages_to_worker.close()
-                self._messages_to_worker = None
-            if self._messages_to_manager:
-                self._messages_to_manager.close()
-                self._messages_to_manager = None
-
-    def _log_messages(self, messages):
-        for message in messages:
-            logging.root.handle(message)
-
-    def _handle_done(self, source):
-        self._workers_stopped.add(source)
-
-    @staticmethod
-    def _handle_worker_exception(source, exception_type, exception_value, stack):
-        if exception_type == KeyboardInterrupt:
-            raise exception_type(exception_value)
-        _log.error("%s raised %s('%s'):" % (
-                   source, exception_value.__class__.__name__, str(exception_value)))
-        raise WorkerException(str(exception_value))
-
-    def _can_pickle(self, host):
-        try:
-            cPickle.dumps(host)
-            return True
-        except TypeError:
-            return False
-
-    def _loop(self, block):
-        try:
-            while True:
-                if len(self._workers_stopped) == len(self._workers):
-                    block = False
-                message = self._messages_to_manager.get(block)
-                self._log_messages(message.logs)
-                if message.from_user:
-                    self._caller.handle(message.name, message.src, *message.args)
-                    continue
-                method = getattr(self, '_handle_' + message.name)
-                assert method, 'bad message %s' % repr(message)
-                method(message.src, *message.args)
-        except Queue.Empty:
-            pass
-
-
-class WorkerException(Exception):
-    """Raised when we receive an unexpected/unknown exception from a worker."""
-    pass
-
-
-class _Message(object):
-    def __init__(self, src, message_name, message_args, from_user, logs):
-        self.src = ""
-        self.name = message_name
-        self.args = message_args
-        self.from_user = from_user
-        self.logs = logs
-
-    def __repr__(self):
-        return '_Message(src="" name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
-
-
-class _Worker(multiprocessing.Process):
-    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager):
-        super(_Worker, self).__init__()
-        self.host = host
-        self.worker_number = worker_number
-        self.name = 'worker/%d' % worker_number
-        self.log_messages = []
-        self._running_inline = running_inline
-        self._manager = manager
-
-        self._messages_to_manager = messages_to_manager
-        self._messages_to_worker = messages_to_worker
-        self._worker = worker_factory(self)
-        self._logger = None
-        self._log_handler = None
-
-    def terminate(self):
-        if self._worker:
-            self._worker.stop()
-            self._worker = None
-        if self.is_alive():
-            super(_Worker, self).terminate()
-
-    def _close(self):
-        if self._log_handler and self._logger:
-            self._logger.removeHandler(self._log_handler)
-        self._log_handler = None
-        self._logger = None
-
-    def start(self):
-        if not self._running_inline:
-            super(_Worker, self).start()
-
-    def run(self):
-        if not self.host:
-            self.host = Host()
-        if not self._running_inline:
-            self._set_up_logging()
-
-        worker = self._worker
-        exception_msg = ""
-        _log.debug("%s starting" % self.name)
-
-        try:
-            worker.start()
-            while True:
-                message = self._messages_to_worker.get()
-                if message.from_user:
-                    worker.handle(message.name, message.src, *message.args)
-                    self.yield_to_caller()
-                else:
-                    assert message.name == 'stop', 'bad message %s' % repr(message)
-                    break
-
-        except Queue.Empty:
-            assert False, '%s: ran out of messages in worker queue.' % self.name
-        except KeyboardInterrupt, e:
-            exception_msg = ", interrupted"
-            if not self._running_inline:
-                _log.warning('worker exception')
-                self._raise(sys.exc_info())
-            raise
-        except Exception, e:
-            exception_msg = ", exception raised: %s" % str(e)
-            if not self._running_inline:
-                self._raise(sys.exc_info())
-            raise
-        finally:
-            _log.debug("%s exiting%s" % (self.name, exception_msg))
-            try:
-                worker.stop()
-            finally:
-                self._post(name='done', args=(), from_user=False)
-            self._close()
-
-    def post(self, name, *args):
-        self._post(name, args, from_user=True)
-
-    def yield_to_caller(self):
-        if self._running_inline:
-            self._manager._loop(block=False)
-
-    def _post(self, name, args, from_user):
-        log_messages = self.log_messages
-        self.log_messages = []
-        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
-
-    def _raise(self, exc_info):
-        # Since tracebacks aren't picklable, send the extracted stack instead.
-        exception_type, exception_value, exception_traceback = exc_info
-        stack_utils.log_traceback(_log.error, exception_traceback)
-        stack = traceback.extract_tb(exception_traceback)
-        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
-
-    def _set_up_logging(self):
-        self._logger = logging.getLogger()
-
-        # The unix multiprocessing implementation clones the MeteredStream log handler
-        # into the child process, so we need to remove it to avoid duplicate logging.
-        for h in self._logger.handlers:
-            # log handlers don't have names until python 2.7.
-            # FIXME: get webkitpy.test.printer from a constant as well.
-            if getattr(h, 'name', '') in (metered_stream.LOG_HANDLER_NAME, 'webkitpy.test.printer'):
-                self._logger.removeHandler(h)
-                break
-
-        self._log_handler = _WorkerLogHandler(self)
-        self._logger.addHandler(self._log_handler)
-
-
-class _WorkerLogHandler(logging.Handler):
-    def __init__(self, worker):
-        logging.Handler.__init__(self)
-        self._worker = worker
-
-    def emit(self, record):
-        self._worker.log_messages.append(record)

Modified: trunk/Tools/Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py (122512 => 122513)


--- trunk/Tools/Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py	2012-07-12 21:46:04 UTC (rev 122512)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/run_webkit_tests_integrationtest.py	2012-07-12 21:55:37 UTC (rev 122513)
@@ -51,7 +51,7 @@
 
 from webkitpy.layout_tests import port
 from webkitpy.layout_tests import run_webkit_tests
-from webkitpy.layout_tests.controllers.manager_worker_broker import WorkerException
+from webkitpy.layout_tests.controllers.manager import WorkerException
 from webkitpy.layout_tests.port import Port
 from webkitpy.layout_tests.port.test import TestPort, TestDriver
 from webkitpy.test.skip import skip_if
_______________________________________________
webkit-changes mailing list
[email protected]
http://lists.webkit.org/mailman/listinfo/webkit-changes

Reply via email to