Signed-off-by: Angus Salkeld <[email protected]>
---
 cts/agents/cpg_test_agent.c |   42 +++++---
 cts/corotests.py            |  220 ++++++++++++++++++-------------------------
 2 files changed, 119 insertions(+), 143 deletions(-)

diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
index 9db370d..de47ca8 100644
--- a/cts/agents/cpg_test_agent.c
+++ b/cts/agents/cpg_test_agent.c
@@ -96,36 +96,35 @@ static int32_t my_msgs_to_send;
 static int32_t total_stored_msgs = 0;
 static hdb_handle_t poll_handle;
 
+
+static void send_some_more_messages (void * unused);
+
 static char* err_status_string (char * buf, size_t buf_len, msg_status_t 
status)
 {
        switch (status) {
        case MSG_OK:
                strncpy (buf, "OK", buf_len);
-               return buf;
                break;
        case MSG_NODEID_ERR:
                strncpy (buf, "NODEID_ERR", buf_len);
-               return buf;
                break;
        case MSG_PID_ERR:
                strncpy (buf, "PID_ERR", buf_len);
-               return buf;
                break;
        case MSG_SEQ_ERR:
                strncpy (buf, "SEQ_ERR", buf_len);
-               return buf;
                break;
        case MSG_SIZE_ERR:
                strncpy (buf, "SIZE_ERR", buf_len);
-               return buf;
                break;
        case MSG_SHA1_ERR:
-       default:
                strncpy (buf, "SHA1_ERR", buf_len);
-               return buf;
+               break;
+       default:
+               strncpy (buf, "UNKNOWN_ERR", buf_len);
                break;
        }
-
+       return buf;
 }
 
 static void delivery_callback (
@@ -160,7 +159,7 @@ static void delivery_callback (
                status = MSG_SIZE_ERR;
        }
        sha1_init (&sha1_hash);
-       sha1_process (&sha1_hash, msg_pt->payload, msg_pt->size);
+       sha1_process (&sha1_hash, msg_pt->payload, (msg_pt->size - sizeof 
(msg_t)));
        sha1_done (&sha1_hash, sha1_compare);
        if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
                syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash",
@@ -285,8 +284,19 @@ static void read_messages (int sock, char* atmost_str)
        send (sock, big_and_buf, strlen (big_and_buf), 0);
 }
 
+static void send_some_more_messages_later (void)
+{
+       poll_timer_handle timer_handle;
+       cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
+       poll_timer_add (
+               poll_handle,
+               100, NULL,
+               send_some_more_messages,
+               &timer_handle);
+}
+
 static unsigned char buffer[200000];
-static void send_some_more_messages (void)
+static void send_some_more_messages (void * unused)
 {
        msg_t my_msg;
        struct iovec iov[2];
@@ -302,7 +312,7 @@ static void send_some_more_messages (void)
 
        send_now = my_msgs_to_send;
 
-       syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
+       //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
        my_msg.pid = my_pid;
        my_msg.nodeid = my_nodeid;
        payload_size = (rand() % 100000);
@@ -325,6 +335,7 @@ static void send_some_more_messages (void)
                res = cpg_flow_control_state_get (cpg_handle, &fc_state);
                if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
                        /* lets do this later */
+                       send_some_more_messages_later ();
                        syslog (LOG_INFO, "%s() flow control enabled.", 
__func__);
                        return;
                }
@@ -332,6 +343,7 @@ static void send_some_more_messages (void)
                res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
                if (res == CS_ERR_TRY_AGAIN) {
                        /* lets do this later */
+                       send_some_more_messages_later ();
                        syslog (LOG_INFO, "%s() cpg_mcast_joined() says try 
again.",
                                __func__);
                        return;
@@ -357,10 +369,10 @@ static void msg_blaster (int sock, char* num_to_send_str)
        /* control the limits */
        if (my_msgs_to_send <= 0)
                my_msgs_to_send = 1;
-       if (my_msgs_to_send > 1000)
-               my_msgs_to_send = 1000;
+       if (my_msgs_to_send > 10000)
+               my_msgs_to_send = 10000;
 
-       send_some_more_messages ();
+       send_some_more_messages (NULL);
 }
 

@@ -541,7 +553,7 @@ static int server_process_data_fn (hdb_handle_t handle,
                exit (0);
        } else {
                if (my_msgs_to_send > 0)
-                       send_some_more_messages ();
+                       send_some_more_messages (NULL);
 
                big_and_buf_rx[nbytes] = '\0';
 
diff --git a/cts/corotests.py b/cts/corotests.py
index d07eb5f..92ea570 100644
--- a/cts/corotests.py
+++ b/cts/corotests.py
@@ -44,19 +44,12 @@ class CoroTest(CTSTest):
         CTSTest.__init__(self,cm)
         self.start = StartTest(cm)
         self.stop = StopTest(cm)
+        self.config = {}
 
     def setup(self, node):
         ret = CTSTest.setup(self, node)
-        self.CM.apply_new_config()
-
-        for n in self.CM.Env["nodes"]:
-            if not self.CM.StataCM(n):
-                self.incr("started")
-                self.start(n)
-        return ret
-
 
-    def setup_sec_key(self, node):
+        # setup the authkey
         localauthkey = '/tmp/authkey'
         if not os.path.exists(localauthkey):
             self.CM.rsh(node, 'corosync-keygen')
@@ -67,6 +60,20 @@ class CoroTest(CTSTest):
                 #copy key onto other nodes
                 self.CM.rsh.cp(localauthkey, "%s:%s" % (n, 
"/etc/corosync/authkey"))
 
+        # copy over any new config
+        for c in self.config:
+            self.CM.new_config[c] = self.config[c]
+
+        # apply the config
+        self.CM.apply_new_config()
+
+        # start any killed corosync's
+        for n in self.CM.Env["nodes"]:
+            if not self.CM.StataCM(n):
+                self.incr("started")
+                self.start(n)
+        return ret
+
 
     def teardown(self, node):
         self.CM.apply_default_config()
@@ -190,42 +197,12 @@ class CpgCfgChgOnExecCrash(CpgConfigChangeBase):
 

 ###################################################################
-class CpgCfgChgOnNodeLeave_v2(CpgConfigChangeBase):
+class CpgCfgChgOnNodeIsolate(CpgConfigChangeBase):
 
     def __init__(self, cm):
         CpgConfigChangeBase.__init__(self,cm)
-        self.name="CpgCfgChgOnNodeLeave_v2"
+        self.name="CpgCfgChgOnNodeIsolate"
        
-    def setup(self, node):
-        self.CM.new_config['compatibility'] = 'none'
-        self.CM.new_config['totem/token'] = 10000
-        return CpgConfigChangeBase.setup(self, node)
-
-    def failure_action(self):
-        self.CM.log("isolating node " + self.wobbly)
-        self.CM.isolate_node(self.wobbly)
-
-    def __call__(self, node):
-        self.incr("calls")
-        self.failure_action()
-        return self.wait_for_config_change()
-
-    def teardown(self, node):
-        self.CM.unisolate_node (self.wobbly)
-        return CpgConfigChangeBase.teardown(self, node)
-
-###################################################################
-class CpgCfgChgOnNodeLeave_v1(CpgConfigChangeBase):
-
-    def __init__(self, cm):
-        CpgConfigChangeBase.__init__(self,cm)
-        self.name="CpgCfgChgOnNodeLeave_v1"
-
-    def setup(self, node):
-        self.CM.new_config['compatibility'] = 'whitetank'
-        self.CM.new_config['totem/token'] = 10000
-        return CpgConfigChangeBase.setup(self, node)
-        
     def failure_action(self):
         self.CM.log("isolating node " + self.wobbly)
         self.CM.isolate_node(self.wobbly)
@@ -268,15 +245,13 @@ class CpgMsgOrderBase(CoroTest):
 
         for n in self.CM.Env["nodes"]:
             msgs[n] = []
-            got = False
             stopped = False
-            self.CM.debug( " getting messages from " + n )
+            waited = 0
 
-            while len(msgs[n]) < self.total_num_msgs and not stopped:
+            while len(msgs[n]) < self.total_num_msgs and waited < 60:
 
                 msg = self.CM.agent[n].read_messages(25)
                 if not msg == None:
-                    got = True
                     msgl = msg.split(";")
 
                     # remove empty entries
@@ -288,106 +263,52 @@ class CpgMsgOrderBase(CoroTest):
                             not_done = False
 
                     msgs[n].extend(msgl)
-                elif msg == None and got:
-                    self.CM.debug(" done getting messages from " + n)
-                    stopped = True
-
-                if not got:
+                elif msg == None:
                     time.sleep(1)
+                    waited = waited + 1
+
+            if len(msgs[n]) < self.total_num_msgs:
+                return self.failure("expected %d messages from %s got %d" % 
(self.total_num_msgs, n, len(msgs[n])))
 
         fail = False
+        error_message = ''
         for i in range(0, self.total_num_msgs):
             first = None
             for n in self.CM.Env["nodes"]:
+                # first test for errors
+                params = msgs[n][i].split(":")
+                if not 'OK' in params[3]:
+                    fail = True
+                    error_message = 'error: ' + params[3] + ' in received 
message'
+                    self.CM.log(str(params))
+
+                # then look for out of order messages
                 if first == None:
                     first = n
                 else:
                     if not msgs[first][i] == msgs[n][i]:
                         # message order not the same!
                         fail = True
+                        error_message = 'message out of order'
                         self.CM.log(msgs[first][i] + " != " + msgs[n][i])
                 
         if fail:
-            return self.failure()
+            return self.failure(error_message)
         else:
             return self.success()
 
 ###################################################################
 class CpgMsgOrderBasic(CpgMsgOrderBase):
     '''
-    each sends & logs 100 messages
+    each sends & logs 1000 messages
     '''
     def __init__(self, cm):
         CpgMsgOrderBase.__init__(self,cm)
         self.name="CpgMsgOrderBasic"
+        self.num_msgs_per_node = 9000
 
     def __call__(self, node):
         self.incr("calls")
-
-        self.num_msgs_per_node = 100
-        self.cpg_msg_blaster()
-        return self.wait_and_validate_order()
-
-class CpgMsgOrderThreads(CpgMsgOrderBase):
-    '''
-    each sends & logs 100 messages
-    '''
-    def __init__(self, cm):
-        CpgMsgOrderBase.__init__(self,cm)
-        self.name="CpgMsgOrderThreads"
-
-    def setup(self, node):
-        self.CM.new_config['totem/threads'] = 4
-        return CpgMsgOrderBase.setup(self, node)
-
-    def __call__(self, node):
-        self.incr("calls")
-
-        self.num_msgs_per_node = 100
-        self.cpg_msg_blaster()
-        return self.wait_and_validate_order()
-
-
-class CpgMsgOrderSecNss(CpgMsgOrderBase):
-    '''
-    each sends & logs 100 messages
-    '''
-    def __init__(self, cm):
-        CpgMsgOrderBase.__init__(self,cm)
-        self.name="CpgMsgOrderSecNss"
-
-    def setup(self, node):
-        self.setup_sec_key(node)
-        self.CM.new_config['totem/secauth'] = 'on'
-        self.CM.new_config['totem/crypto_accept'] = 'new'
-        self.CM.new_config['totem/crypto_type'] = 'nss'
-        return CpgMsgOrderBase.setup(self, node)
-
-    def __call__(self, node):
-        self.incr("calls")
-
-        self.num_msgs_per_node = 100
-        self.cpg_msg_blaster()
-        return self.wait_and_validate_order()
-
-class CpgMsgOrderSecSober(CpgMsgOrderBase):
-    '''
-    each sends & logs 100 messages
-    '''
-    def __init__(self, cm):
-        CpgMsgOrderBase.__init__(self,cm)
-        self.name="CpgMsgOrderSecSober"
-
-    def setup(self, node):
-        self.setup_sec_key(node)
-        self.CM.new_config['totem/secauth'] = 'on'
-        self.CM.new_config['totem/crypto_type'] = 'sober'
-        return CpgMsgOrderBase.setup(self, node)
-
-    def __call__(self, node):
-        self.incr("calls")
-
-        self.num_msgs_per_node = 100
         self.cpg_msg_blaster()
         return self.wait_and_validate_order()
 
@@ -506,22 +427,17 @@ class ServiceLoadTest(CoroTest):
 
         return self.success()
 
-
+GenTestClasses = []
+GenTestClasses.append(CpgMsgOrderBasic)
+GenTestClasses.append(CpgCfgChgOnExecCrash)
+GenTestClasses.append(CpgCfgChgOnGroupLeave)
+GenTestClasses.append(CpgCfgChgOnNodeLeave)
+GenTestClasses.append(CpgCfgChgOnNodeIsolate)
 
 AllTestClasses = []
 AllTestClasses.append(ServiceLoadTest)
-AllTestClasses.append(CpgMsgOrderBasic)
-AllTestClasses.append(CpgMsgOrderThreads)
-AllTestClasses.append(CpgMsgOrderSecNss)
-AllTestClasses.append(CpgMsgOrderSecSober)
 AllTestClasses.append(MemLeakObject)
 AllTestClasses.append(MemLeakSession)
-AllTestClasses.append(CpgCfgChgOnExecCrash)
-AllTestClasses.append(CpgCfgChgOnGroupLeave)
-AllTestClasses.append(CpgCfgChgOnNodeLeave)
-AllTestClasses.append(CpgCfgChgOnNodeLeave_v1)
-AllTestClasses.append(CpgCfgChgOnNodeLeave_v2)
-
 AllTestClasses.append(FlipTest)
 AllTestClasses.append(RestartTest)
 AllTestClasses.append(StartOnebyOne)
@@ -534,10 +450,58 @@ AllTestClasses.append(RestartOnebyOne)
 
 def CoroTestList(cm, audits):
     result = []
+    configs = []
+    empty = {}
+    configs.append(empty)
+
+    a = {}
+    a['compatibility'] = 'none'
+    a['totem/token'] = 10000
+    configs.append(a)
+
+    b = {}
+    b['compatibility'] = 'whitetank'
+    b['totem/token'] = 10000
+    configs.append(b)
+
+    c = {}
+    c['totem/secauth'] = 'on'
+    c['totem/crypto_accept'] = 'new'
+    c['totem/crypto_type'] = 'nss'
+    configs.append(c)
+
+    d = {}
+    d['totem/secauth'] = 'on'
+    d['totem/crypto_type'] = 'sober'
+    configs.append(d)
+
+    e = {}
+    e['totem/threads'] = 4
+    configs.append(e)
+
+    #quorum/provider=
+    f = {}
+    f['quorum/provider'] = 'corosync_quorum_ykd'
+    configs.append(f)
+
+    num=1
+    for cfg in configs:
+        for testclass in GenTestClasses:
+            bound_test = testclass(cm)
+            if bound_test.is_applicable():
+                bound_test.Audits = audits
+                bound_test.config = cfg
+                bound_test.name = bound_test.name + '_' + str(num)
+                result.append(bound_test)
+        num = num + 1
+
+
+
     for testclass in AllTestClasses:
         bound_test = testclass(cm)
         if bound_test.is_applicable():
             bound_test.Audits = audits
             result.append(bound_test)
+
     return result
 
-- 
1.6.6.1


_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to