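Bug:
Shutdown can sometimes take a long time if all nodes
are shut down together. Add the GenStopAllBeekhof CTS test,
which blasts CPG messages on every node and then asks each
node to shut down through CFG at roughly the same time.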

Signed-off-by: Angus Salkeld <[email protected]>
---
 cts/agents/Makefile.am      |    2 +-
 cts/agents/cpg_test_agent.c |  147 ++++++++++++++++++++++++++++++++++++-------
 cts/corotests.py            |   79 +++++++++++++++++++++---
 3 files changed, 197 insertions(+), 31 deletions(-)

diff --git a/cts/agents/Makefile.am b/cts/agents/Makefile.am
index 8a3278a..3e1f419 100644
--- a/cts/agents/Makefile.am
+++ b/cts/agents/Makefile.am
@@ -58,7 +58,7 @@ endif
 noinst_HEADERS          = common_test_agent.h
 
 cpg_test_agent_SOURCES = cpg_test_agent.c common_test_agent.c
-cpg_test_agent_LDADD =  -lcpg -lcoroipcc ../../exec/coropoll.o ../../exec/crypto.o
+cpg_test_agent_LDADD =  -lcpg -lcfg -lcoroipcc ../../exec/coropoll.o ../../exec/crypto.o
 cpg_test_agent_LDFLAGS =  -L../../lib -L.
 
 confdb_test_agent_SOURCES = confdb_test_agent.c common_test_agent.c
diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
index 1b6a067..7120c08 100644
--- a/cts/agents/cpg_test_agent.c
+++ b/cts/agents/cpg_test_agent.c
@@ -50,6 +50,7 @@
 #include <corosync/totem/coropoll.h>
 #include <corosync/list.h>
 #include <corosync/cpg.h>
+#include <corosync/cfg.h>
 #include "../../exec/crypto.h"
 #include "common_test_agent.h"
 
@@ -82,7 +83,9 @@ static char big_and_buf[HOW_BIG_AND_BUF];
 static int32_t record_config_events_g = 0;
 static int32_t record_messages_g = 0;
 static cpg_handle_t cpg_handle = 0;
+static corosync_cfg_handle_t cfg_handle = 0; /* set by the "cfg_initialize" RPC */
 static int32_t cpg_fd = -1;
+static int32_t cfg_fd = -1; /* CFG dispatch fd; reset to -1 when the CFG connection is dropped */
 static struct list_head config_chg_log_head;
 static struct list_head msg_log_head;
 static pid_t my_pid;
@@ -91,7 +94,7 @@ static int32_t my_seq;
 static int32_t use_zcb = 0;
 static int32_t my_msgs_to_send;
 static int32_t total_stored_msgs = 0;
-
+static int32_t in_cnchg = 0; /* 1 while config_change_callback() calls send_some_more_messages() */
 
 static void send_some_more_messages (void * unused);
 
@@ -209,8 +212,25 @@ static void config_change_callback (
                        list_add_tail (&log_pt->list, &config_chg_log_head);
                }
        }
+       in_cnchg = 1;
+       send_some_more_messages (NULL);
+       in_cnchg = 0;
 }
 
+static void my_shutdown_callback (corosync_cfg_handle_t handle,
+       corosync_cfg_shutdown_flags_t flags)
+{
+       syslog (LOG_CRIT, "%s flags:%d", __func__, flags);
+       if (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) { /* REQUEST is 0 in cfg.h: test with ==, not & */
+               corosync_cfg_replyto_shutdown (handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES); /* agree to shut down */
+       }
+}
+
+
+static corosync_cfg_callbacks_t cfg_callbacks = { /* passed to corosync_cfg_initialize() by "cfg_initialize" */
+       .corosync_cfg_shutdown_callback = my_shutdown_callback,
+       .corosync_cfg_state_track_callback = NULL,
+};
 static cpg_callbacks_t callbacks = {
        .cpg_deliver_fn = delivery_callback,
        .cpg_confchg_fn = config_change_callback,
@@ -365,6 +385,13 @@ free_buffer:
 }
 
 
+#define cs_repeat(counter, max, code) do {  /* 'code' must assign the caller's 'res' */ \
+       code;                                           \
+       if (res == CS_ERR_TRY_AGAIN) {                  \
+           (counter)++;  /* each retry sleeps 1s longer */ \
+           sleep (counter);                            \
+       }                                               \
+    } while (res == CS_ERR_TRY_AGAIN && (counter) < (max))
 
 static unsigned char buffer[200000];
 static void send_some_more_messages_normal (void)
@@ -377,6 +404,8 @@ static void send_some_more_messages_normal (void)
        hash_state sha1_hash;
        cs_error_t res;
        cpg_flow_control_state_t fc_state;
+       int retries = 0;
+       time_t before;
 
        if (cpg_fd < 0)
                return;
@@ -402,29 +431,42 @@ static void send_some_more_messages_normal (void)
        iov[1].iov_base = buffer;

        for (i = 0; i < send_now; i++) {
-
-               res = cpg_flow_control_state_get (cpg_handle, &fc_state);
-               if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
-                       /* lets do this later */
-                       send_some_more_messages_later ();
-                       syslog (LOG_INFO, "%s() flow control enabled.", __func__);
-                       return;
-               }
-
-               res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
-               if (res == CS_ERR_TRY_AGAIN) {
-                       /* lets do this later */
-                       send_some_more_messages_later ();
-                       syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
-                               __func__);
-                       return;
-               } else
+               if (in_cnchg) { /* called from config_change_callback(): retry inline, don't defer */
+                       retries = 0;
+                       before = time(NULL);
+                       cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1)); /* NOTE(review): sends only iov[0]; the normal path sends 2 - confirm intended */
+                       if (retries > 20) {
+                               syslog (LOG_ERR, "%s() -> cs_repeat: blocked for :%lu secs.",
+                                       __func__, (unsigned long)(time(NULL) - before));
+                       }
                        if (res != CS_OK) {
-                               syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
+                               syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d.",
                                        __func__, res);
-                               exit (-2);
+                               return;
+                       }
+               } else {
+                       res = cpg_flow_control_state_get (cpg_handle, &fc_state);
+                       if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
+                               /* lets do this later */
+                               send_some_more_messages_later ();
+                               syslog (LOG_INFO, "%s() flow control enabled.", __func__);
+                               return;
                        }

+                       res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
+                       if (res == CS_ERR_TRY_AGAIN) {
+                               /* lets do this later */
+                               send_some_more_messages_later ();
+                               syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
+                                       __func__);
+                               return;
+                       } else
+                               if (res != CS_OK) {
+                                       syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
+                                               __func__, res);
+                                       exit (-2);
+                               }
+               }
                my_msgs_to_send--;
        }
 }
@@ -493,18 +535,52 @@ static void msg_blaster_zcb (int sock, char* num_to_send_str)
        send_some_more_messages_zcb ();
 }
 
+static corosync_cfg_state_notification_t notification_buffer; /* passed to corosync_cfg_state_track() by "cfg_initialize" */
+
+static int cfg_dispatch_wrapper_fn (hdb_handle_t handle,
+       int fd,
+       int revents,
+       void *data)
+{
+       cs_error_t error;
+       if (revents & (POLLHUP | POLLERR)) { /* connection gone: stop polling the CFG fd */
+               syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", __func__);
+               poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
+               close (cfg_fd);
+               cfg_fd = -1;
+               return -1;
+       }
+       error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
+       if (error == CS_ERR_LIBRARY) {
+               syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", __func__);
+               poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
+               close (cfg_fd);
+               cfg_fd = -1;
+               return -1;
+       }
+       return 0;
+}
 
 static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
        int fd,
        int revents,
        void *data)
 {
-       cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
+       cs_error_t error;
+       if (revents & (POLLHUP | POLLERR)) { /* connection gone: stop polling the CPG fd */
+               syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", __func__);
+               poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
+               close (cpg_fd);
+               cpg_fd = -1;
+               return -1;
+       }
+       error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
        if (error == CS_ERR_LIBRARY) {
-               syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
+               syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", __func__);
                poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
                close (cpg_fd);
                cpg_fd = -1;
+               return -1;
        }
        return 0;
 }
@@ -602,6 +678,33 @@ static void do_command (int sock, char* func, char*args[], int num_args)
        } else if (strcmp ("are_you_ok_dude", func) == 0) {
                snprintf (response, 100, "%s", OK_STR);
                send (sock, response, strlen (response), 0);
+
+       } else if (strcmp ("cfg_shutdown", func) == 0) {
+               /* note: unlike are_you_ok_dude, no response is sent back on the socket */
+               corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
+
+       } else if (strcmp ("cfg_initialize",func) == 0) {
+               int retry_count = 0;
+
+               syslog (LOG_INFO,"%s %s() called!", __func__, func);
+               result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
+               while (result != CS_OK) {
+                       syslog (LOG_ERR,
+                               "cfg_initialize error %d (attempt %d)\n",
+                               result, retry_count);
+                       if (retry_count >= 3) {
+                               exit (1);
+                       }
+                       sleep(1);
+                       retry_count++;
+                       result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
+               }
+               /* NOTE(review): the return values of the CFG calls below are not checked */
+               corosync_cfg_fd_get (cfg_handle, &cfg_fd);
+
+               corosync_cfg_state_track (cfg_handle, 0, &notification_buffer);
+               /* CFG callbacks (my_shutdown_callback) are dispatched from the poll loop */
+               poll_dispatch_add (ta_poll_handle_get(), cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn);
        } else {
                syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
        }
diff --git a/cts/corotests.py b/cts/corotests.py
index 10bdfc0..55c0792 100644
--- a/cts/corotests.py
+++ b/cts/corotests.py
@@ -79,6 +79,7 @@ class CoroTest(CTSTest):
             if self.need_all_up and self.CM.start_cpg:
                 self.CM.cpg_agent[n].clean_start()
                 self.CM.cpg_agent[n].cpg_join(self.name)
+                self.CM.cpg_agent[n].cfg_initialize()
             if not self.need_all_up and self.CM.StataCM(n):
                 self.incr("stopped")
                 self.stop(n)
@@ -1019,9 +1020,57 @@ class GenSimulStop(CoroTest):
         return self.success()
 
 
+###################################################################
+class GenStopAllBeekhof(CoroTest):
+    '''Stop all nodes ~simultaneously via CFG shutdown while they send CPG traffic'''
+
+    def __init__(self, cm):
+        CoroTest.__init__(self,cm)
+        self.name="GenStopAllBeekhof"
+        self.need_all_up = True
+
+    def __call__(self, node):
+        '''Perform the 'GenStopAllBeekhof' test. '''
+        self.incr("calls")
+        # start message traffic on every node, then request a CFG shutdown on each
+        stopping = int(time.time())
+        for n in self.CM.Env["nodes"]:
+            self.CM.cpg_agent[n].msg_blaster(10000)
+            self.CM.cpg_agent[n].cfg_shutdown()
+            self.CM.ShouldBeStatus[n] = "down"
+        # poll every 3 secs, for up to max_wait secs, until no node is still running
+        waited = 0
+        max_wait = 60
+
+        still_up = list(self.CM.Env["nodes"])
+        while len(still_up) > 0:
+            waited = int(time.time()) - stopping
+            self.CM.log("%s still up %s; waited %d secs" % (self.name, str(still_up), waited))
+            if waited > max_wait:
+                break
+            time.sleep(3)
+            for v in self.CM.Env["nodes"]:
+                if v in still_up:
+                    self.CM.ShouldBeStatus[v] = "down"
+                    if not self.CM.StataCM(v):
+                        still_up.remove(v)
+        # fail only if some node really is still up (the last poll may end past max_wait)
+        waited = int(time.time()) - stopping
+        if still_up:
+            for v in still_up:
+                self.CM.log("%s killing corosync on %s" % (self.name, v))
+                self.CM.rsh(v, 'killall -SIGSEGV corosync cpg_test_agent')
+            return self.failure("Waited %d secs for nodes: %s to stop" % (waited, str(still_up)))
+
+        self.CM.log("%s ALL good            (waited %d secs)" % (self.name, waited))
+        return self.success()
+
+
+
 GenTestClasses = []
 GenTestClasses.append(GenSimulStart)
 GenTestClasses.append(GenSimulStop)
+GenTestClasses.append(GenStopAllBeekhof)
 GenTestClasses.append(CpgMsgOrderBasic)
 GenTestClasses.append(CpgMsgOrderZcb)
 GenTestClasses.append(CpgCfgChgOnExecCrash)
@@ -1083,19 +1132,38 @@ def CoroTestList(cm, audits):
     a = ConfigContainer('none_5min')
     a['compatibility'] = 'none'
     a['totem/token'] = (5 * 60 * 1000)
+    a['totem/consensus'] = (5 * 60 * 1000 * 12) // 10  # 1.2 * token, kept an integer for the totem key
     configs.append(a)
 
-    b = ConfigContainer('whitetank_5min')
+    b = ConfigContainer('pcmk_basic')
     b['compatibility'] = 'whitetank'
-    b['totem/token'] = (5 * 60 * 1000)
+    b['totem/token'] = 5000
+    b['totem/token_retransmits_before_loss_const'] = 10
+    b['totem/join'] = 1000
+    b['totem/consensus'] = 7500
     configs.append(b)
 
-    c = ConfigContainer('sec_nss')
+    c = ConfigContainer('pcmk_sec_nss')
     c['totem/secauth'] = 'on'
     c['totem/crypto_accept'] = 'new'
     c['totem/crypto_type'] = 'nss'
+    c['totem/token'] = 5000
+    c['totem/token_retransmits_before_loss_const'] = 10
+    c['totem/join'] = 1000
+    c['totem/consensus'] = 7500
     configs.append(c)
 
+    s = ConfigContainer('pcmk_vq')
+    s['quorum/provider'] = 'corosync_votequorum'
+    s['quorum/expected_votes'] = len(cm.Env["nodes"])
+    s['totem/token'] = 5000
+    s['totem/token_retransmits_before_loss_const'] = 10
+    s['totem/join'] = 1000
+    s['totem/vsftype'] = 'none'
+    s['totem/consensus'] = 7500
+    s['totem/max_messages'] = 20
+    configs.append(s)
+
     d = ConfigContainer('sec_sober')
     d['totem/secauth'] = 'on'
     d['totem/crypto_type'] = 'sober'
@@ -1105,11 +1173,6 @@ def CoroTestList(cm, audits):
     e['totem/threads'] = 4
     configs.append(e)
 
-    #quorum/provider=
-    #f = {}
-    #f['quorum/provider'] = 'corosync_quorum_ykd'
-    #configs.append(f)
-
     if not cm.Env["RrpBindAddr"] is None:
         g = ConfigContainer('rrp_passive')
         g['totem/rrp_mode'] = 'passive'
-- 
1.7.1

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to