Good for merge.

On 07/14/2010 08:56 PM, Angus Salkeld wrote:
> Bug:
> sometimes shutdown can take a long time if all nodes
> are shut down together.
>
> Signed-off-by: Angus Salkeld<[email protected]>
> ---
>   cts/agents/Makefile.am      |    2 +-
>   cts/agents/cpg_test_agent.c |  147 
> ++++++++++++++++++++++++++++++++++++-------
>   cts/corotests.py            |   79 +++++++++++++++++++++---
>   3 files changed, 197 insertions(+), 31 deletions(-)
>
> diff --git a/cts/agents/Makefile.am b/cts/agents/Makefile.am
> index 8a3278a..3e1f419 100644
> --- a/cts/agents/Makefile.am
> +++ b/cts/agents/Makefile.am
> @@ -58,7 +58,7 @@ endif
>   noinst_HEADERS          = common_test_agent.h
>
>   cpg_test_agent_SOURCES = cpg_test_agent.c common_test_agent.c
> -cpg_test_agent_LDADD =  -lcpg -lcoroipcc ../../exec/coropoll.o 
> ../../exec/crypto.o
> +cpg_test_agent_LDADD =  -lcpg -lcfg -lcoroipcc ../../exec/coropoll.o 
> ../../exec/crypto.o
>   cpg_test_agent_LDFLAGS =  -L../../lib -L.
>
>   confdb_test_agent_SOURCES = confdb_test_agent.c common_test_agent.c
> diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
> index 1b6a067..7120c08 100644
> --- a/cts/agents/cpg_test_agent.c
> +++ b/cts/agents/cpg_test_agent.c
> @@ -50,6 +50,7 @@
>   #include<corosync/totem/coropoll.h>
>   #include<corosync/list.h>
>   #include<corosync/cpg.h>
> +#include<corosync/cfg.h>
>   #include "../../exec/crypto.h"
>   #include "common_test_agent.h"
>
> @@ -82,7 +83,9 @@ static char big_and_buf[HOW_BIG_AND_BUF];
>   static int32_t record_config_events_g = 0;
>   static int32_t record_messages_g = 0;
>   static cpg_handle_t cpg_handle = 0;
> +static corosync_cfg_handle_t cfg_handle = 0;
>   static int32_t cpg_fd = -1;
> +static int32_t cfg_fd = -1;
>   static struct list_head config_chg_log_head;
>   static struct list_head msg_log_head;
>   static pid_t my_pid;
> @@ -91,7 +94,7 @@ static int32_t my_seq;
>   static int32_t use_zcb = 0;
>   static int32_t my_msgs_to_send;
>   static int32_t total_stored_msgs = 0;
> -
> +static int32_t in_cnchg = 0;
>
>   static void send_some_more_messages (void * unused);
>
> @@ -209,8 +212,25 @@ static void config_change_callback (
>                       list_add_tail (&log_pt->list,&config_chg_log_head);
>               }
>       }
> +     in_cnchg = 1;
> +     send_some_more_messages (NULL);
> +     in_cnchg = 0;
>   }
>
> +static void my_shutdown_callback (corosync_cfg_handle_t handle,
> +     corosync_cfg_shutdown_flags_t flags)
> +{
> +     syslog (LOG_CRIT, "%s flags:%d", __func__, flags);
> +     if (flags&  COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
> +             corosync_cfg_replyto_shutdown (cfg_handle, 
> COROSYNC_CFG_SHUTDOWN_FLAG_YES);
> +     }
> +}
> +
> +
> +static corosync_cfg_callbacks_t cfg_callbacks = {
> +     .corosync_cfg_shutdown_callback = my_shutdown_callback,
> +     .corosync_cfg_state_track_callback = NULL,
> +};
>   static cpg_callbacks_t callbacks = {
>       .cpg_deliver_fn = delivery_callback,
>       .cpg_confchg_fn = config_change_callback,
> @@ -365,6 +385,13 @@ free_buffer:
>   }
>
>
> +#define cs_repeat(counter, max, code) do {           \
> +     code;                                           \
> +     if(res == CS_ERR_TRY_AGAIN) {                   \
> +         counter++;                                  \
> +         sleep(counter);                             \
> +     }                                               \
> +    } while(res == CS_ERR_TRY_AGAIN&&  counter<  max)
>
>   static unsigned char buffer[200000];
>   static void send_some_more_messages_normal (void)
> @@ -377,6 +404,8 @@ static void send_some_more_messages_normal (void)
>       hash_state sha1_hash;
>       cs_error_t res;
>       cpg_flow_control_state_t fc_state;
> +     int retries = 0;
> +     time_t before;
>
>       if (cpg_fd<  0)
>               return;
> @@ -402,29 +431,42 @@ static void send_some_more_messages_normal (void)
>       iov[1].iov_base = buffer;
>
>       for (i = 0; i<  send_now; i++) {
> -
> -             res = cpg_flow_control_state_get (cpg_handle,&fc_state);
> -             if (res == CS_OK&&  fc_state == CPG_FLOW_CONTROL_ENABLED) {
> -                     /* lets do this later */
> -                     send_some_more_messages_later ();
> -                     syslog (LOG_INFO, "%s() flow control enabled.", 
> __func__);
> -                     return;
> -             }
> -
> -             res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
> -             if (res == CS_ERR_TRY_AGAIN) {
> -                     /* lets do this later */
> -                     send_some_more_messages_later ();
> -                     syslog (LOG_INFO, "%s() cpg_mcast_joined() says try 
> again.",
> -                             __func__);
> -                     return;
> -             } else
> +             if (in_cnchg) {
> +                     retries = 0;
> +                     before = time(NULL);
> +                     cs_repeat(retries, 30, res = 
> cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1));
> +                     if (retries>  20) {
> +                             syslog (LOG_ERR, "%s() ->  cs_repeat: blocked 
> for :%lu secs.",
> +                                     __func__, (unsigned long)(time(NULL) - 
> before));
> +                     }
>                       if (res != CS_OK) {
> -                             syslog (LOG_ERR, "%s() ->  cpg_mcast_joined 
> error:%d, exiting.",
> +                             syslog (LOG_ERR, "%s() ->  cpg_mcast_joined 
> error:%d.",
>                                       __func__, res);
> -                             exit (-2);
> +                             return;
> +                     }
> +             } else {
> +                     res = cpg_flow_control_state_get (cpg_handle,&fc_state);
> +                     if (res == CS_OK&&  fc_state == 
> CPG_FLOW_CONTROL_ENABLED) {
> +                             /* lets do this later */
> +                             send_some_more_messages_later ();
> +                             syslog (LOG_INFO, "%s() flow control enabled.", 
> __func__);
> +                             return;
>                       }
>
> +                     res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, 
> iov, 2);
> +                     if (res == CS_ERR_TRY_AGAIN) {
> +                             /* lets do this later */
> +                             send_some_more_messages_later ();
> +                             syslog (LOG_INFO, "%s() cpg_mcast_joined() says 
> try again.",
> +                                     __func__);
> +                             return;
> +                     } else
> +                             if (res != CS_OK) {
> +                                     syslog (LOG_ERR, "%s() ->  
> cpg_mcast_joined error:%d, exiting.",
> +                                             __func__, res);
> +                                     exit (-2);
> +                             }
> +             }
>               my_msgs_to_send--;
>       }
>   }
> @@ -493,18 +535,52 @@ static void msg_blaster_zcb (int sock, char* 
> num_to_send_str)
>       send_some_more_messages_zcb ();
>   }
>
> +static corosync_cfg_state_notification_t notification_buffer;
> +
> +static int cfg_dispatch_wrapper_fn (hdb_handle_t handle,
> +     int fd,
> +     int revents,
> +     void *data)
> +{
> +     cs_error_t error;
> +     if (revents&  POLLHUP || revents&  POLLERR) {
> +             syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", 
> __func__);
> +             poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
> +             close (cfg_fd);
> +             cfg_fd = -1;
> +             return -1;
> +     }
> +     error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
> +     if (error == CS_ERR_LIBRARY) {
> +             syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", 
> __func__);
> +             poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
> +             close (cfg_fd);
> +             cfg_fd = -1;
> +             return -1;
> +     }
> +     return 0;
> +}
>
>   static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
>       int fd,
>       int revents,
>       void *data)
>   {
> -     cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
> +     cs_error_t error;
> +     if (revents&  POLLHUP || revents&  POLLERR) {
> +             syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", 
> __func__);
> +             poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
> +             close (cpg_fd);
> +             cpg_fd = -1;
> +             return -1;
> +     }
> +     error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
>       if (error == CS_ERR_LIBRARY) {
> -             syslog (LOG_ERR, "%s() got LIB error disconnecting from 
> corosync.", __func__);
> +             syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", 
> __func__);
>               poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
>               close (cpg_fd);
>               cpg_fd = -1;
> +             return -1;
>       }
>       return 0;
>   }
> @@ -602,6 +678,33 @@ static void do_command (int sock, char* func, 
> char*args[], int num_args)
>       } else if (strcmp ("are_you_ok_dude", func) == 0) {
>               snprintf (response, 100, "%s", OK_STR);
>               send (sock, response, strlen (response), 0);
> +
> +     } else if (strcmp ("cfg_shutdown", func) == 0) {
> +
> +             corosync_cfg_try_shutdown (cfg_handle, 
> COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
> +
> +     } else if (strcmp ("cfg_initialize",func) == 0) {
> +             int retry_count = 0;
> +
> +             syslog (LOG_INFO,"%s %s() called!", __func__, func);
> +             result = corosync_cfg_initialize (&cfg_handle,&cfg_callbacks);
> +             while (result != CS_OK) {
> +                     syslog (LOG_ERR,
> +                             "cfg_initialize error %d (attempt %d)\n",
> +                             result, retry_count);
> +                     if (retry_count>= 3) {
> +                             exit (1);
> +                     }
> +                     sleep(1);
> +                     retry_count++;
> +                     result = corosync_cfg_initialize 
> (&cfg_handle,&cfg_callbacks);
> +             }
> +
> +             corosync_cfg_fd_get (cfg_handle,&cfg_fd);
> +
> +             corosync_cfg_state_track (cfg_handle, 0,&notification_buffer);
> +
> +             poll_dispatch_add (ta_poll_handle_get(), cfg_fd, 
> POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn);
>       } else {
>               syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
>       }
> diff --git a/cts/corotests.py b/cts/corotests.py
> index 10bdfc0..55c0792 100644
> --- a/cts/corotests.py
> +++ b/cts/corotests.py
> @@ -79,6 +79,7 @@ class CoroTest(CTSTest):
>               if self.need_all_up and self.CM.start_cpg:
>                   self.CM.cpg_agent[n].clean_start()
>                   self.CM.cpg_agent[n].cpg_join(self.name)
> +                self.CM.cpg_agent[n].cfg_initialize()
>               if not self.need_all_up and self.CM.StataCM(n):
>                   self.incr("stopped")
>                   self.stop(n)
> @@ -1019,9 +1020,57 @@ class GenSimulStop(CoroTest):
>           return self.success()
>
>
> +###################################################################
> +class GenStopAllBeekhof(CoroTest):
> +    '''Stop all the nodes ~ simultaneously'''
> +
> +    def __init__(self, cm):
> +        CoroTest.__init__(self,cm)
> +        self.name="GenStopAllBeekhof"
> +        self.need_all_up = True
> +
> +    def __call__(self, node):
> +        '''Perform the 'GenStopAllBeekhof' test. '''
> +        self.incr("calls")
> +
> +        stopping = int(time.time())
> +        for n in self.CM.Env["nodes"]:
> +            self.CM.cpg_agent[n].msg_blaster(10000)
> +            self.CM.cpg_agent[n].cfg_shutdown()
> +            self.CM.ShouldBeStatus[n] = "down"
> +
> +        waited = 0
> +        max_wait = 60
> +
> +        still_up = list(self.CM.Env["nodes"])
> +        while len(still_up)>  0:
> +            waited = int(time.time()) - stopping
> +            self.CM.log("%s still up %s; waited %d secs" % (self.name, 
> str(still_up), waited))
> +            if waited>  max_wait:
> +                break
> +            time.sleep(3)
> +            for v in self.CM.Env["nodes"]:
> +                if v in still_up:
> +                    self.CM.ShouldBeStatus[n] = "down"
> +                    if not self.CM.StataCM(v):
> +                        still_up.remove(v)
> +
> +        waited = int(time.time()) - stopping
> +        if waited>  max_wait:
> +            for v in still_up:
> +                self.CM.log("%s killing corosync on %s" % (self.name, v))
> +                self.CM.rsh(v, 'killall -SIGSEGV corosync cpg_test_agent')
> +            return self.failure("Waited %d secs for nodes: %s to stop" % 
> (waited, str(still_up)))
> +
> +        self.CM.log("%s ALL good            (waited %d secs)" % (self.name, 
> waited))
> +        return self.success()
> +
> +
> +
>   GenTestClasses = []
>   GenTestClasses.append(GenSimulStart)
>   GenTestClasses.append(GenSimulStop)
> +GenTestClasses.append(GenStopAllBeekhof)
>   GenTestClasses.append(CpgMsgOrderBasic)
>   GenTestClasses.append(CpgMsgOrderZcb)
>   GenTestClasses.append(CpgCfgChgOnExecCrash)
> @@ -1083,19 +1132,38 @@ def CoroTestList(cm, audits):
>       a = ConfigContainer('none_5min')
>       a['compatibility'] = 'none'
>       a['totem/token'] = (5 * 60 * 1000)
> +    a['totem/consensus'] = (5 * 60 * 1000 * 1.2)
>       configs.append(a)
>
> -    b = ConfigContainer('whitetank_5min')
> +    b = ConfigContainer('pcmk_basic')
>       b['compatibility'] = 'whitetank'
> -    b['totem/token'] = (5 * 60 * 1000)
> +    b['totem/token'] = 5000
> +    b['totem/token_retransmits_before_loss_const'] = 10
> +    b['totem/join'] = 1000
> +    b['totem/consensus'] = 7500
>       configs.append(b)
>
> -    c = ConfigContainer('sec_nss')
> +    c = ConfigContainer('pcmk_sec_nss')
>       c['totem/secauth'] = 'on'
>       c['totem/crypto_accept'] = 'new'
>       c['totem/crypto_type'] = 'nss'
> +    c['totem/token'] = 5000
> +    c['totem/token_retransmits_before_loss_const'] = 10
> +    c['totem/join'] = 1000
> +    c['totem/consensus'] = 7500
>       configs.append(c)
>
> +    s = ConfigContainer('pcmk_vq')
> +    s['quorum/provider'] = 'corosync_votequorum'
> +    s['quorum/expected_votes'] = len(cm.Env["nodes"])
> +    s['totem/token'] = 5000
> +    s['totem/token_retransmits_before_loss_const'] = 10
> +    s['totem/join'] = 1000
> +    s['totem/vsftype'] = 'none'
> +    s['totem/consensus'] = 7500
> +    s['totem/max_messages'] = 20
> +    configs.append(s)
> +
>       d = ConfigContainer('sec_sober')
>       d['totem/secauth'] = 'on'
>       d['totem/crypto_type'] = 'sober'
> @@ -1105,11 +1173,6 @@ def CoroTestList(cm, audits):
>       e['totem/threads'] = 4
>       configs.append(e)
>
> -    #quorum/provider=
> -    #f = {}
> -    #f['quorum/provider'] = 'corosync_quorum_ykd'
> -    #configs.append(f)
> -
>       if not cm.Env["RrpBindAddr"] is None:
>           g = ConfigContainer('rrp_passive')
>           g['totem/rrp_mode'] = 'passive'

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to