Diff
Modified: trunk/Tools/ChangeLog (121808 => 121809)
--- trunk/Tools/ChangeLog 2012-07-03 23:31:49 UTC (rev 121808)
+++ trunk/Tools/ChangeLog 2012-07-03 23:31:57 UTC (rev 121809)
@@ -1,5 +1,69 @@
2012-07-03 Dirk Pranke <[email protected]>
+ nrwt: make the worker class stand alone with a cleaner interface
+ https://bugs.webkit.org/show_bug.cgi?id=90409
+
+ Reviewed by Ojan Vafai.
+
+ Currently the Worker class derives from AbstractWorker, which is
+ kind of crufty and awkward; it would be better if we did not
+ rely on shared state.
+
+ With this change, Worker derives from object and
+ exposes the following interface:
+ __init__() - called in the manager process
+ safe_init() - called in the worker process to initialize
+ unpicklable state
+ handle() - a single routine to handle all messages
+ cleanup() - called so the worker can clean up
+
+ Also, all of the "administrative" messages that are handled by
+ the worker (notification of start/stop/etc.) move into
+ manager_worker_broker - this reduces worker.py to just handling
+ the mechanics of actually running each test.
+
+ For the moment, we do this by creating Yet Another wrapper/proxy
+ class in manager_worker_broker, but this will get simpler
+ shortly when the rest of manager_worker_broker is cleaned up.
+
+ With this change worker is now in its new form but there will be
+ a follow-on change that cleans up some names and other minor
+ things.
+
+ This change is again mostly just moving things around and should
+ be covered by the (updated) existing tests.
+
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
+ (get):
+ (AbstractWorker.__init__):
+ (AbstractWorker.run):
+ (AbstractWorker):
+ (AbstractWorker.handle_stop):
+ (AbstractWorker.handle_test_list):
+ (AbstractWorker.yield_to_broker):
+ (AbstractWorker.post_message):
+ (_WorkerConnection.__init__):
+ (_Process.run):
+ * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
+ (_TestWorker):
+ (_TestWorker.__init__):
+ (_TestWorker.name):
+ (_TestWorker.cleanup):
+ (_TestWorker.handle):
+ (_TestWorker.safe_init):
+ (_TestWorker.stop):
+ (_TestsMixin.handle_finished_test):
+ (_TestsMixin.setUp):
+ (_TestsMixin.test_cancel):
+ (_TestsMixin.test_done):
+ * Scripts/webkitpy/layout_tests/controllers/worker.py:
+ (Worker):
+ (Worker.__init__):
+ (Worker.safe_init):
+ (Worker.handle):
+
+2012-07-03 Dirk Pranke <[email protected]>
+
nrwt: moving child process logging code into manager_worker_broker
https://bugs.webkit.org/show_bug.cgi?id=90408
Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py (121808 => 121809)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py 2012-07-03 23:31:49 UTC (rev 121808)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py 2012-07-03 23:31:57 UTC (rev 121809)
@@ -75,6 +75,7 @@
import traceback
+from webkitpy.common.host import Host
from webkitpy.common.system import stack_utils
from webkitpy.layout_tests.views import metered_stream
@@ -96,7 +97,7 @@
max_workers - max # of workers to run concurrently.
client - BrokerClient implementation to dispatch
replies to.
- worker_factory: factory method for creatin objects that implement the AbstractWorker interface.
+ worker_factory: factory method for creating objects that implement the Worker interface.
host: optional picklable host object that can be passed to workers for testing.
Returns:
A handle to an object that will talk to a message broker configured
@@ -263,23 +264,15 @@
class AbstractWorker(BrokerClient):
def __init__(self, worker_connection, worker_number):
- """The constructor should be used to do any simple initialization
- necessary, but should not do anything that creates data structures
- that cannot be Pickled or sent across processes (like opening
- files or sockets). Complex initialization should be done at the
- start of the run() call.
-
- Args:
- worker_connection - handle to the _BrokerConnection object creating
- the worker and that can be used for messaging.
- worker_number - the number/index of the current worker."""
BrokerClient.__init__(self)
+ self.worker = None
self._worker_connection = worker_connection
self._worker_number = worker_number
self._name = 'worker/%d' % worker_number
self._done = False
self._canceled = False
self._options = optparse.Values({'verbose': False})
+ self.host = None
def name(self):
return self._name
@@ -290,11 +283,15 @@
def stop_handling_messages(self):
self._done = True
- def run(self):
+ def run(self, host):
"""Callback for the worker to start executing. Typically does any
remaining initialization and then calls broker_connection.run_message_loop()."""
exception_msg = ""
+ self.host = host
+ self.worker.safe_init()
+ _log.debug('%s starting' % self._name)
+
try:
self._worker_connection.run_message_loop()
if not self.is_done():
@@ -308,14 +305,28 @@
self._worker_connection.raise_exception(sys.exc_info())
finally:
_log.debug("%s done with message loop%s" % (self._name, exception_msg))
+ self.worker.cleanup()
+ self._worker_connection.post_message('done')
+ def handle_stop(self, source):
+ self._done = True
+
+ def handle_test_list(self, source, list_name, test_list):
+ self.worker.handle('test_list', source, list_name, test_list)
+
def cancel(self):
"""Called when possible to indicate to the worker to stop processing
messages and shut down. Note that workers may be stopped without this
method being called, so clients should not rely solely on this."""
self._canceled = True
+ def yield_to_broker(self):
+ self._worker_connection.yield_to_broker()
+ def post_message(self, *args):
+ self._worker_connection.post_message(*args)
+
+
class _ManagerConnection(_BrokerConnection):
def __init__(self, broker, client, worker_factory, host):
_BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
@@ -362,7 +373,12 @@
class _WorkerConnection(_BrokerConnection):
def __init__(self, host, broker, worker_factory, worker_number):
- self._client = worker_factory(self, worker_number)
+ # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
+ # actual Worker (created by worker_factory) is very confusing, but this all gets better when
+ # _WorkerConnection and AbstractWorker get merged.
+ self._client = AbstractWorker(self, worker_number)
+ self._worker = worker_factory(self._client, worker_number)
+ self._client.worker = self._worker
self._host = host
self._log_messages = []
self._logger = None
@@ -451,6 +467,8 @@
self._client = client
def run(self):
+ if not self._worker_connection._host:
+ self._worker_connection._host = Host()
self._worker_connection.run()
Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py (121808 => 121809)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py 2012-07-03 23:31:49 UTC (rev 121808)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py 2012-07-03 23:31:57 UTC (rev 121809)
@@ -54,33 +54,36 @@
return manager_worker_broker.get(max_workers, manager, _TestWorker)
-class _TestWorker(manager_worker_broker.AbstractWorker):
- def __init__(self, worker_connection, worker_number=1):
- super(_TestWorker, self).__init__(worker_connection, worker_number)
+class _TestWorker(object):
+ def __init__(self, caller, worker_number):
+ self._caller = caller
self._thing_to_greet = 'everybody'
self._starting_queue = starting_queue
self._stopping_queue = stopping_queue
+ self._options = optparse.Values({'verbose': False})
- def handle_stop(self, src):
- self.stop_handling_messages()
+ def name(self):
+ return WORKER_NAME
- def handle_test(self, src, an_int, a_str):
+ def cleanup(self):
+ pass
+
+ def handle(self, message, src, an_int, a_str):
assert an_int == 1
assert a_str == "hello, world"
- self._worker_connection.post_message('test', 2, 'hi, ' + self._thing_to_greet)
+ self._caller.post_message('finished_test', 2)
- def run(self, host):
+ def safe_init(self):
if self._starting_queue:
self._starting_queue.put('')
if self._stopping_queue:
self._stopping_queue.get()
- try:
- super(_TestWorker, self).run()
- finally:
- self._worker_connection.post_message('done')
+ def stop(self):
+ self._caller.post_message('done')
+
class FunctionTests(unittest.TestCase):
def test_get__inline(self):
self.assertTrue(make_broker(self, 1) is not None)
@@ -105,16 +108,14 @@
def handle_done(self, src, log_messages):
self._done = True
- def handle_test(self, src, an_int, a_str):
+ def handle_finished_test(self, src, an_int, log_messages):
self._an_int = an_int
- self._a_str = a_str
def handle_exception(self, src, exception_type, exception_value, stack):
raise exception_type(exception_value)
def setUp(self):
self._an_int = None
- self._a_str = None
self._broker = None
self._done = False
self._exception = None
@@ -136,7 +137,7 @@
def test_cancel(self):
self.make_broker()
worker = self._broker.start_worker(1)
- self._broker.post_message('test', 1, 'hello, world')
+ self._broker.post_message('test_list', 1, 'hello, world')
worker.cancel()
worker.join(0.1)
self.assertFalse(worker.is_alive())
@@ -145,14 +146,13 @@
def test_done(self):
self.make_broker()
worker = self._broker.start_worker(1)
- self._broker.post_message('test', 1, 'hello, world')
+ self._broker.post_message('test_list', 1, 'hello, world')
self._broker.post_message('stop')
self._broker.run_message_loop()
worker.join(0.5)
self.assertFalse(worker.is_alive())
self.assertTrue(self.is_done())
self.assertEqual(self._an_int, 2)
- self.assertEqual(self._a_str, 'hi, everybody')
self._broker.cleanup()
def test_unknown_message(self):
Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py (121808 => 121809)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py 2012-07-03 23:31:49 UTC (rev 121808)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/worker.py 2012-07-03 23:31:57 UTC (rev 121809)
@@ -32,8 +32,6 @@
import threading
import time
-from webkitpy.common.host import Host
-from webkitpy.layout_tests.controllers import manager_worker_broker
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_results
@@ -42,11 +40,16 @@
_log = logging.getLogger(__name__)
-class Worker(manager_worker_broker.AbstractWorker):
+class Worker(object):
def __init__(self, worker_connection, worker_number, results_directory, options):
- super(Worker, self).__init__(worker_connection, worker_number)
+ self._worker_connection = worker_connection
+ self._worker_number = worker_number
+ self._name = 'worker/%d' % worker_number
self._results_directory = results_directory
self._options = options
+
+ # The remaining fields are initialized in safe_init()
+ self._host = None
self._port = None
self._batch_size = None
self._batch_count = None
@@ -59,35 +62,19 @@
self.cleanup()
def safe_init(self):
- """This method should only be called when it is is safe for the mixin
- to create state that can't be Pickled.
+ """This method is called when it is safe for the object to create state that
+ does not need to be pickled (usually this means it is called in a child process)."""
+ self._host = self._worker_connection.host
+ self._filesystem = self._host.filesystem
+ self._port = self._host.port_factory.get(self._options.platform, self._options)
- This routine exists so that the mixin can be created and then marshaled
- across into a child process."""
- self._filesystem = self._port.host.filesystem
self._batch_count = 0
self._batch_size = self._options.batch_size or 0
tests_run_filename = self._filesystem.join(self._results_directory, "tests_run%d.txt" % self._worker_number)
self._tests_run_file = self._filesystem.open_text_file_for_writing(tests_run_filename)
- def run(self, host):
- if not host:
- host = Host()
-
- options = self._options
- self._port = host.port_factory.get(options.platform, options)
-
- self.safe_init()
- try:
- _log.debug("%s starting" % self._name)
- super(Worker, self).run()
- finally:
- self.kill_driver()
- _log.debug("%s exiting" % self._name)
- self.cleanup()
- self._worker_connection.post_message('done')
-
- def handle_test_list(self, src, list_name, test_list):
+ def handle(self, name, source, list_name, test_list):
+ assert name == 'test_list'
start_time = time.time()
num_tests = 0
for test_input in test_list:
@@ -99,9 +86,6 @@
elapsed_time = time.time() - start_time
self._worker_connection.post_message('finished_list', list_name, num_tests, elapsed_time)
- def handle_stop(self, src):
- self.stop_handling_messages()
-
def _update_test_input(self, test_input):
test_input.reference_files = self._port.reference_files(test_input.test_name)
if test_input.reference_files: