good for merge On 07/14/2010 08:56 PM, Angus Salkeld wrote: > Bug: > sometimes shutdown can take a long time if all nodes > are shutdown together. > > Signed-off-by: Angus Salkeld<[email protected]> > --- > cts/agents/Makefile.am | 2 +- > cts/agents/cpg_test_agent.c | 147 > ++++++++++++++++++++++++++++++++++++------- > cts/corotests.py | 79 +++++++++++++++++++++--- > 3 files changed, 197 insertions(+), 31 deletions(-) > > diff --git a/cts/agents/Makefile.am b/cts/agents/Makefile.am > index 8a3278a..3e1f419 100644 > --- a/cts/agents/Makefile.am > +++ b/cts/agents/Makefile.am > @@ -58,7 +58,7 @@ endif > noinst_HEADERS = common_test_agent.h > > cpg_test_agent_SOURCES = cpg_test_agent.c common_test_agent.c > -cpg_test_agent_LDADD = -lcpg -lcoroipcc ../../exec/coropoll.o > ../../exec/crypto.o > +cpg_test_agent_LDADD = -lcpg -lcfg -lcoroipcc ../../exec/coropoll.o > ../../exec/crypto.o > cpg_test_agent_LDFLAGS = -L../../lib -L. > > confdb_test_agent_SOURCES = confdb_test_agent.c common_test_agent.c > diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c > index 1b6a067..7120c08 100644 > --- a/cts/agents/cpg_test_agent.c > +++ b/cts/agents/cpg_test_agent.c > @@ -50,6 +50,7 @@ > #include<corosync/totem/coropoll.h> > #include<corosync/list.h> > #include<corosync/cpg.h> > +#include<corosync/cfg.h> > #include "../../exec/crypto.h" > #include "common_test_agent.h" > > @@ -82,7 +83,9 @@ static char big_and_buf[HOW_BIG_AND_BUF]; > static int32_t record_config_events_g = 0; > static int32_t record_messages_g = 0; > static cpg_handle_t cpg_handle = 0; > +static corosync_cfg_handle_t cfg_handle = 0; > static int32_t cpg_fd = -1; > +static int32_t cfg_fd = -1; > static struct list_head config_chg_log_head; > static struct list_head msg_log_head; > static pid_t my_pid; > @@ -91,7 +94,7 @@ static int32_t my_seq; > static int32_t use_zcb = 0; > static int32_t my_msgs_to_send; > static int32_t total_stored_msgs = 0; > - > +static int32_t in_cnchg = 0; > > static void 
send_some_more_messages (void * unused); > > @@ -209,8 +212,25 @@ static void config_change_callback ( > list_add_tail (&log_pt->list,&config_chg_log_head); > } > } > + in_cnchg = 1; > + send_some_more_messages (NULL); > + in_cnchg = 0; > } > > +static void my_shutdown_callback (corosync_cfg_handle_t handle, > + corosync_cfg_shutdown_flags_t flags) > +{ > + syslog (LOG_CRIT, "%s flags:%d", __func__, flags); > + if (flags& COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) { > + corosync_cfg_replyto_shutdown (cfg_handle, > COROSYNC_CFG_SHUTDOWN_FLAG_YES); > + } > +} > + > + > +static corosync_cfg_callbacks_t cfg_callbacks = { > + .corosync_cfg_shutdown_callback = my_shutdown_callback, > + .corosync_cfg_state_track_callback = NULL, > +}; > static cpg_callbacks_t callbacks = { > .cpg_deliver_fn = delivery_callback, > .cpg_confchg_fn = config_change_callback, > @@ -365,6 +385,13 @@ free_buffer: > } > > > +#define cs_repeat(counter, max, code) do { \ > + code; \ > + if(res == CS_ERR_TRY_AGAIN) { \ > + counter++; \ > + sleep(counter); \ > + } \ > + } while(res == CS_ERR_TRY_AGAIN&& counter< max) > > static unsigned char buffer[200000]; > static void send_some_more_messages_normal (void) > @@ -377,6 +404,8 @@ static void send_some_more_messages_normal (void) > hash_state sha1_hash; > cs_error_t res; > cpg_flow_control_state_t fc_state; > + int retries = 0; > + time_t before; > > if (cpg_fd< 0) > return; > @@ -402,29 +431,42 @@ static void send_some_more_messages_normal (void) > iov[1].iov_base = buffer; > > for (i = 0; i< send_now; i++) { > - > - res = cpg_flow_control_state_get (cpg_handle,&fc_state); > - if (res == CS_OK&& fc_state == CPG_FLOW_CONTROL_ENABLED) { > - /* lets do this later */ > - send_some_more_messages_later (); > - syslog (LOG_INFO, "%s() flow control enabled.", > __func__); > - return; > - } > - > - res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2); > - if (res == CS_ERR_TRY_AGAIN) { > - /* lets do this later */ > - send_some_more_messages_later (); > - 
syslog (LOG_INFO, "%s() cpg_mcast_joined() says try > again.", > - __func__); > - return; > - } else > + if (in_cnchg) { > + retries = 0; > + before = time(NULL); > + cs_repeat(retries, 30, res = > cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1)); > + if (retries> 20) { > + syslog (LOG_ERR, "%s() -> cs_repeat: blocked > for :%lu secs.", > + __func__, (unsigned long)(time(NULL) - > before)); > + } > if (res != CS_OK) { > - syslog (LOG_ERR, "%s() -> cpg_mcast_joined > error:%d, exiting.", > + syslog (LOG_ERR, "%s() -> cpg_mcast_joined > error:%d.", > __func__, res); > - exit (-2); > + return; > + } > + } else { > + res = cpg_flow_control_state_get (cpg_handle,&fc_state); > + if (res == CS_OK&& fc_state == > CPG_FLOW_CONTROL_ENABLED) { > + /* lets do this later */ > + send_some_more_messages_later (); > + syslog (LOG_INFO, "%s() flow control enabled.", > __func__); > + return; > } > > + res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, > iov, 2); > + if (res == CS_ERR_TRY_AGAIN) { > + /* lets do this later */ > + send_some_more_messages_later (); > + syslog (LOG_INFO, "%s() cpg_mcast_joined() says > try again.", > + __func__); > + return; > + } else > + if (res != CS_OK) { > + syslog (LOG_ERR, "%s() -> > cpg_mcast_joined error:%d, exiting.", > + __func__, res); > + exit (-2); > + } > + } > my_msgs_to_send--; > } > } > @@ -493,18 +535,52 @@ static void msg_blaster_zcb (int sock, char* > num_to_send_str) > send_some_more_messages_zcb (); > } > > +static corosync_cfg_state_notification_t notification_buffer; > + > +static int cfg_dispatch_wrapper_fn (hdb_handle_t handle, > + int fd, > + int revents, > + void *data) > +{ > + cs_error_t error; > + if (revents& POLLHUP || revents& POLLERR) { > + syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", > __func__); > + poll_dispatch_delete (ta_poll_handle_get(), cfg_fd); > + close (cfg_fd); > + cfg_fd = -1; > + return -1; > + } > + error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL); > + if (error == 
CS_ERR_LIBRARY) { > + syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", > __func__); > + poll_dispatch_delete (ta_poll_handle_get(), cfg_fd); > + close (cfg_fd); > + cfg_fd = -1; > + return -1; > + } > + return 0; > +} > > static int cpg_dispatch_wrapper_fn (hdb_handle_t handle, > int fd, > int revents, > void *data) > { > - cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); > + cs_error_t error; > + if (revents& POLLHUP || revents& POLLERR) { > + syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", > __func__); > + poll_dispatch_delete (ta_poll_handle_get(), cpg_fd); > + close (cpg_fd); > + cpg_fd = -1; > + return -1; > + } > + error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL); > if (error == CS_ERR_LIBRARY) { > - syslog (LOG_ERR, "%s() got LIB error disconnecting from > corosync.", __func__); > + syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", > __func__); > poll_dispatch_delete (ta_poll_handle_get(), cpg_fd); > close (cpg_fd); > cpg_fd = -1; > + return -1; > } > return 0; > } > @@ -602,6 +678,33 @@ static void do_command (int sock, char* func, > char*args[], int num_args) > } else if (strcmp ("are_you_ok_dude", func) == 0) { > snprintf (response, 100, "%s", OK_STR); > send (sock, response, strlen (response), 0); > + > + } else if (strcmp ("cfg_shutdown", func) == 0) { > + > + corosync_cfg_try_shutdown (cfg_handle, > COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST); > + > + } else if (strcmp ("cfg_initialize",func) == 0) { > + int retry_count = 0; > + > + syslog (LOG_INFO,"%s %s() called!", __func__, func); > + result = corosync_cfg_initialize (&cfg_handle,&cfg_callbacks); > + while (result != CS_OK) { > + syslog (LOG_ERR, > + "cfg_initialize error %d (attempt %d)\n", > + result, retry_count); > + if (retry_count>= 3) { > + exit (1); > + } > + sleep(1); > + retry_count++; > + result = corosync_cfg_initialize > (&cfg_handle,&cfg_callbacks); > + } > + > + corosync_cfg_fd_get (cfg_handle,&cfg_fd); > + > + corosync_cfg_state_track 
(cfg_handle, 0,&notification_buffer); > + > + poll_dispatch_add (ta_poll_handle_get(), cfg_fd, > POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn); > } else { > syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func); > } > diff --git a/cts/corotests.py b/cts/corotests.py > index 10bdfc0..55c0792 100644 > --- a/cts/corotests.py > +++ b/cts/corotests.py > @@ -79,6 +79,7 @@ class CoroTest(CTSTest): > if self.need_all_up and self.CM.start_cpg: > self.CM.cpg_agent[n].clean_start() > self.CM.cpg_agent[n].cpg_join(self.name) > + self.CM.cpg_agent[n].cfg_initialize() > if not self.need_all_up and self.CM.StataCM(n): > self.incr("stopped") > self.stop(n) > @@ -1019,9 +1020,57 @@ class GenSimulStop(CoroTest): > return self.success() > > > +################################################################### > +class GenStopAllBeekhof(CoroTest): > + '''Stop all the nodes ~ simultaneously''' > + > + def __init__(self, cm): > + CoroTest.__init__(self,cm) > + self.name="GenStopAllBeekhof" > + self.need_all_up = True > + > + def __call__(self, node): > + '''Perform the 'GenStopAllBeekhof' test.
''' > + self.incr("calls") > + > + stopping = int(time.time()) > + for n in self.CM.Env["nodes"]: > + self.CM.cpg_agent[n].msg_blaster(10000) > + self.CM.cpg_agent[n].cfg_shutdown() > + self.CM.ShouldBeStatus[n] = "down" > + > + waited = 0 > + max_wait = 60 > + > + still_up = list(self.CM.Env["nodes"]) > + while len(still_up)> 0: > + waited = int(time.time()) - stopping > + self.CM.log("%s still up %s; waited %d secs" % (self.name, > str(still_up), waited)) > + if waited> max_wait: > + break > + time.sleep(3) > + for v in self.CM.Env["nodes"]: > + if v in still_up: > + self.CM.ShouldBeStatus[n] = "down" > + if not self.CM.StataCM(v): > + still_up.remove(v) > + > + waited = int(time.time()) - stopping > + if waited> max_wait: > + for v in still_up: > + self.CM.log("%s killing corosync on %s" % (self.name, v)) > + self.CM.rsh(v, 'killall -SIGSEGV corosync cpg_test_agent') > + return self.failure("Waited %d secs for nodes: %s to stop" % > (waited, str(still_up))) > + > + self.CM.log("%s ALL good (waited %d secs)" % (self.name, > waited)) > + return self.success() > + > + > + > GenTestClasses = [] > GenTestClasses.append(GenSimulStart) > GenTestClasses.append(GenSimulStop) > +GenTestClasses.append(GenStopAllBeekhof) > GenTestClasses.append(CpgMsgOrderBasic) > GenTestClasses.append(CpgMsgOrderZcb) > GenTestClasses.append(CpgCfgChgOnExecCrash) > @@ -1083,19 +1132,38 @@ def CoroTestList(cm, audits): > a = ConfigContainer('none_5min') > a['compatibility'] = 'none' > a['totem/token'] = (5 * 60 * 1000) > + a['totem/consensus'] = (5 * 60 * 1000 * 1.2) > configs.append(a) > > - b = ConfigContainer('whitetank_5min') > + b = ConfigContainer('pcmk_basic') > b['compatibility'] = 'whitetank' > - b['totem/token'] = (5 * 60 * 1000) > + b['totem/token'] = 5000 > + b['totem/token_retransmits_before_loss_const'] = 10 > + b['totem/join'] = 1000 > + b['totem/consensus'] = 7500 > configs.append(b) > > - c = ConfigContainer('sec_nss') > + c = ConfigContainer('pcmk_sec_nss') > 
c['totem/secauth'] = 'on' > c['totem/crypto_accept'] = 'new' > c['totem/crypto_type'] = 'nss' > + c['totem/token'] = 5000 > + c['totem/token_retransmits_before_loss_const'] = 10 > + c['totem/join'] = 1000 > + c['totem/consensus'] = 7500 > configs.append(c) > > + s = ConfigContainer('pcmk_vq') > + s['quorum/provider'] = 'corosync_votequorum' > + s['quorum/expected_votes'] = len(cm.Env["nodes"]) > + s['totem/token'] = 5000 > + s['totem/token_retransmits_before_loss_const'] = 10 > + s['totem/join'] = 1000 > + s['totem/vsftype'] = 'none' > + s['totem/consensus'] = 7500 > + s['totem/max_messages'] = 20 > + configs.append(s) > + > d = ConfigContainer('sec_sober') > d['totem/secauth'] = 'on' > d['totem/crypto_type'] = 'sober' > @@ -1105,11 +1173,6 @@ def CoroTestList(cm, audits): > e['totem/threads'] = 4 > configs.append(e) > > - #quorum/provider= > - #f = {} > - #f['quorum/provider'] = 'corosync_quorum_ykd' > - #configs.append(f) > - > if not cm.Env["RrpBindAddr"] is None: > g = ConfigContainer('rrp_passive') > g['totem/rrp_mode'] = 'passive'
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
