See the subject line and the patch below. Regards, Honza
commit ee0023d2592c4f17924a7bb82fd795329b7cc05b Author: Jan Friesse <jfrie...@redhat.com> Date: Tue Jul 14 11:51:18 2009 +0200 Ability to detect process pause Add ability to detect process pause and not implode the membership algorithm when this occurs. This patch is a backport from trunk (2304). diff --git a/branches/whitetank/exec/totemnet.c b/branches/whitetank/exec/totemnet.c index 3686fd1..592e784 100644 --- a/branches/whitetank/exec/totemnet.c +++ b/branches/whitetank/exec/totemnet.c @@ -1528,4 +1528,81 @@ error_exit: return (res); } +/* + * Discard every datagram currently queued on the multicast receive + * socket without blocking. + * + * Returns 0 if nothing was queued; otherwise 1 or -1 depending on + * whether the last recvmsg() call succeeded or failed. If the handle + * lookup fails, the non-zero hdb_handle_get() result is returned. + */ +int totemnet_recv_mcast_empty ( + totemnet_handle handle) +{ + struct totemnet_instance *instance; + unsigned int res; + ssize_t bytes; + struct sockaddr_storage system_from; + struct msghdr msg_recv; + struct pollfd ufd; + int nfds; + int msg_processed = 0; + + res = hdb_handle_get (&totemnet_instance_database, handle, + (void *)&instance); + if (res != 0) { + goto error_exit; + } + + /* + * Receive into totemnet_iov_recv_flush (presumably kept separate + * from the normal receive iovec so the message the caller is + * currently handling is not overwritten). + * NOTE(review): this patch does not add totemnet_iov_recv_flush; + * confirm whitetank's struct totemnet_instance declares it and + * points it at a frame-sized buffer. + */ + msg_recv.msg_name = &system_from; + msg_recv.msg_namelen = sizeof (struct sockaddr_storage); + msg_recv.msg_iov = &instance->totemnet_iov_recv_flush; + msg_recv.msg_iovlen = 1; + /* + * Trunk spells the Solaris platform macro COROSYNC_SOLARIS; the + * whitetank build presumably defines OPENAIS_SOLARIS. Accept either. + */ +#if !defined(OPENAIS_SOLARIS) && !defined(COROSYNC_SOLARIS) + msg_recv.msg_control = NULL; + msg_recv.msg_controllen = 0; + msg_recv.msg_flags = 0; +#else + msg_recv.msg_accrights = NULL; + msg_recv.msg_accrightslen = 0; +#endif + + /* + * Drain without blocking. Stop as soon as poll() reports nothing + * readable: looping on nfds == 1 alone spins forever when poll() + * returns only POLLERR/POLLHUP/POLLNVAL, since nothing is consumed. + */ + for (;;) { + ufd.fd = instance->totemnet_sockets.mcast_recv; + ufd.events = POLLIN; + nfds = poll (&ufd, 1, 0); + if (nfds != 1 || (ufd.revents & POLLIN) == 0) { + break; + } + bytes = recvmsg (instance->totemnet_sockets.mcast_recv, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); + if (bytes != -1) { + msg_processed = 1; + } else { + msg_processed = -1; + } + } + + hdb_handle_put (&totemnet_instance_database, handle); + + return (msg_processed); +error_exit: + return (res); +} diff --git a/branches/whitetank/exec/totemnet.h b/branches/whitetank/exec/totemnet.h index f4788ab..e1b6ca1 100644 --- a/branches/whitetank/exec/totemnet.h +++ b/branches/whitetank/exec/totemnet.h @@ 
-108,4 +108,7 @@ extern int totemnet_token_target_set ( totemnet_handle handle, struct totem_ip_address *token_target); +extern int totemnet_recv_mcast_empty ( + totemnet_handle handle); + #endif /* TOTEMNET_H_DEFINED */ diff --git a/branches/whitetank/exec/totemrrp.c b/branches/whitetank/exec/totemrrp.c index f471c5b..445da96 100644 --- a/branches/whitetank/exec/totemrrp.c +++ b/branches/whitetank/exec/totemrrp.c @@ -151,6 +151,9 @@ struct rrp_algo { void (*ring_reenable) ( struct totemrrp_instance *instance); + + int (*mcast_recv_empty) ( + struct totemrrp_instance *instance); }; struct totemrrp_instance { @@ -265,6 +268,9 @@ static void none_token_target_set ( static void none_ring_reenable ( struct totemrrp_instance *instance); +static int none_mcast_recv_empty ( + struct totemrrp_instance *instance); + /* * Passive Replication Forward Declerations */ @@ -323,6 +329,9 @@ static void passive_token_target_set ( static void passive_ring_reenable ( struct totemrrp_instance *instance); +static int passive_mcast_recv_empty ( + struct totemrrp_instance *instance); + /* * Active Replication Forward Definitions */ @@ -381,6 +390,9 @@ static void active_token_target_set ( static void active_ring_reenable ( struct totemrrp_instance *instance); +static int active_mcast_recv_empty ( + struct totemrrp_instance *instance); + static void active_timer_expired_token_start ( struct active_instance *active_instance); @@ -406,7 +418,8 @@ struct rrp_algo none_algo = { .iface_check = none_iface_check, .processor_count_set = none_processor_count_set, .token_target_set = none_token_target_set, - .ring_reenable = none_ring_reenable + .ring_reenable = none_ring_reenable, + .mcast_recv_empty = none_mcast_recv_empty }; struct rrp_algo passive_algo = { @@ -422,7 +435,8 @@ struct rrp_algo passive_algo = { .iface_check = passive_iface_check, .processor_count_set = passive_processor_count_set, .token_target_set = passive_token_target_set, - .ring_reenable = passive_ring_reenable + 
.ring_reenable = passive_ring_reenable, + .mcast_recv_empty = passive_mcast_recv_empty }; struct rrp_algo active_algo = { @@ -438,7 +452,8 @@ struct rrp_algo active_algo = { .iface_check = active_iface_check, .processor_count_set = active_processor_count_set, .token_target_set = active_token_target_set, - .ring_reenable = active_ring_reenable + .ring_reenable = active_ring_reenable, + .mcast_recv_empty = active_mcast_recv_empty }; struct rrp_algo *rrp_algos[] = { @@ -558,6 +573,19 @@ static void none_ring_reenable ( */ } +/* + * Flush queued multicast messages; only interface 0 is consulted. + */ +static int none_mcast_recv_empty ( + struct totemrrp_instance *instance) +{ + int res; + + res = totemnet_recv_mcast_empty (instance->net_handles[0]); + + return (res); +} + /* * Passive Replication Implementation */ @@ -885,6 +913,31 @@ static void passive_token_target_set ( totemnet_token_target_set (instance->net_handles[iface_no], token_target); } +/* + * Flush queued multicast messages on every interface. Returns -1 as + * soon as one interface reports a recvmsg error, otherwise 1 if any + * interface discarded messages and 0 if none did. + */ +static int passive_mcast_recv_empty ( + struct totemrrp_instance *instance) +{ + int res; + int msgs_emptied = 0; + int i; + + for (i = 0; i < instance->interface_count; i++) { + res = totemnet_recv_mcast_empty (instance->net_handles[i]); + if (res == -1) { + return (-1); + } + if (res == 1) { + msgs_emptied = 1; + } + } + + return (msgs_emptied); +} + static void passive_ring_reenable ( struct totemrrp_instance *instance) { @@ -1236,6 +1289,31 @@ static void active_token_target_set ( totemnet_token_target_set (instance->net_handles[iface_no], token_target); } +/* + * Flush queued multicast messages on every interface. Returns -1 as + * soon as one interface reports a recvmsg error, otherwise 1 if any + * interface discarded messages and 0 if none did. + */ +static int active_mcast_recv_empty ( + struct totemrrp_instance *instance) +{ + int res; + int msgs_emptied = 0; + int i; + + for (i = 0; i < instance->interface_count; i++) { + res = totemnet_recv_mcast_empty (instance->net_handles[i]); + if (res == -1) { + return (-1); + } + if (res == 1) { + msgs_emptied = 1; + } + } + + return (msgs_emptied); +} + static void active_ring_reenable ( struct totemrrp_instance *instance) { @@ -1714,3 +1792,29 @@ int totemrrp_ring_reenable ( error_exit: return (res); } + +/* + * Discard queued multicast messages using the configured RRP + * algorithm's mcast_recv_empty hook. Returns the hook's result, or + * ENOENT if the handle is unknown. + */ +int totemrrp_mcast_recv_empty ( 
+ totemrrp_handle handle) +{ + struct totemrrp_instance *instance; + int res; + + res = hdb_handle_get (&totemrrp_instance_database, handle, + (void *)&instance); + if (res != 0) { + res = ENOENT; + goto error_exit; + } + + res = instance->rrp_algo->mcast_recv_empty (instance); + + hdb_handle_put (&totemrrp_instance_database, handle); + +error_exit: + return (res); +} diff --git a/branches/whitetank/exec/totemrrp.h b/branches/whitetank/exec/totemrrp.h index fad81d7..e6fcba8 100644 --- a/branches/whitetank/exec/totemrrp.h +++ b/branches/whitetank/exec/totemrrp.h @@ -118,4 +118,7 @@ extern int totemrrp_ifaces_get ( extern int totemrrp_ring_reenable ( totemrrp_handle handle); +extern int totemrrp_mcast_recv_empty ( + totemrrp_handle handle); + #endif /* TOTEMRRP_H_DEFINED */ diff --git a/branches/whitetank/exec/totemsrp.c b/branches/whitetank/exec/totemsrp.c index 74f87e5..5f27927 100644 --- a/branches/whitetank/exec/totemsrp.c +++ b/branches/whitetank/exec/totemsrp.c @@ -401,6 +401,8 @@ struct totemsrp_instance { /* * Timers */ + poll_timer_handle timer_pause_timeout; + poll_timer_handle timer_orf_token_timeout; poll_timer_handle timer_orf_token_retransmit_timeout; @@ -495,6 +497,8 @@ struct totemsrp_instance { unsigned int my_pbl; unsigned int my_cbl; + + struct timeval pause_timestamp; }; struct message_handlers { @@ -570,6 +574,7 @@ static void memb_merge_detect_endian_convert ( struct memb_merge_detect *out); static void srp_addr_copy_endian_convert (struct srp_addr *out, struct srp_addr *in); static void timer_function_orf_token_timeout (void *data); +static void timer_function_pause_timeout (void *data); static void timer_function_heartbeat_timeout (void *data); static void timer_function_token_retransmit_timeout (void *data); static void timer_function_token_hold_retransmit_timeout (void *data); @@ -655,6 +660,49 @@ unsigned int main_msgs_missing (void) return (0); } +/* + * Detect whether this process was descheduled for a long time and, if + * so, discard the multicast messages that queued up in the meantime. + * + * pause_timestamp is refreshed by timer_function_pause_timeout every + * token_timeout / 5, so a gap larger than token_timeout / 2 means that + * timer could not run, i.e. the process was paused. + * + * Returns 0 if no pause was detected or the final flush pass found + * nothing queued, 1 if it discarded messages, or another non-zero + * value if the RRP handle lookup failed. + */ +static int pause_flush (struct totemsrp_instance *instance) +{ + struct timeval now; + 
uint64_t now_msec; + uint64_t timestamp_msec; + int res = 0; + + gettimeofday (&now, NULL); + now_msec = ((now.tv_sec * 1000ULL) + (now.tv_usec / 1000ULL)); + timestamp_msec = ((instance->pause_timestamp.tv_sec * 1000ULL) + (instance->pause_timestamp.tv_usec/1000ULL)); + + /* + * gettimeofday() follows wall-clock steps. If the clock went + * backwards the unsigned difference would wrap around and report + * a huge bogus pause, so treat that case as "no pause". + */ + if (now_msec > timestamp_msec && + (now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) { + log_printf (instance->totemsrp_log_level_notice, + "Process pause detected for %llu ms, flushing membership messages.\n", (unsigned long long)(now_msec - timestamp_msec)); + /* + * -1 indicates an error from recvmsg; retry until a flush + * pass completes without one. + */ + do { + res = totemrrp_mcast_recv_empty (instance->totemrrp_handle); + } while (res == -1); + } + return (res); +} + /* * Exported interfaces */ @@ -781,6 +829,15 @@ int totemsrp_initialize ( instance->totemsrp_confchg_fn = confchg_fn; instance->use_heartbeat = 1; + /* + * Start the pause-detection timer here rather than only on entering + * OPERATIONAL, so pause_timestamp stays fresh while the first ring is + * formed and join messages are not flushed as a spurious "pause". + * NOTE(review): relies on instance->totem_config and + * instance->totemsrp_poll_handle already being set above. + */ + timer_function_pause_timeout (instance); + if ( totem_config->heartbeat_failures_allowed == 0 ) { log_printf (instance->totemsrp_log_level_notice, "HeartBeat is Disabled. 
To enable set heartbeat_failures_allowed > 0\n"); @@ -1335,6 +1392,19 @@ static void old_ring_state_reset (struct totemsrp_instance *instance) instance->old_ring_state_saved = 0; } +/* + * (Re)arm the timer that refreshes pause_timestamp every token_timeout / 5. + */ +static void reset_pause_timeout (struct totemsrp_instance *instance) +{ + poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_pause_timeout); + poll_timer_add (instance->totemsrp_poll_handle, + instance->totem_config->token_timeout / 5, + (void *)instance, + timer_function_pause_timeout, + &instance->timer_pause_timeout); +} + static void reset_token_timeout (struct totemsrp_instance *instance) { poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout); poll_timer_add (instance->totemsrp_poll_handle, @@ -1415,6 +1485,17 @@ static void memb_merge_detect_transmit (struct totemsrp_instance *instance); /* * Timers used for various states of the membership algorithm */ +/* + * Refresh pause_timestamp and re-arm this timer (see reset_pause_timeout). + */ +static void timer_function_pause_timeout (void *data) +{ + struct totemsrp_instance *instance = data; + + gettimeofday (&instance->pause_timestamp, NULL); + reset_pause_timeout (instance); +} + static void timer_function_orf_token_timeout (void *data) { struct totemsrp_instance *instance = (struct totemsrp_instance *)data; @@ -1680,6 +1761,8 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) instance->my_received_flg = 1; + reset_pause_timeout (instance); + return; } @@ -3961,6 +4044,14 @@ static int message_handler_memb_join ( } else { memb_join = (struct memb_join *)msg; } + /* + * If the process paused because it wasn't scheduled in a timely + * fashion, flush the join messages because they may be queued + * entries + */ + if (pause_flush (instance)) { + return (0); + } if (instance->token_ring_id_seq < memb_join->ring_seq) { instance->token_ring_id_seq = memb_join->ring_seq;
_______________________________________________ Openais mailing list Openais@lists.linux-foundation.org https://lists.linux-foundation.org/mailman/listinfo/openais