Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 22400df1f -> f256f675e


DISPATCH-209 : test dispositions over changing topology


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f256f675
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f256f675
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f256f675

Branch: refs/heads/master
Commit: f256f675e63f17f3ade949033c14b93d4d6a146c
Parents: 22400df
Author: mgoulish <mgoul...@redhat.com>
Authored: Wed Feb 14 11:19:05 2018 -0500
Committer: mgoulish <mgoul...@redhat.com>
Committed: Wed Feb 14 11:19:05 2018 -0500

----------------------------------------------------------------------
 tests/CMakeLists.txt                       |   1 +
 tests/system_tests_topology_disposition.py | 817 ++++++++++++++++++++++++
 2 files changed, 818 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f256f675/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 0105c29..7312ba7 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -103,6 +103,7 @@ foreach(py_test_module
     system_tests_authz_service_plugin
     system_tests_delivery_abort
     system_tests_topology
+    system_tests_topology_disposition
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f256f675/tests/system_tests_topology_disposition.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_topology_disposition.py 
b/tests/system_tests_topology_disposition.py
new file mode 100644
index 0000000..03812ea
--- /dev/null
+++ b/tests/system_tests_topology_disposition.py
@@ -0,0 +1,817 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess      import PIPE, STDOUT
+from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, 
SSLDomain, SSLUnavailable, Timeout
+from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, 
Process
+from proton.handlers import MessagingHandler
+from proton.reactor  import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
+from proton.utils    import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+import datetime
+import pdb
+import inspect
+
+
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+
+
+#------------------------------------------------
+# Helper classes for all tests.
+#------------------------------------------------
+
+class Timeout(object):
+    """
+    Named timeout object can handle multiple simultaneous
+    timers, by telling the parent which one fired.
+    """
+    def __init__ ( self, parent, name ):
+        self.parent = parent
+        self.name   = name
+
+    def on_timer_task ( self, event ):
+        self.parent.timeout ( self.name )
+
+
+
+class ManagementMessageHelper ( object ) :
+    """
+    Format management messages.
+    """
+    def __init__ ( self, reply_addr ) :
+        self.reply_addr = reply_addr
+
+    def make_connector_query ( self, connector_name ) :
+        props = {'operation': 'READ', 'type': 
'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+        msg = Message ( properties=props, reply_to=self.reply_addr )
+        return msg
+
+    def make_connector_delete_command ( self, connector_name ) :
+        props = {'operation': 'DELETE', 'type': 
'org.apache.qpid.dispatch.connector', 'name' : connector_name }
+        msg = Message ( properties=props, reply_to=self.reply_addr )
+        return msg
+
+    def make_router_link_query ( self ) :
+        props = { 'count':      '100', 
+                  'operation':  'QUERY', 
+                  'entityType': 'org.apache.qpid.dispatch.router.link', 
+                  'name':       'self', 
+                  'type':       'org.amqp.management' 
+                }
+        attrs = []
+        attrs.append ( unicode('linkType') )
+        attrs.append ( unicode('linkDir') )
+        attrs.append ( unicode('linkName') )
+        attrs.append ( unicode('owningAddr') )
+        attrs.append ( unicode('capacity') )
+        attrs.append ( unicode('undeliveredCount') )
+        attrs.append ( unicode('unsettledCount') )
+        attrs.append ( unicode('acceptedCount') )
+        attrs.append ( unicode('rejectedCount') )
+        attrs.append ( unicode('releasedCount') )
+        attrs.append ( unicode('modifiedCount') )
+
+        msg_body = { }
+        msg_body [ 'attributeNames' ] = attrs
+        return Message ( body=msg_body, properties=props, 
reply_to=self.reply_addr )
+
+
+#------------------------------------------------
+# END Helper classes for all tests.
+#------------------------------------------------
+
+
+
+
+
+#================================================================
+#     Setup
+#================================================================
+
+class TopologyDispositionTests ( TestCase ):
+
+    @classmethod
+    def setUpClass(cls):
+        super(TopologyDispositionTests, cls).setUpClass()
+
+
+
+        def router(name, more_config):
+
+            config = [ ('router',  {'mode': 'interior', 'id': name}),
+                       ('address', {'prefix': 'closest',   'distribution': 
'closest'}),
+                       ('address', {'prefix': 'balanced',  'distribution': 
'balanced'}),
+                       ('address', {'prefix': 'multicast', 'distribution': 
'multicast'})
+                     ]    \
+                     + more_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        A_client_port = cls.tester.get_port()
+        B_client_port = cls.tester.get_port()
+        C_client_port = cls.tester.get_port()
+        D_client_port = cls.tester.get_port()
+
+        A_inter_router_port = cls.tester.get_port()
+        B_inter_router_port = cls.tester.get_port()
+        C_inter_router_port = cls.tester.get_port()
+
+        #
+        #
+        #  Topology of the 4-mesh, with costs of connections marked.
+        #  Tail of arrow indicates initiator of connection.
+        #  (The diagonal connections do not look very much like arrows, I 
fear...)
+        #
+        #                1
+        #         D ----------> A
+        #         | \         > ^
+        #         | 20\   50/   |
+        #         |     \ /     |
+        #      1  |     / \     | 100
+        #         |   /     \   |
+        #         v /         > |
+        #         C ----------> B
+        #                1
+        #
+
+        cls.A_B_cost =  100
+        cls.A_C_cost =   50
+        cls.A_D_cost =    1
+        cls.B_C_cost =    1
+        cls.B_D_cost =   20
+        cls.C_D_cost =    1
+
+        client_link_capacity       = 1000
+        inter_router_link_capacity = 1000
+
+        router ( 'A',
+                 [
+                    ( 'listener',
+                      { 'port': A_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : client_link_capacity
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': A_inter_router_port,
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : inter_router_link_capacity
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'B',
+                 [
+                    ( 'listener',
+                      { 'port': B_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : client_link_capacity
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': B_inter_router_port,
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : inter_router_link_capacity
+                      }
+                    ),
+                    # The names on the connectors are what allows me to kill 
them later.
+                    ( 'connector',
+                      {  'name': 'AB_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost':  cls.A_B_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'C',
+                 [
+                    ( 'listener',
+                      { 'port': C_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : client_link_capacity
+                      }
+                    ),
+                    ( 'listener',
+                      { 'role': 'inter-router',
+                        'port': C_inter_router_port,
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : inter_router_link_capacity
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'AC_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.A_C_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'BC_connector',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_C_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    )
+                 ]
+               )
+
+
+        router ( 'D',
+                 [
+                    ( 'listener',
+                      { 'port': D_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no',
+                        'linkCapacity' : client_link_capacity
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'AD_connector',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.A_D_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'BD_connector',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_D_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'CD_connector',
+                         'role': 'inter-router',
+                         'port': C_inter_router_port,
+                         'verifyHostName': 'no',
+                         'cost' : cls.C_D_cost,
+                         'stripAnnotations': 'no',
+                         'linkCapacity' : inter_router_link_capacity
+                      }
+                    )
+                 ]
+               )
+
+
+        router_A = cls.routers[0]
+        router_B = cls.routers[1]
+        router_C = cls.routers[2]
+        router_D = cls.routers[3]
+
+        router_A.wait_router_connected('B')
+        router_A.wait_router_connected('C')
+        router_A.wait_router_connected('D')
+
+        cls.client_addrs = ( router_A.addresses[0],
+                             router_B.addresses[0],
+                             router_C.addresses[0],
+                             router_D.addresses[0]
+                           )
+
+        # 1 means skip that test.
+        cls.skip = { 'test_01' : 0
+                   }
+
+
+
+    def test_01_topology_disposition ( self ):
+        name = 'test_01'
+        if self.skip [ name ] :
+            self.skipTest ( "Test skipped during development." )
+        test = TopologyDisposition ( name,
+                                     self.client_addrs,
+                                     "closest/01"
+                                   )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+#================================================================
+#     Tests
+#================================================================
+
+
+
+class TopologyDisposition ( MessagingHandler ):
+    """
+    """
+
+    #  TopologyDisposition Notes
+    #  ========================================
+    #
+    #     1. What is the goal of this test?
+    #     ------------------------------------------
+    #       The point of this test is to make sure that, in spite of 
+    #       serious disruption to a complex router network topology,
+    #       the sender always knows the dispositions of its messages.
+    #       By the end of the test, it should know that all sent 
+    #       messages were either received or released. 
+    #       ( Note that some messages may be "modified", but the reactor
+    #       interface that this test uses issues on_released events for 
+    #       modified messages, same as released, so I am lumping them 
+    #       together.
+    #
+    #
+    #     2. Routes and Connector Kills
+    #     ------------------------------------------
+    #       Messages are always sent from A, and received at B.
+    #       Routes are contrtolled by assigning different costs to the 
+    #       various links, and by then killing 3 connectors one at a timee,
+    #       at different points in the test.
+    #         First route ahould be ADCB.
+    #         Then we kill connector  CD.
+    #         Next route should be   ADB.
+    #         Then we kill connector  BD.
+    #         Next route should be   ACB.
+    #         Then we kill connector  BC.
+    #         Final route should be   AB.
+    #
+    #
+    #     3. Two Timers
+    #     ------------------------------------------
+    #     Sending is done in batches, using a timer. The timer expires
+    #     once every 0.5 seconds, and we send a small batch of 10 messages 
+    #     (or as many as the sender has credit for). 
+    #     There is also a deadline timer that terminates the test with 
+    #     failure if it ever fires.
+    #     
+    #
+    #     4. The Simple State Machine
+    #     ------------------------------------------
+    #     I want behavior that is a little more complex than what would
+    #     be possible by simply reacting to the callback functions, so I
+    #     impose on top of them a simple state machine. The states proceed
+    #     in a simple linear sequence, and some of the callback functions 
+    #     consult the current state befoe deciding what they should do.
+    #     And bump the state machine to its next state when appropriate.
+    #     
+    #                state            purpose
+    #                ----------------------------------------
+    #
+    #                starting         placeholder
+    #
+    #                topo checking    make sure that the 4 routers are
+    #                                 completely connected, as expected.
+    #
+    #                link checking    visual inspection of various link data
+    #                                 during debugging.
+    #         
+    #                sending          send the messages, in 70 batches of 10,
+    #                                 spaced 0.5 seconds apart.
+    #                                     
+    #                done sending     quite sending messages and wait for 
either
+    #                                 the sum of ACCEPTED + RELEASED to add up 
to 
+    #                                 SENT, causing the test to succeed, or 
the 
+    #                                 test timer to expire, causing the test 
to fail.
+    #
+    #                bailing          enter this state when we are in the 
process of
+    #                                 exiting. All callbacks should take no 
action if
+    #                                 the test has entered this state.
+    #
+    #
+    #     5. Sending in bursts.
+    #     ----------------------------------
+    #     When the send-timer goes off, I send a burst of messages ( 
self.send_burst_size ).
+    #     There is no especially great reason for this, except that I liked 
the idea of a
+    #     send timer because it seemed more realistic to me -- more like a 
real application -- 
+    #     and that implies sending bursts of messages.
+
+
+    def __init__ ( self, test_name, client_addrs, destination ):
+        super(TopologyDisposition, self).__init__(prefetch=10)
+        self.dest                 = destination
+        self.error                = None
+        self.sender               = None
+        self.receiver             = None
+        self.test_timer           = None
+        self.send_timer           = None
+        self.n_sent               = 0
+        self.n_accepted           = 0
+        self.n_received           = 0
+        self.n_released           = 0
+        self.reactor              = None
+        self.state                = None
+        self.send_conn            = None
+        self.recv_conn            = None
+        self.debug                = False
+        self.client_addrs         = client_addrs
+        self.timeout_count        = 0
+        self.confirmed_kills      = 0
+        self.send_interval        = 0.5
+        self.to_be_sent           = 700
+        self.deadline             = 100
+        self.message_status       = dict()
+        self.message_times        = dict()
+        self.most_recent_kill     = 0
+        self.first_trouble        = 0
+        self.flow                 = 100
+        self.max_trouble_duration = 20
+        self.link_check_count     = 0
+        self.send_burst_size      = 10
+
+        # Holds the management sender, receiver, and 'helper'
+        # associated with each router.
+        self.routers = {
+                         'A' : dict(),
+                         'B' : dict(),
+                         'C' : dict(),
+                         'D' : dict()
+                       }
+
+        # This tells the system in what order to kill the connectors.
+        self.kill_count = 0
+        self.kill_list = (
+                           ( 'D', 'CD_connector' ),
+                           ( 'D', 'BD_connector' ),
+                           ( 'C', 'BC_connector' )
+                         )
+
+        # We use this to keep track of which connectors we have found
+        # when the test is first getting started and we are checking
+        # the topology.
+        self.connectors_map = { 'AB_connector' : 0,
+                                'AC_connector' : 0,
+                                'AD_connector' : 0,
+                                'BC_connector' : 0,
+                                'BD_connector' : 0,
+                                'CD_connector' : 0
+                              }
+
+
+    def state_transition ( self, message, new_state ) :
+        if self.state == new_state :
+            return
+        self.state = new_state
+        self.debug_print ( "state transition to : %s -- because %s" % ( 
self.state, message ) )
+
+
+    def debug_print ( self, text ) :
+        if self.debug == True:
+            print "%.6lf %s" % ( time.time(), text )
+
+
+    # Shut down everything and exit.
+    def bail ( self, text ):
+        self.state = 'bailing'
+        self.test_timer.cancel ( )
+        self.send_timer.cancel ( )
+
+        self.error = text
+
+        self.send_conn.close ( )
+        self.recv_conn.close ( )
+
+        self.routers['A'] ['mgmt_conn'].close()
+        self.routers['B'] ['mgmt_conn'].close()
+        self.routers['C'] ['mgmt_conn'].close()
+        self.routers['D'] ['mgmt_conn'].close()
+
+
+    # Two separate timers. One controls sending in bursts, one ends the test.
+    def timeout ( self, name ):
+        if self.state == 'bailing' :
+            return
+
+        self.timeout_count += 1
+        if name == 'test':
+            self.state_transition ( 'Timeout Expired', 'bailing' )
+            self.bail ( "Timeout Expired: n_sent=%d n_released=%d 
n_accepted=%d" % \
+                         ( self.n_sent, self.n_released, self.n_accepted ) )
+            return
+        elif name == 'sender':
+            if self.state == 'sending' :
+                if not (self.timeout_count % 20):
+                    if self.kill_count < len(self.kill_list):
+                        self.kill_a_connector ( 
self.kill_list[self.kill_count] )
+                        self.kill_count += 1
+                self.send ( )
+                if self.n_sent >= self.to_be_sent :
+                    self.state_transition ( 'sent %d messages' % 
self.to_be_sent, 'done sending' )
+            elif self.state == 'done sending' :
+                if self.n_sent == self.n_accepted + self.n_released :
+                    self.state_transition ( 'success', 'bailing' )
+                    self.bail ( None )
+
+            self.debug_print ( "sent: %d  received: %d accepted: %d   
released: %d  confirmed kills: %d" % \
+                ( self.n_sent, self.n_received, self.n_accepted, 
self.n_released, self.confirmed_kills ) )
+
+            diff = self.n_sent - (self.n_accepted + self.n_released) 
+
+            # If the difference between n_sent and (accepted + released) is 
+            # ever greater than 10 (the send batch size) 
+            if diff >= self.send_burst_size and self.state == 'done sending' : 
+                self.debug_print ( "TROUBLE : %d" % diff )
+
+                if self.first_trouble == 0:
+                    self.first_trouble = time.time()
+                    self.debug_print ( "first trouble at %.6lf" % 
self.first_trouble )
+                else:
+                    trouble_duration = time.time() - self.first_trouble
+                    self.debug_print ( "trouble duration %.6lf" % 
trouble_duration )
+                    if trouble_duration >= self.max_trouble_duration : 
+                        self.state_transition ( 'trouble duration exceeded 
limit: %d' % self.max_trouble_duration, 'post mortem' )
+                        self.check_links ( )
+
+            self.send_timer = self.reactor.schedule ( self.send_interval, 
Timeout(self, "sender") )
+                    
+
+
+    def on_start ( self, event ):
+        self.state_transition ( 'on_start', 'starting' )
+        self.reactor = event.reactor
+        self.test_timer = event.reactor.schedule ( self.deadline, 
Timeout(self, "test") )
+        self.send_timer = event.reactor.schedule ( self.send_interval, 
Timeout(self, "sender") )
+        self.send_conn  = event.container.connect ( self.client_addrs[0] ) # A
+        self.recv_conn  = event.container.connect ( self.client_addrs[1] ) # B
+
+        self.sender     = event.container.create_sender   ( self.send_conn, 
self.dest )
+        self.receiver   = event.container.create_receiver ( self.recv_conn, 
self.dest )
+
+        self.routers['A'] ['mgmt_conn'] = event.container.connect ( 
self.client_addrs[0] )
+        self.routers['B'] ['mgmt_conn'] = event.container.connect ( 
self.client_addrs[1] )
+        self.routers['C'] ['mgmt_conn'] = event.container.connect ( 
self.client_addrs[2] )
+        self.routers['D'] ['mgmt_conn'] = event.container.connect ( 
self.client_addrs[3] )
+
+        self.routers['A'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['A'] ['mgmt_conn'], dynamic=True )
+        self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['B'] ['mgmt_conn'], dynamic=True )
+        self.routers['C'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['C'] ['mgmt_conn'], dynamic=True )
+        self.routers['D'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['D'] ['mgmt_conn'], dynamic=True )
+
+        self.routers['A'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['A'] ['mgmt_conn'], "$management" )
+        self.routers['B'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['B'] ['mgmt_conn'], "$management" )
+        self.routers['C'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['C'] ['mgmt_conn'], "$management" )
+        self.routers['D'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['D'] ['mgmt_conn'], "$management" )
+
+
+
+    #-----------------------------------------------------------------
+    # At start-time, as the management links to the routers open, 
+    # check each one to make sure that it has all the expected
+    # connections.
+    #-----------------------------------------------------------------
+    def on_link_opened ( self, event ) :
+        self.state_transition ( 'on_link_opened', 'topo checking' )
+        # The A mgmt link has opened.  --------------------------
+        # Give it some credit, but we don't need to use this one until
+        # later, if there is a problem.
+        if event.receiver == self.routers['A'] ['mgmt_receiver'] :
+            event.receiver.flow ( self.flow )
+            self.routers['A'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
+        # The B mgmt link has opened. Check its connections. 
--------------------------
+        elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
+            event.receiver.flow ( self.flow )
+            self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
+            for connector in [ 'AB_connector' ] :
+                self.connector_check ( 'B', connector )
+        # The C mgmt link has opened. Check its connections. 
--------------------------
+        elif event.receiver == self.routers['C'] ['mgmt_receiver'] :
+            event.receiver.flow ( self.flow )
+            self.routers['C'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
+            for connector in [ 'AC_connector', 'BC_connector' ] :
+                self.connector_check ( 'C', connector )
+        # The D mgmt link has opened. Check its connections. 
--------------------------
+        elif event.receiver == self.routers['D'] ['mgmt_receiver']:
+            event.receiver.flow ( self.flow )
+            self.routers['D'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
+            for connector in [ 'AD_connector', 'BD_connector', 'CD_connector' 
] :
+                self.connector_check ( 'D', connector )
+
+
+    def send ( self ):
+        if self.state != 'sending' :
+            self.debug_print ( "send called while state is %s" % self.state )
+            return
+
+        for _ in xrange ( self.send_burst_size ) :
+            if self.sender.credit > 0 :
+                msg = Message ( body=self.n_sent )
+                msg_tag=str(self.n_sent)
+                dlv = self.sender.send ( msg, tag = msg_tag )
+                if dlv == None :
+                    self.debug_print ( "send failed" )
+                    return
+                self.message_status [ msg_tag ] = 'sent'
+                self.message_times  [ msg_tag ] = time.time()
+                self.n_sent += 1
+        self.debug_print ( "send: n_sent %d credit is now: %d" % ( 
self.n_sent, self.sender.credit ) )
+
+
+    def on_message ( self, event ):
+        #----------------------------------------------------------------
+        # Is this a management message?
+        #----------------------------------------------------------------
+        if event.receiver == self.routers['A'] ['mgmt_receiver'] or \
+           event.receiver == self.routers['B'] ['mgmt_receiver'] or \
+           event.receiver == self.routers['C'] ['mgmt_receiver'] or \
+           event.receiver == self.routers['D'] ['mgmt_receiver'] :
+
+            if self.state == 'topo checking' :
+                # In the 'topo checking' state, we send management messages to 
+                # ask the 4 routers about their connections. Then, parsing the
+                # replies, we make sure that we count the expected 6 
connections.
+                # (The 4 routers are completely connected.)
+                if 'OK' == event.message.properties['statusDescription']:
+                    connection_name = event.message.body['name']
+
+                    if connection_name in self.connectors_map :
+                        self.connectors_map [ connection_name ] = 1
+                        self.debug_print ( "topo check found connector %s" % 
connection_name )
+                    else :
+                      self.bail ( "bad connection name: %s" % connection_name )
+
+                    n_connections = sum(self.connectors_map.values())
+                    if n_connections == 6 :
+                      self.state_transition ( 'topo check successful', 'link 
checking' )
+                      self.check_links ( )
+
+            elif self.state == 'link checking' or self.state == 'post mortem' :
+                # Link checking was used during initial debugging of this test,
+                # to visually check on the number of undelivered and unsettled 
+                # messages in each link, especially during the "post mortem" 
+                # state triggered by a failure.
+                if   event.receiver == self.routers['A'] ['mgmt_receiver'] :
+                    self.debug_print ( "received link check message from A 
------------" )
+                elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
+                    self.debug_print ( "received link check message from B 
------------" )
+                elif event.receiver == self.routers['C'] ['mgmt_receiver'] :
+                    self.debug_print ( "received link check message from C 
------------" )
+                elif event.receiver == self.routers['D'] ['mgmt_receiver'] :
+                    self.debug_print ( "received link check message from D 
------------" )
+                body = event.message.body
+                self.debug_print ( "body: %s" % body )
+                self.debug_print ( "properties: %s" % event.message.properties 
)
+
+                self.link_check_count -= 1
+                if self.link_check_count == 0 :
+                    if self.state == 'link checking' :
+                        self.state_transition ( 'link check successful', 
'sending' )
+                        self.send()
+                    elif self.state == 'post mortem' :
+                        self.state_transition ( "post mortem complete", 
'bailing' )
+                        self.bail ( "failed" )
+            elif self.state == 'sending' :
+                if 'No Content' ==  
event.message.properties['statusDescription']:
+                    self.confirmed_kills += 1
+
+        else :
+            if event.receiver == self.receiver :
+                self.n_received += 1
+
+
+
+    def on_accepted ( self, event ):
+        if event.sender == self.sender:
+            self.n_accepted += 1
+            tag = event.delivery.tag
+            self.message_status [ tag ] = 'accepted'
+
+
+    def on_released ( self, event ) :
+
+        if event.sender == self.sender:
+            self.n_released += 1
+            tag = event.delivery.tag
+            self.message_status [ tag ] = 'released'
+
+
+    def connector_check ( self, router, connector ) :
+        self.debug_print ( "checking connector %s for router %s" % (connector, 
router) )
+        mgmt_helper = self.routers[router] ['mgmt_helper']
+        mgmt_sender = self.routers[router] ['mgmt_sender']
+        msg = mgmt_helper.make_connector_query ( connector )
+        mgmt_sender.send ( msg )
+
+
+    def check_links ( self ) :
+        self.link_check_count = 4
+        self.link_check ( 'A' )
+        self.link_check ( 'B' )
+        self.link_check ( 'C' )
+        self.link_check ( 'D' )
+
+
+    def link_check ( self, router_name ) :
+        mgmt_helper = self.routers[router_name] ['mgmt_helper']
+        mgmt_sender = self.routers[router_name] ['mgmt_sender']
+        msg = mgmt_helper.make_router_link_query ( )
+        mgmt_sender.send ( msg )
+
+
+    # The target structure provides the name of the router and the name of its 
connector 
+    # that is to be killed. Create the appropriate management message, and 
send it off.
+    def kill_a_connector ( self, target ) :
+        router = target[0]
+        connector = target[1]
+        mgmt_helper = self.routers[router] ['mgmt_helper']
+        mgmt_sender = self.routers[router] ['mgmt_sender']
+        msg = mgmt_helper.make_connector_delete_command ( connector )
+        self.debug_print ( "!!!!!\nkilling connector %s on router %s \n!!!!!" 
% (connector, router) )
+        mgmt_sender.send ( msg )
+        self.most_recent_kill = time.time()
+
+
+    # Used during debugging.
+    def print_message_status ( self ) :
+        for i in range ( self.n_sent ) :
+            tag = str ( i )
+            print tag, self.message_status [ tag ]
+
+
+    # Used during debugging.
+    def print_unknown_messages ( self ) :
+        count = 0
+        print "Messages with unknown status: "
+        for i in range ( self.n_sent ) :
+            tag = str ( i )
+            if self.message_status [ tag ] == 'sent' :
+              count = count + 1
+              print '    ', tag, 'sent:', self.message_times [ tag ]
+        print "    total: ", count
+
+
+    # Used during debugging.
+    def quick_print_unknown_messages ( self ) :
+        count = 0
+        print "Messages with unknown status: "
+
+        first = -1
+        last  =  0
+
+        for i in range ( self.n_sent ) :
+            tag = str ( i )
+            if self.message_status [ tag ] == 'sent' : # It's not accepted or 
released.
+              count = count + 1
+              if first == -1 :
+                first = i
+              if i > last :
+                last = i
+
+        print '    first : ', first, 'sent : %.6lf' % self.message_times [ 
str(first) ]
+        print '    last  : ', last,  'sent : %.6lf' % self.message_times [ 
str(last)  ]
+        print "    total : ", count
+
+
+    def run(self):
+        Container(self).run()
+
+
+
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to