4 new commits in pytest-xdist: https://bitbucket.org/hpk42/pytest-xdist/commits/2b3f7baf4fa0/ Changeset: 2b3f7baf4fa0 User: hpk42 Date: 2014-01-27 11:37:26 Summary: a failing test for pytest issue419 Affected #: 2 files
diff -r 1138fbd617e138dad6c72a6492d0b7753e61eae9 -r 2b3f7baf4fa00f49b6300e56b7c134167ab0f24d testing/test_dsession.py --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -6,6 +6,7 @@ ) from _pytest import main as outcome import py +import pytest import execnet XSpec = execnet.XSpec @@ -209,3 +210,16 @@ report_collection_diff(from_collection, to_collection, 1, 2) except AssertionError as e: assert py.builtin._totext(e) == error_message + +@pytest.mark.xfail(reason="duplicate test ids not supported yet") +def test_pytest_issue419(testdir): + testdir.makepyfile(""" + import pytest + + @pytest.mark.parametrize('birth_year', [1988, 1988, ]) + def test_2011_table(birth_year): + pass + """) + reprec = testdir.inline_run("-n1") + reprec.assertoutcome(passed=2) + assert 0 diff -r 1138fbd617e138dad6c72a6492d0b7753e61eae9 -r 2b3f7baf4fa00f49b6300e56b7c134167ab0f24d xdist/dsession.py --- a/xdist/dsession.py +++ b/xdist/dsession.py @@ -142,6 +142,8 @@ node.gateway.id, ) + # all collections are the same, good. 
+ # we now create an index self.pending = col if not col: return https://bitbucket.org/hpk42/pytest-xdist/commits/d3a9df2d81a1/ Changeset: d3a9df2d81a1 User: hpk42 Date: 2014-01-27 11:37:27 Summary: remove unnecessary item2nodes data structure for load scheduling Affected #: 1 file diff -r 2b3f7baf4fa00f49b6300e56b7c134167ab0f24d -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b xdist/dsession.py --- a/xdist/dsession.py +++ b/xdist/dsession.py @@ -95,43 +95,27 @@ self.collection_is_completed = True def remove_item(self, node, item): - if item not in self.item2nodes: - raise AssertionError(item, self.item2nodes) - nodes = self.item2nodes[item] - if node in nodes: # the node might have gone down already - nodes.remove(node) - #if not nodes: - # del self.item2nodes[item] - pending = self.node2pending[node] - pending.remove(item) + node_pending = self.node2pending[node] + node_pending.remove(item) # pre-load items-to-test if the node may become ready - if self.pending and len(pending) < self.LOAD_THRESHOLD_NEWITEMS: + if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS: item = self.pending.pop(0) - pending.append(item) - self.item2nodes.setdefault(item, []).append(node) + node_pending.append(item) node.send_runtest(item) self.log("items waiting for node: %d" %(len(self.pending))) - #self.log("item2pending still executing: %s" %(self.item2nodes,)) #self.log("node2pending: %s" %(self.node2pending,)) def remove_node(self, node): pending = self.node2pending.pop(node) - # KeyError if we didn't get an addnode() yet - for item in pending: - l = self.item2nodes[item] - l.remove(node) - if not l: - del self.item2nodes[item] if not pending: return + # the node must have crashed on the item if there are pending ones crashitem = pending.pop(0) self.pending.extend(pending) return crashitem def init_distribute(self): assert self.collection_is_completed - assert not hasattr(self, 'item2nodes') - self.item2nodes = {} # XXX allow nodes to have different collections
first_node, col = list(self.node2collection.items())[0] for node, collection in self.node2collection.items(): @@ -154,7 +138,6 @@ nodeindex = i % num_available node, pending = available[nodeindex] node.send_runtest(item) - self.item2nodes.setdefault(item, []).append(node) pending.append(item) if i >= max_one_round: break https://bitbucket.org/hpk42/pytest-xdist/commits/fe461fefe08f/ Changeset: fe461fefe08f User: hpk42 Date: 2014-01-27 11:37:33 Summary: fix issue419: work with collection indices instead of node ids. This reduces network message size. Affected #: 6 files diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -7,6 +7,10 @@ - fix pytest issue382 - produce "pytest_runtest_logstart" event again in master. Thanks Aron Curzon. +- fix pytest issue419 by sending/receiving indices into the test + collection instead of node ids (which are not neccessarily unique + for functions parametrized with duplicate values) + 1.9 ------------------------- diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 testing/test_dsession.py --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -64,9 +64,9 @@ assert sched.tests_finished() assert node1.sent == ['ALL'] assert node2.sent == ['ALL'] - sched.remove_item(node1, collection[0]) + sched.remove_item(node1, 0) assert sched.tests_finished() - sched.remove_item(node2, collection[0]) + sched.remove_item(node2, 0) assert sched.tests_finished() def test_schedule_remove_node(self): @@ -105,7 +105,7 @@ assert len(node1.sent) == 1 assert len(node2.sent) == 1 x = sorted(node1.sent + node2.sent) - assert x == collection + assert x == [0, 1] sched.remove_item(node1, node1.sent[0]) sched.remove_item(node2, node2.sent[0]) assert sched.tests_finished() @@ -126,14 +126,14 @@ sent1 = node1.sent sent2 = node2.sent chunkitems = col[:sched.ITEM_CHUNKSIZE] - assert sent1 == chunkitems - assert sent2 == chunkitems 
+ assert (sent1 == [0,2] and sent2 == [1,3]) or ( + sent1 == [1,3] and sent2 == [0,2]) assert sched.node2pending[node1] == sent1 assert sched.node2pending[node2] == sent2 assert len(sched.pending) == 1 for node in (node1, node2): - for i in range(sched.ITEM_CHUNKSIZE): - sched.remove_item(node, "xyz") + for i in sched.node2pending[node]: + sched.remove_item(node, i) assert not sched.pending def test_add_remove_node(self): diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 testing/test_remote.py --- a/testing/test_remote.py +++ b/testing/test_remote.py @@ -154,7 +154,7 @@ assert ev.kwargs['topdir'] == slave.testdir.tmpdir ids = ev.kwargs['ids'] assert len(ids) == 1 - slave.sendcommand("runtests", ids=ids) + slave.sendcommand("runtests", indices=range(len(ids))) slave.sendcommand("shutdown") ev = slave.popevent("logstart") assert ev.kwargs["nodeid"].endswith("test_func") diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/dsession.py --- a/xdist/dsession.py +++ b/xdist/dsession.py @@ -40,15 +40,15 @@ if len(self.node2pending) >= self.numnodes: self.collection_is_completed = True - def remove_item(self, node, item): - self.node2pending[node].remove(item) + def remove_item(self, node, item_index): + self.node2pending[node].remove(item_index) def remove_node(self, node): # KeyError if we didn't get an addnode() yet pending = self.node2pending.pop(node) if not pending: return - crashitem = pending.pop(0) + crashitem = self.node2collection[node][pending.pop(0)] # XXX what about the rest of pending? 
return crashitem @@ -56,7 +56,7 @@ assert self.collection_is_completed for node, pending in self.node2pending.items(): node.send_runtest_all() - pending[:] = self.node2collection[node] + pending[:] = range(len(self.node2collection[node])) class LoadScheduling: LOAD_THRESHOLD_NEWITEMS = 5 @@ -94,14 +94,15 @@ if len(self.node2collection) >= self.numnodes: self.collection_is_completed = True - def remove_item(self, node, item): + def remove_item(self, node, item_index): node_pending = self.node2pending[node] - node_pending.remove(item) + assert item_index in node_pending, (item_index, node_pending) + node_pending.remove(item_index) # pre-load items-to-test if the node may become ready if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS: - item = self.pending.pop(0) - node_pending.append(item) - node.send_runtest(item) + item_index = self.pending.pop(0) + node_pending.append(item_index) + node.send_runtest(item_index) self.log("items waiting for node: %d" %(len(self.pending))) #self.log("node2pending: %s" %(self.node2pending,)) @@ -110,7 +111,7 @@ if not pending: return # the node must have crashed on the item if there are pending ones - crashitem = pending.pop(0) + crashitem = self.collection[pending.pop(0)] self.pending.extend(pending) return crashitem @@ -128,17 +129,18 @@ # all collections are the same, good. 
# we now create an index - self.pending = col + self.collection = col + self.pending = range(len(col)) if not col: return available = list(self.node2pending.items()) num_available = self.numnodes max_one_round = num_available * self.ITEM_CHUNKSIZE - 1 - for i, item in enumerate(self.pending): + for i, item_index in enumerate(self.pending): nodeindex = i % num_available node, pending = available[nodeindex] - node.send_runtest(item) - pending.append(item) + node.send_runtest(item_index) + pending.append(item_index) if i >= max_one_round: break del self.pending[:i + 1] @@ -304,7 +306,7 @@ def slave_testreport(self, node, rep): if not (rep.passed and rep.when != "call"): if rep.when in ("setup", "call"): - self.sched.remove_item(node, rep.nodeid) + self.sched.remove_item(node, rep.item_index) #self.report_line("testreport %s: %s" %(rep.id, rep.status)) rep.node = node self.config.hook.pytest_runtest_logreport(report=rep) diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/remote.py --- a/xdist/remote.py +++ b/xdist/remote.py @@ -47,39 +47,44 @@ name, kwargs = self.channel.receive() self.log("received command %s(**%s)" % (name, kwargs)) if name == "runtests": - ids = kwargs['ids'] - for nodeid in ids: - torun.append(self._id2item[nodeid]) + torun.extend(kwargs['indices']) elif name == "runtests_all": - torun.extend(session.items) - self.log("items to run: %s" %(len(torun))) + torun.extend(range(len(session.items))) + self.log("items to run: %s" % (torun,)) while len(torun) >= 2: - item = torun.pop(0) - nextitem = torun[0] - self.config.hook.pytest_runtest_protocol(item=item, - nextitem=nextitem) + # we store item_index so that we can pick it up from the + # runtest hooks + self.run_one_test(torun) + if name == "shutdown": while torun: - self.config.hook.pytest_runtest_protocol( - item=torun.pop(0), nextitem=None) + self.run_one_test(torun) break return True + def run_one_test(self, torun): + items = self.session.items + 
self.item_index = torun.pop(0) + if torun: + nextitem = items[torun[0]] + else: + nextitem = None + self.config.hook.pytest_runtest_protocol( + item=items[self.item_index], + nextitem=nextitem) + def pytest_collection_finish(self, session): - self._id2item = {} - ids = [] - for item in session.items: - self._id2item[item.nodeid] = item - ids.append(item.nodeid) self.sendevent("collectionfinish", topdir=str(session.fspath), - ids=ids) + ids=[item.nodeid for item in session.items]) def pytest_runtest_logstart(self, nodeid, location): self.sendevent("logstart", nodeid=nodeid, location=location) def pytest_runtest_logreport(self, report): data = serialize_report(report) + data["item_index"] = self.item_index + assert self.session.items[self.item_index].nodeid == report.nodeid self.sendevent("testreport", data=data) def pytest_collectreport(self, report): diff -r d3a9df2d81a14b7a33b067ff64f28ebfa7df146b -r fe461fefe08fb14a20b85743988b24d836ce81e1 xdist/slavemanage.py --- a/xdist/slavemanage.py +++ b/xdist/slavemanage.py @@ -241,8 +241,8 @@ self.gateway.exit() #del self.gateway - def send_runtest(self, nodeid): - self.sendcommand("runtests", ids=[nodeid]) + def send_runtest(self, index): + self.sendcommand("runtests", indices=[index]) def send_runtest_all(self): self.sendcommand("runtests_all",) @@ -292,7 +292,10 @@ elif eventname == "logstart": self.notify_inproc(eventname, node=self, **kwargs) elif eventname in ("testreport", "collectreport", "teardownreport"): + item_index = kwargs.pop("item_index", None) rep = unserialize_report(eventname, kwargs['data']) + if item_index is not None: + rep.item_index = item_index self.notify_inproc(eventname, node=self, rep=rep) elif eventname == "collectionfinish": self.notify_inproc(eventname, node=self, ids=kwargs['ids']) @@ -307,8 +310,7 @@ self.config.pluginmanager.notify_exception(excinfo) def unserialize_report(name, reportdict): - d = reportdict if name == "testreport": - return runner.TestReport(**d) + return 
runner.TestReport(**reportdict) elif name == "collectreport": - return runner.CollectReport(**d) + return runner.CollectReport(**reportdict) https://bitbucket.org/hpk42/pytest-xdist/commits/05aa4f48306c/ Changeset: 05aa4f48306c User: hpk42 Date: 2014-01-27 11:37:40 Summary: send multiple "to test" indices in one network message to a slave and improve heuristics for sending chunks where the chunksize depends on the number of remaining tests rather than fixed numbers. This reduces the number of master -> node messages (but not the reverse direction) Affected #: 8 files diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -11,6 +11,13 @@ collection instead of node ids (which are not neccessarily unique for functions parametrized with duplicate values) +- send multiple "to test" indices in one network message to a slave + and improve heuristics for sending chunks where the chunksize + depends on the number of remaining tests rather than fixed numbers. 
+ This reduces the number of master -> node messages (but not the + reverse direction) + + 1.9 ------------------------- diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 setup.py --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ packages = ['xdist'], entry_points = {'pytest11': ['xdist = xdist.plugin'],}, zip_safe=False, - install_requires = ['execnet>=1.1', 'pytest>=2.3.5'], + install_requires = ['execnet>=1.1', 'pytest>=2.4.2'], classifiers=[ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 testing/test_dsession.py --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -29,15 +29,12 @@ self.sent = [] self.gateway = MockGateway() - def send_runtest(self, nodeid): - self.sent.append(nodeid) + def send_runtest_some(self, indices): + self.sent.extend(indices) def send_runtest_all(self): self.sent.append("ALL") - def sendlist(self, items): - self.sent.extend(items) - def shutdown(self): self._shutdown=True @@ -117,17 +114,16 @@ node2 = MockNode() sched.addnode(node1) sched.addnode(node2) - sched.ITEM_CHUNKSIZE = 2 - col = ["xyz"] * (2*sched.ITEM_CHUNKSIZE +1) + col = ["xyz"] * (3) sched.addnode_collection(node1, col) sched.addnode_collection(node2, col) sched.init_distribute() #assert not sched.tests_finished() sent1 = node1.sent sent2 = node2.sent - chunkitems = col[:sched.ITEM_CHUNKSIZE] - assert (sent1 == [0,2] and sent2 == [1,3]) or ( - sent1 == [1,3] and sent2 == [0,2]) + chunkitems = col[:1] + assert (sent1 == [0] and sent2 == [1]) or ( + sent1 == [1] and sent2 == [0]) assert sched.node2pending[node1] == sent1 assert sched.node2pending[node2] == sent2 assert len(sched.pending) == 1 diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 testing/test_remote.py --- a/testing/test_remote.py +++ b/testing/test_remote.py @@ -154,7 +154,7 @@ assert 
ev.kwargs['topdir'] == slave.testdir.tmpdir ids = ev.kwargs['ids'] assert len(ids) == 1 - slave.sendcommand("runtests", indices=range(len(ids))) + slave.sendcommand("runtests", indices=list(range(len(ids)))) slave.sendcommand("shutdown") ev = slave.popevent("logstart") assert ev.kwargs["nodeid"].endswith("test_func") diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 tox.ini --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,7 @@ [testenv] changedir=testing -deps=pytest>=2.4.2 +deps=pytest>=2.5.1 commands= py.test --junitxml={envlogdir}/junit-{envname}.xml [] [testenv:py27] diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/dsession.py --- a/xdist/dsession.py +++ b/xdist/dsession.py @@ -59,9 +59,6 @@ pending[:] = range(len(self.node2collection[node])) class LoadScheduling: - LOAD_THRESHOLD_NEWITEMS = 5 - ITEM_CHUNKSIZE = 10 - def __init__(self, numnodes, log=None): self.numnodes = numnodes self.node2pending = {} @@ -96,15 +93,24 @@ def remove_item(self, node, item_index): node_pending = self.node2pending[node] - assert item_index in node_pending, (item_index, node_pending) node_pending.remove(item_index) # pre-load items-to-test if the node may become ready - if self.pending and len(node_pending) < self.LOAD_THRESHOLD_NEWITEMS: - item_index = self.pending.pop(0) - node_pending.append(item_index) - node.send_runtest(item_index) - self.log("items waiting for node: %d" %(len(self.pending))) - #self.log("node2pending: %s" %(self.node2pending,)) + + if self.pending: + # how many nodes do we have remaining per node roughly? 
+ num_nodes = len(self.node2pending) + # if our node goes below a heuristic minimum, fill it out to + # heuristic maximum + items_per_node_min = max( + 1, len(self.pending) // num_nodes // 4) + items_per_node_max = max( + 1, len(self.pending) // num_nodes // 2) + if len(node_pending) <= items_per_node_min: + num_send = items_per_node_max - len(node_pending) + 1 + self._send_tests(node, num_send) + + self.log("num items waiting for node:", len(self.pending)) + #self.log("node2pending:", self.node2pending) def remove_node(self, node): pending = self.node2pending.pop(node) @@ -118,8 +124,9 @@ def init_distribute(self): assert self.collection_is_completed # XXX allow nodes to have different collections - first_node, col = list(self.node2collection.items())[0] - for node, collection in self.node2collection.items(): + node_collection_items = list(self.node2collection.items()) + first_node, col = node_collection_items[0] + for node, collection in node_collection_items[1:]: report_collection_diff( col, collection, @@ -130,21 +137,25 @@ # all collections are the same, good. # we now create an index self.collection = col - self.pending = range(len(col)) + self.pending[:] = range(len(col)) if not col: return - available = list(self.node2pending.items()) - num_available = self.numnodes - max_one_round = num_available * self.ITEM_CHUNKSIZE - 1 - for i, item_index in enumerate(self.pending): - nodeindex = i % num_available - node, pending = available[nodeindex] - node.send_runtest(item_index) - pending.append(item_index) - if i >= max_one_round: - break - del self.pending[:i + 1] + # how many items per node do we have about? 
+ items_per_node = len(self.collection) // len(self.node2pending) + # take half of it for initial distribution, at least 1 + node_chunksize = max(items_per_node // 2, 1) + # and initialize each node with a chunk of tests + for node in self.node2pending: + self._send_tests(node, node_chunksize) + #f = open("/tmp/sent", "w") + def _send_tests(self, node, num): + tests_per_node = self.pending[:num] + #print >>self.f, "sent", node, tests_per_node + if tests_per_node: + del self.pending[:num] + self.node2pending[node].extend(tests_per_node) + node.send_runtest_some(tests_per_node) def report_collection_diff(from_collection, to_collection, from_id, to_id): """Report the collected test difference between two nodes. @@ -243,7 +254,7 @@ assert callname, kwargs method = "slave_" + callname call = getattr(self, method) - self.log("calling method: %s(**%s)" % (method, kwargs)) + self.log("calling method", method, kwargs) call(**kwargs) if self.sched.tests_finished(): self.triggershutdown() diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/remote.py --- a/xdist/remote.py +++ b/xdist/remote.py @@ -24,7 +24,7 @@ def pytest_internalerror(self, excrepr): for line in str(excrepr).split("\n"): - self.log("IERROR> " + line) + self.log("IERROR>", line) def pytest_sessionstart(self, session): self.session = session @@ -45,24 +45,19 @@ torun = [] while 1: name, kwargs = self.channel.receive() - self.log("received command %s(**%s)" % (name, kwargs)) + self.log("received command", name, kwargs) if name == "runtests": torun.extend(kwargs['indices']) elif name == "runtests_all": torun.extend(range(len(session.items))) - self.log("items to run: %s" % (torun,)) - while len(torun) >= 2: - # we store item_index so that we can pick it up from the - # runtest hooks - self.run_one_test(torun) - + self.log("items to run:", torun) + while torun: + self.run_tests(torun) if name == "shutdown": - while torun: - self.run_one_test(torun) break return True - 
def run_one_test(self, torun): + def run_tests(self, torun): items = self.session.items self.item_index = torun.pop(0) if torun: diff -r fe461fefe08fb14a20b85743988b24d836ce81e1 -r 05aa4f48306c905ae4d183a4b337f63cc4490466 xdist/slavemanage.py --- a/xdist/slavemanage.py +++ b/xdist/slavemanage.py @@ -241,8 +241,8 @@ self.gateway.exit() #del self.gateway - def send_runtest(self, index): - self.sendcommand("runtests", indices=[index]) + def send_runtest_some(self, indices): + self.sendcommand("runtests", indices=indices) def send_runtest_all(self): self.sendcommand("runtests_all",) Repository URL: https://bitbucket.org/hpk42/pytest-xdist/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. _______________________________________________ pytest-commit mailing list pytest-commit@python.org https://mail.python.org/mailman/listinfo/pytest-commit