This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 0f8d91e DISPATCH-1408 - Modify ClosestTest to send specific number of messages. Also added additional checks to make sure that all routers know about closed receivers. This closes #574. 0f8d91e is described below commit 0f8d91e30eb5477e4f832bfe09552193371145a1 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Fri Sep 27 13:52:48 2019 -0400 DISPATCH-1408 - Modify ClosestTest to send specific number of messages. Also added additional checks to make sure that all routers know about closed receivers. This closes #574. --- tests/system_tests_distribution.py | 103 ++++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 25 deletions(-) diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py index c3be49c..ea1f122 100644 --- a/tests/system_tests_distribution.py +++ b/tests/system_tests_distribution.py @@ -2272,6 +2272,17 @@ class ClosestTest ( MessagingHandler ): self.test_name = test_name self.sender = None + self.n_sent_1 = 0 + self.n_sent_2 = 0 + self.n_sent_3 = 0 + + self.recv_1_a_closed = False + self.recv_1_b_closed = False + self.recv_2_a_closed = False + self.recv_2_b_closed = False + + self.first_check = True + self.send_on_sendable = True def timeout ( self ): self.bail ( "Timeout Expired " ) @@ -2322,46 +2333,69 @@ class ClosestTest ( MessagingHandler ): self.recv_3_b.flow ( self.n_expected ) self.addr_check_receiver = event.container.create_receiver ( self.cnx_1, dynamic=True ) + self.addr_check_receiver.flow(100) self.addr_check_sender = event.container.create_sender ( self.cnx_1, "$management" ) def on_link_opened(self, event): - if event.receiver: - event.receiver.flow ( self.n_expected ) if event.receiver == self.addr_check_receiver: # my addr-check link has opened: make the addr_checker with the given address. self.addr_checker = AddressChecker ( self.addr_check_receiver.remote_source.address ) self.addr_check() - def on_sendable ( self, event ): # Fix for DISPATCH-1408 - Make sure that the correct sender is sending the messages to self.dest - if event.sender == self.sender: + if event.sender == self.sender and self.n_sent_1 < self.one_third and self.send_on_sendable: msg = Message ( body = "Hello, closest.", address = self.dest) event.sender.send ( msg ) + self.n_sent_1 += 1 + if self.n_sent_1 == self.one_third: + # Henceforth, we will not be using on_sendable to send messages. + # Look at on_link_closed + self.send_on_sendable = False + + def send_messages(self): + while self.n_sent_1 < self.one_third: + msg = Message ( body = "Hello, closest.", + address = self.dest) + self.sender.send ( msg ) + self.n_sent_1 += 1 - def on_message ( self, event ): + def on_message ( self, event ): if event.receiver == self.addr_check_receiver: # This is a response to one of my address-readiness checking messages. response = self.addr_checker.parse_address_query_response(event.message) - if response.status_code == 200 and response.subscriberCount == 2 and response.remoteCount == 2: - # now we know that we have two subscribers on attached router, and two remote - # routers that know about the address. The network is ready. - # Now we can make the sender without getting a - # "No Path To Destination" error. - self.sender = event.container.create_sender ( self.send_cnx, self.dest ) - - # And we can quit checking. - if self.addr_check_timer: - self.addr_check_timer.cancel() - self.addr_check_timer = None + if self.first_check: + if response.status_code == 200 and response.subscriberCount == 2 and response.remoteCount == 2: + # now we know that we have two subscribers on attached router, and two remote + # routers that know about the address. The network is ready. + # Now we can make the sender without getting a + # "No Path To Destination" error. + self.sender = event.container.create_sender ( self.send_cnx, self.dest ) + + # And we can quit checking. + if self.addr_check_timer: + self.addr_check_timer.cancel() + self.addr_check_timer = None + else: + # If the latest check did not find the link-attack route ready, + # schedule another check a little while from now. + self.addr_check_timer = event.reactor.schedule(0.25, AddressCheckerTimeout(self)) else: - # If the latest check did not find the link-attack route ready, - # schedule another check a little while from now. - self.addr_check_timer = event.reactor.schedule(0.25, AddressCheckerTimeout(self)) + if response.status_code == 200 and response.subscriberCount == 0 and response.remoteCount == 1: + self.send_messages() + if self.addr_check_timer: + self.addr_check_timer.cancel() + self.addr_check_timer = None + else: + # If the latest check did not find the link-attack route ready, + # schedule another check a little while from now. + self.addr_check_timer = event.reactor.schedule(0.25, + AddressCheckerTimeout( + self)) else : # This is a payload message. self.n_received += 1 @@ -2390,8 +2424,8 @@ class ClosestTest ( MessagingHandler ): if (self.count_2_a + self.count_2_b + self.count_3_a + self.count_3_b) > 0 : self.bail ( "error: routers 2 or 3 got messages before router 1 receivers were closed." ) # Make sure both receivers got some messages. - if (self.count_1_a * self.count_1_b) == 0: - self.bail ( "error: one of the receivers on router 1 got no messages." ) + if (self.count_1_a < self.one_third/2 or self.count_1_b < self.one_third/2) or (self.count_1_b != self.count_1_a): + self.bail ( "error: recv_1_a and recv_1_b did not get equal number of messages" ) elif self.n_received == 2 * self.one_third: # The next one-third of messages should have gone exclusively @@ -2402,18 +2436,37 @@ class ClosestTest ( MessagingHandler ): if (self.count_3_a + self.count_3_b) > 0 : self.bail ( "error: router 3 got messages before 2 was closed." ) # Make sure both receivers got some messages. - if (self.count_2_a * self.count_2_b) == 0: - self.bail ( "error: one of the receivers on router 2 got no messages." ) + if (self.count_2_a < self.one_third/2 or self.count_2_b < self.one_third/2) or (self.count_2_b != self.count_2_a): + self.bail ( "error: recv_2_a and recv_2_b did not get equal number of messages" ) # By the time we reach the expected number of messages # we have closed the router_1 and router_2 receivers. If the # router_3 receivers are empty at this point, something is wrong. if self.n_received >= self.n_expected : - if (self.count_3_a * self.count_3_b) == 0: - self.bail ( "error: one of the receivers on router 3 got no messages." ) + if (self.count_3_a < self.one_third/2 or self.count_3_b < self.one_third/2) or (self.count_3_b != self.count_3_a): + self.bail ( "error: recv_3_a and recv_3_b did not get equal number of messages" ) else: self.bail ( None ) + def on_link_closed(self, event): + if event.receiver == self.recv_1_b or event.receiver == self.recv_1_a: + if event.receiver == self.recv_1_a: + self.recv_1_a_closed = True + if event.receiver == self.recv_1_b: + self.recv_1_b_closed = True + if self.recv_1_a_closed and self.recv_1_b_closed: + self.n_sent_1 = 0 + self.send_messages() + + if event.receiver == self.recv_2_a or event.receiver == self.recv_2_b: + if event.receiver == self.recv_2_a: + self.recv_2_a_closed = True + if event.receiver == self.recv_2_b: + self.recv_2_b_closed = True + if self.recv_2_a_closed and self.recv_2_b_closed: + self.n_sent_1 = 0 + self.first_check = False + self.addr_check() def addr_check ( self ): # Send the message that will query the management code to discover --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org