Delay token processing for nodes with delivery congestion. If a node is processing it's delivered messages slower than other nodes (or one app is slower to receive than another app) then our delivery queues will get congested. The solution here is to delay the reception of the token so that the applications get time to process their new messages.
Results: much improved but ... If you really throttle the dispatch then the "token holding" times out and ipc queues keep filling up (but much slower). Signed-off-by: Angus Salkeld <[email protected]> --- exec/coroipcs.c | 20 ++++- exec/main.c | 6 ++ exec/totemsrp.c | 193 ++++++++++++++++++++++++++++++++++--------- include/corosync/coroipcs.h | 1 + 4 files changed, 178 insertions(+), 42 deletions(-) diff --git a/exec/coroipcs.c b/exec/coroipcs.c index 54448cc..4379afa 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -1189,6 +1189,20 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size; } +static int are_all_out_queues_empty (void) +{ + struct list_head *list; + struct conn_info *conn_info; + + for (list = conn_info_list_head.next; list != &conn_info_list_head; + list = list->next) { + + conn_info = list_entry (list, struct conn_info, list); + if (!list_empty (&conn_info->outq_head)) + return CS_FALSE; + } + return CS_TRUE; +} /** * simulate the behaviour in coroipcc.c */ @@ -1202,14 +1216,17 @@ static int flow_control_event_send (struct conn_info *conn_info, char event) } if (conn_info->flow_control_state != new_fc) { + conn_info->flow_control_state = new_fc; if (new_fc == 1) { log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event %d\n", conn_info->client_pid, event); + api->ipc_queue_state_change (1 /* == congested */); } else { log_printf (LOGSYS_LEVEL_INFO, "Disabling flow control for %d, event %d\n", conn_info->client_pid, event); + if (are_all_out_queues_empty ()) + api->ipc_queue_state_change (0 /* == flushed*/); } - conn_info->flow_control_state = new_fc; api->stats_update_value (conn_info->stats_handle, "flow_control", &conn_info->flow_control_state, sizeof(conn_info->flow_control_state)); @@ -1399,6 +1416,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int pthread_mutex_lock (&conn_info->mutex); if (list_empty (&conn_info->outq_head)) { conn_info->notify_flow_control_enabled = 1; + api->ipc_queue_state_change (1 /* == congested */); api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLOUT|POLLNVAL); } diff --git a/exec/main.c b/exec/main.c index ee31bf6..e56ff18 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1054,6 +1054,11 @@ static void corosync_stats_decrement_value (hdb_handle_t handle, &key_incr_dummy); } +static void corosync_queue_state_change (int queue_state) +{ + totempg_event_signal (TOTEM_EVENT_DELIVERY_CONGESTED, queue_state); +} + static struct coroipcs_init_state_v2 ipc_init_state_v2 = { .socket_name = COROSYNC_SOCKET_NAME, .sched_policy = SCHED_OTHER, @@ -1080,6 +1085,7 @@ static struct coroipcs_init_state_v2 ipc_init_state_v2 = { .stats_update_value = corosync_stats_update_value, .stats_increment_value = corosync_stats_increment_value, .stats_decrement_value = corosync_stats_decrement_value, + .ipc_queue_state_change = corosync_queue_state_change, }; static void corosync_setscheduler (void) diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 23a1732..9abb1cd 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -418,6 +418,8 @@ struct totemsrp_instance { poll_timer_handle timer_heartbeat_timeout; + poll_timer_handle timer_delayed_orf_token_processing_timeout; + /* * Function and data used to log messages */ @@ -467,6 +469,10 @@ struct totemsrp_instance { int my_token_held; + int forward_token; + + int delivery_congested; + unsigned long long token_ring_id_seq; unsigned int last_released; @@ -505,6 +511,7 @@ struct totemsrp_instance { void * token_recv_event_handle; void * token_sent_event_handle; char commit_token_storage[9000]; + char orf_token_storage[1500]; }; struct message_handlers { @@ -607,6 +614,8 @@ static void timer_function_heartbeat_timeout (void *data); static void timer_function_token_retransmit_timeout (void *data); static void timer_function_token_hold_retransmit_timeout (void *data); static void timer_function_merge_detect_timeout (void *data); +static void timer_function_delayed_orf_token_processing_timeout (void *data); +static void complete_orf_token_processing (void *data); void main_deliver_fn ( void *context, @@ -665,6 +674,10 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->my_high_delivered = SEQNO_START_MSG; instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; + + instance->delivery_congested = 0; + + instance->timer_delayed_orf_token_processing_timeout = 0; } static void main_token_seqid_get ( @@ -1476,6 +1489,15 @@ static void start_token_hold_retransmit_timeout (struct totemsrp_instance *insta &instance->timer_orf_token_hold_retransmit_timeout); } +static void start_orf_token_delay_timeout (struct totemsrp_instance *instance, int timeout) +{ + poll_timer_add (instance->totemsrp_poll_handle, + timeout, + (void *)instance, + timer_function_delayed_orf_token_processing_timeout, + &instance->timer_delayed_orf_token_processing_timeout); +} + static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance) { poll_timer_delete (instance->totemsrp_poll_handle, @@ -2090,13 +2112,55 @@ originated: return; } +static void timer_function_delayed_orf_token_processing_timeout (void *data) +{ + struct totemsrp_instance *instance = data; + + if (instance->delivery_congested == 1) { + log_printf (instance->totemsrp_log_level_debug, + "orf token delay timer expired: delivery congestion not cleared\n"); + } else { + log_printf (instance->totemsrp_log_level_debug, + "orf token delay timer expired: delivery congestion cleared\n"); + } + instance->timer_delayed_orf_token_processing_timeout = 0; + complete_orf_token_processing (instance); +} + void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; - token_hold_cancel_send (instance); + switch (type) { + case TOTEM_EVENT_NEW_MSG: + token_hold_cancel_send (instance); + break; - return (0); + case TOTEM_EVENT_DELIVERY_CONGESTED: + if (value == 0) + instance->delivery_congested = 0; + else + instance->delivery_congested = 1; + + log_printf (instance->totemsrp_log_level_debug, + "delivery congestion event %d\n", instance->delivery_congested); + + if (instance->timer_delayed_orf_token_processing_timeout != 0 && + instance->delivery_congested == 0) { + poll_timer_delete (instance->totemsrp_poll_handle, + instance->timer_delayed_orf_token_processing_timeout); + instance->timer_delayed_orf_token_processing_timeout = 0; + log_printf (instance->totemsrp_log_level_debug, + "Cleared delivery congestion\n"); + + complete_orf_token_processing (instance); + } + break; + default: + log_printf (instance->totemsrp_log_level_debug, + "totem: unknown event %d signaled\n", type); + break; + } } int totemsrp_mcast ( @@ -2587,8 +2651,7 @@ static void timer_function_merge_detect_timeout(void *data) */ static int token_send ( struct totemsrp_instance *instance, - struct orf_token *orf_token, - int forward_token) + struct orf_token *orf_token) { int res = 0; unsigned int orf_token_size; @@ -2601,7 +2664,7 @@ static int token_send ( orf_token->header.nodeid = instance->my_id.addr[0].nodeid; assert (orf_token->header.nodeid); - if (forward_token == 0) { + if (instance->forward_token == 0) { return (0); } @@ -2676,7 +2739,8 @@ static int orf_token_send_initial (struct totemsrp_instance *instance) orf_token.rtr_list_entries = 0; - res = token_send (instance, &orf_token, 1); + instance->forward_token = 1; + res = token_send (instance, &orf_token); return (res); } @@ -3309,10 +3373,8 @@ static int message_handler_orf_token ( size_t msg_len, int endian_conversion_needed) { - char token_storage[1500]; char token_convert[1500]; struct orf_token *token = NULL; - int forward_token; unsigned int transmits_allowed; unsigned int mcasted_retransmit; unsigned int mcasted_regular; @@ -3336,6 +3398,18 @@ static int message_handler_orf_token ( } #endif + if (instance->timer_delayed_orf_token_processing_timeout != 0) { + log_printf (instance->totemsrp_log_level_debug, + "Received orf token whilst delay timer pending.\n"); + + poll_timer_delete (instance->totemsrp_poll_handle, + instance->timer_delayed_orf_token_processing_timeout); + instance->timer_delayed_orf_token_processing_timeout = 0; + + complete_orf_token_processing (instance); + } + + if (endian_conversion_needed) { orf_token_endian_convert ((struct orf_token *)msg, (struct orf_token *)token_convert); @@ -3346,7 +3420,7 @@ static int message_handler_orf_token ( * Make copy of token and retransmit list in case we have * to flush incoming messages from the kernel queue */ - token = (struct orf_token *)token_storage; + token = (struct orf_token *)instance->orf_token_storage; memcpy (token, msg, sizeof (struct orf_token)); memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token), sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX); @@ -3391,10 +3465,10 @@ static int message_handler_orf_token ( * Hold onto token when there is no activity on ring and * this processor is the ring rep */ - forward_token = 1; + instance->forward_token = 1; if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) { if (instance->my_token_held) { - forward_token = 0; + instance->forward_token = 0; } } @@ -3422,7 +3496,7 @@ static int message_handler_orf_token ( if (memcmp (&token->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)) != 0) { - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3446,7 +3520,7 @@ static int message_handler_orf_token ( */ reset_token_timeout (instance); - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3565,41 +3639,25 @@ printf ("token seq %d\n", token->seq); instance->my_retrans_flg_count = 0; } } - - totemrrp_send_flush (instance->totemrrp_context); - token_send (instance, token, forward_token); - -#ifdef GIVEINFO - tv_current = timerlist_nano_current_get (); - tv_diff = tv_current - tv_old; - tv_old = tv_current; - log_printf (instance->totemsrp_log_level_debug, - "I held %0.4f ms\n", - ((float)tv_diff) / 1000000.0); -#endif - if (instance->memb_state == MEMB_STATE_OPERATIONAL) { - messages_deliver_to_app (instance, 0, - instance->my_high_seq_received); - } - /* - * Deliver messages after token has been transmitted - * to improve performance + * delay token processing for nodes with delivery congestion + * If a node is processing it's delivered messages slower than + * other nodes (or one app is slower to receive than another app) + * then our delivery queues will get congested. The solution here + * is to delay the reception of the token so that the applications + * get time to process their new messages. */ - reset_token_timeout (instance); // REVIEWED - reset_token_retransmit_timeout (instance); // REVIEWED - if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) && - instance->my_token_held == 1) { - - start_token_hold_retransmit_timeout (instance); + if (instance->delivery_congested == 1) { + start_orf_token_delay_timeout (instance, instance->totem_config->token_timeout - 250); + } else { + complete_orf_token_processing (instance); } - - token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT); + return (0); } break; } - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3608,6 +3666,59 @@ printf ("token seq %d\n", token->seq); } return (0); + +} + +static void complete_orf_token_processing (void *data) +{ + struct totemsrp_instance *instance = data; + struct orf_token *token = NULL; +#ifdef GIVEINFO + unsigned long long tv_current; + unsigned long long tv_diff; +#endif + + token = (struct orf_token *)instance->orf_token_storage; + + totemrrp_send_flush (instance->totemrrp_context); + token_send (instance, token); + +#ifdef GIVEINFO + tv_current = timerlist_nano_current_get (); + tv_diff = tv_current - tv_old; + tv_old = tv_current; + log_printf (instance->totemsrp_log_level_debug, + "I held %0.4f ms\n", + ((float)tv_diff) / 1000000.0); +#endif + if (instance->memb_state == MEMB_STATE_OPERATIONAL) { + messages_deliver_to_app (instance, 0, + instance->my_high_seq_received); + } + + /* + * Deliver messages after token has been transmitted + * to improve performance + */ + reset_token_timeout (instance); // REVIEWED + reset_token_retransmit_timeout (instance); // REVIEWED + if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) && + instance->my_token_held == 1) { + + start_token_hold_retransmit_timeout (instance); + } + + token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT); + + + if ((instance->forward_token) + && instance->use_heartbeat) { + reset_heartbeat_timeout(instance); + } + else { + cancel_heartbeat_timeout(instance); + } + return; } static void messages_deliver_to_app ( diff --git a/include/corosync/coroipcs.h b/include/corosync/coroipcs.h index 3838af8..9904609 100644 --- a/include/corosync/coroipcs.h +++ b/include/corosync/coroipcs.h @@ -116,6 +116,7 @@ struct coroipcs_init_state_v2 { ...) __attribute__((format(printf, 5, 6))); int log_subsys_id; void (*stats_decrement_value) (hdb_handle_t handle, const char* name); + void (*ipc_queue_state_change) (int queue_state); }; extern void coroipcs_ipc_init ( -- 1.6.3.4
>From e7a407d554871c6043a5ab6f3e228247d8dcf078 Mon Sep 17 00:00:00 2001 From: Angus Salkeld <[email protected]> Date: Tue, 1 Dec 2009 07:35:50 +1300 Subject: [PATCH] totem: allow orf token reception to be delayed. Delay token processing for nodes with delivery congestion. If a node is processing it's delivered messages slower than other nodes (or one app is slower to receive than another app) then our delivery queues will get congested. The solution here is to delay the reception of the token so that the applications get time to process their new messages. Results: much improved but ... If you really throttle the dispatch then the "token holding" times out and ipc queues keep filling up (but much slower). Signed-off-by: Angus Salkeld <[email protected]> --- exec/coroipcs.c | 20 ++++- exec/main.c | 6 ++ exec/totemsrp.c | 193 ++++++++++++++++++++++++++++++++++--------- include/corosync/coroipcs.h | 1 + 4 files changed, 178 insertions(+), 42 deletions(-) diff --git a/exec/coroipcs.c b/exec/coroipcs.c index 54448cc..4379afa 100644 --- a/exec/coroipcs.c +++ b/exec/coroipcs.c @@ -1189,6 +1189,20 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size; } +static int are_all_out_queues_empty (void) +{ + struct list_head *list; + struct conn_info *conn_info; + + for (list = conn_info_list_head.next; list != &conn_info_list_head; + list = list->next) { + + conn_info = list_entry (list, struct conn_info, list); + if (!list_empty (&conn_info->outq_head)) + return CS_FALSE; + } + return CS_TRUE; +} /** * simulate the behaviour in coroipcc.c */ @@ -1202,14 +1216,17 @@ static int flow_control_event_send (struct conn_info *conn_info, char event) } if (conn_info->flow_control_state != new_fc) { + conn_info->flow_control_state = new_fc; if (new_fc == 1) { log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event %d\n", conn_info->client_pid, event); + api->ipc_queue_state_change (1 /* == congested */); } else { log_printf (LOGSYS_LEVEL_INFO, "Disabling flow control for %d, event %d\n", conn_info->client_pid, event); + if (are_all_out_queues_empty ()) + api->ipc_queue_state_change (0 /* == flushed*/); } - conn_info->flow_control_state = new_fc; api->stats_update_value (conn_info->stats_handle, "flow_control", &conn_info->flow_control_state, sizeof(conn_info->flow_control_state)); @@ -1399,6 +1416,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int pthread_mutex_lock (&conn_info->mutex); if (list_empty (&conn_info->outq_head)) { conn_info->notify_flow_control_enabled = 1; + api->ipc_queue_state_change (1 /* == congested */); api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLOUT|POLLNVAL); } diff --git a/exec/main.c b/exec/main.c index ee31bf6..e56ff18 100644 --- a/exec/main.c +++ b/exec/main.c @@ -1054,6 +1054,11 @@ static void corosync_stats_decrement_value (hdb_handle_t handle, &key_incr_dummy); } +static void corosync_queue_state_change (int queue_state) +{ + totempg_event_signal (TOTEM_EVENT_DELIVERY_CONGESTED, queue_state); +} + static struct coroipcs_init_state_v2 ipc_init_state_v2 = { .socket_name = COROSYNC_SOCKET_NAME, .sched_policy = SCHED_OTHER, @@ -1080,6 +1085,7 @@ static struct coroipcs_init_state_v2 ipc_init_state_v2 = { .stats_update_value = corosync_stats_update_value, .stats_increment_value = corosync_stats_increment_value, .stats_decrement_value = corosync_stats_decrement_value, + .ipc_queue_state_change = corosync_queue_state_change, }; static void corosync_setscheduler (void) diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 23a1732..9abb1cd 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -418,6 +418,8 @@ struct totemsrp_instance { poll_timer_handle timer_heartbeat_timeout; + poll_timer_handle timer_delayed_orf_token_processing_timeout; + /* * Function and data used to log messages */ @@ -467,6 +469,10 @@ struct totemsrp_instance { int my_token_held; + int forward_token; + + int delivery_congested; + unsigned long long token_ring_id_seq; unsigned int last_released; @@ -505,6 +511,7 @@ struct totemsrp_instance { void * token_recv_event_handle; void * token_sent_event_handle; char commit_token_storage[9000]; + char orf_token_storage[1500]; }; struct message_handlers { @@ -607,6 +614,8 @@ static void timer_function_heartbeat_timeout (void *data); static void timer_function_token_retransmit_timeout (void *data); static void timer_function_token_hold_retransmit_timeout (void *data); static void timer_function_merge_detect_timeout (void *data); +static void timer_function_delayed_orf_token_processing_timeout (void *data); +static void complete_orf_token_processing (void *data); void main_deliver_fn ( void *context, @@ -665,6 +674,10 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->my_high_delivered = SEQNO_START_MSG; instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; + + instance->delivery_congested = 0; + + instance->timer_delayed_orf_token_processing_timeout = 0; } static void main_token_seqid_get ( @@ -1476,6 +1489,15 @@ static void start_token_hold_retransmit_timeout (struct totemsrp_instance *insta &instance->timer_orf_token_hold_retransmit_timeout); } +static void start_orf_token_delay_timeout (struct totemsrp_instance *instance, int timeout) +{ + poll_timer_add (instance->totemsrp_poll_handle, + timeout, + (void *)instance, + timer_function_delayed_orf_token_processing_timeout, + &instance->timer_delayed_orf_token_processing_timeout); +} + static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance) { poll_timer_delete (instance->totemsrp_poll_handle, @@ -2090,13 +2112,55 @@ originated: return; } +static void timer_function_delayed_orf_token_processing_timeout (void *data) +{ + struct totemsrp_instance *instance = data; + + if (instance->delivery_congested == 1) { + log_printf (instance->totemsrp_log_level_debug, + "orf token delay timer expired: delivery congestion not cleared\n"); + } else { + log_printf (instance->totemsrp_log_level_debug, + "orf token delay timer expired: delivery congestion cleared\n"); + } + instance->timer_delayed_orf_token_processing_timeout = 0; + complete_orf_token_processing (instance); +} + void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; - token_hold_cancel_send (instance); + switch (type) { + case TOTEM_EVENT_NEW_MSG: + token_hold_cancel_send (instance); + break; - return (0); + case TOTEM_EVENT_DELIVERY_CONGESTED: + if (value == 0) + instance->delivery_congested = 0; + else + instance->delivery_congested = 1; + + log_printf (instance->totemsrp_log_level_debug, + "delivery congestion event %d\n", instance->delivery_congested); + + if (instance->timer_delayed_orf_token_processing_timeout != 0 && + instance->delivery_congested == 0) { + poll_timer_delete (instance->totemsrp_poll_handle, + instance->timer_delayed_orf_token_processing_timeout); + instance->timer_delayed_orf_token_processing_timeout = 0; + log_printf (instance->totemsrp_log_level_debug, + "Cleared delivery congestion\n"); + + complete_orf_token_processing (instance); + } + break; + default: + log_printf (instance->totemsrp_log_level_debug, + "totem: unknown event %d signaled\n", type); + break; + } } int totemsrp_mcast ( @@ -2587,8 +2651,7 @@ static void timer_function_merge_detect_timeout(void *data) */ static int token_send ( struct totemsrp_instance *instance, - struct orf_token *orf_token, - int forward_token) + struct orf_token *orf_token) { int res = 0; unsigned int orf_token_size; @@ -2601,7 +2664,7 @@ static int token_send ( orf_token->header.nodeid = instance->my_id.addr[0].nodeid; assert (orf_token->header.nodeid); - if (forward_token == 0) { + if (instance->forward_token == 0) { return (0); } @@ -2676,7 +2739,8 @@ static int orf_token_send_initial (struct totemsrp_instance *instance) orf_token.rtr_list_entries = 0; - res = token_send (instance, &orf_token, 1); + instance->forward_token = 1; + res = token_send (instance, &orf_token); return (res); } @@ -3309,10 +3373,8 @@ static int message_handler_orf_token ( size_t msg_len, int endian_conversion_needed) { - char token_storage[1500]; char token_convert[1500]; struct orf_token *token = NULL; - int forward_token; unsigned int transmits_allowed; unsigned int mcasted_retransmit; unsigned int mcasted_regular; @@ -3336,6 +3398,18 @@ static int message_handler_orf_token ( } #endif + if (instance->timer_delayed_orf_token_processing_timeout != 0) { + log_printf (instance->totemsrp_log_level_debug, + "Received orf token whilst delay timer pending.\n"); + + poll_timer_delete (instance->totemsrp_poll_handle, + instance->timer_delayed_orf_token_processing_timeout); + instance->timer_delayed_orf_token_processing_timeout = 0; + + complete_orf_token_processing (instance); + } + + if (endian_conversion_needed) { orf_token_endian_convert ((struct orf_token *)msg, (struct orf_token *)token_convert); @@ -3346,7 +3420,7 @@ static int message_handler_orf_token ( * Make copy of token and retransmit list in case we have * to flush incoming messages from the kernel queue */ - token = (struct orf_token *)token_storage; + token = (struct orf_token *)instance->orf_token_storage; memcpy (token, msg, sizeof (struct orf_token)); memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token), sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX); @@ -3391,10 +3465,10 @@ static int message_handler_orf_token ( * Hold onto token when there is no activity on ring and * this processor is the ring rep */ - forward_token = 1; + instance->forward_token = 1; if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) { if (instance->my_token_held) { - forward_token = 0; + instance->forward_token = 0; } } @@ -3422,7 +3496,7 @@ static int message_handler_orf_token ( if (memcmp (&token->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)) != 0) { - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3446,7 +3520,7 @@ static int message_handler_orf_token ( */ reset_token_timeout (instance); - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3565,41 +3639,25 @@ printf ("token seq %d\n", token->seq); instance->my_retrans_flg_count = 0; } } - - totemrrp_send_flush (instance->totemrrp_context); - token_send (instance, token, forward_token); - -#ifdef GIVEINFO - tv_current = timerlist_nano_current_get (); - tv_diff = tv_current - tv_old; - tv_old = tv_current; - log_printf (instance->totemsrp_log_level_debug, - "I held %0.4f ms\n", - ((float)tv_diff) / 1000000.0); -#endif - if (instance->memb_state == MEMB_STATE_OPERATIONAL) { - messages_deliver_to_app (instance, 0, - instance->my_high_seq_received); - } - /* - * Deliver messages after token has been transmitted - * to improve performance + * delay token processing for nodes with delivery congestion + * If a node is processing it's delivered messages slower than + * other nodes (or one app is slower to receive than another app) + * then our delivery queues will get congested. The solution here + * is to delay the reception of the token so that the applications + * get time to process their new messages. */ - reset_token_timeout (instance); // REVIEWED - reset_token_retransmit_timeout (instance); // REVIEWED - if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) && - instance->my_token_held == 1) { - - start_token_hold_retransmit_timeout (instance); + if (instance->delivery_congested == 1) { + start_orf_token_delay_timeout (instance, instance->totem_config->token_timeout - 250); + } else { + complete_orf_token_processing (instance); } - - token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT); + return (0); } break; } - if ((forward_token) + if ((instance->forward_token) && instance->use_heartbeat) { reset_heartbeat_timeout(instance); } @@ -3608,6 +3666,59 @@ printf ("token seq %d\n", token->seq); } return (0); + +} + +static void complete_orf_token_processing (void *data) +{ + struct totemsrp_instance *instance = data; + struct orf_token *token = NULL; +#ifdef GIVEINFO + unsigned long long tv_current; + unsigned long long tv_diff; +#endif + + token = (struct orf_token *)instance->orf_token_storage; + + totemrrp_send_flush (instance->totemrrp_context); + token_send (instance, token); + +#ifdef GIVEINFO + tv_current = timerlist_nano_current_get (); + tv_diff = tv_current - tv_old; + tv_old = tv_current; + log_printf (instance->totemsrp_log_level_debug, + "I held %0.4f ms\n", + ((float)tv_diff) / 1000000.0); +#endif + if (instance->memb_state == MEMB_STATE_OPERATIONAL) { + messages_deliver_to_app (instance, 0, + instance->my_high_seq_received); + } + + /* + * Deliver messages after token has been transmitted + * to improve performance + */ + reset_token_timeout (instance); // REVIEWED + reset_token_retransmit_timeout (instance); // REVIEWED + if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) && + instance->my_token_held == 1) { + + start_token_hold_retransmit_timeout (instance); + } + + token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT); + + + if ((instance->forward_token) + && instance->use_heartbeat) { + reset_heartbeat_timeout(instance); + } + else { + cancel_heartbeat_timeout(instance); + } + return; } static void messages_deliver_to_app ( diff --git a/include/corosync/coroipcs.h b/include/corosync/coroipcs.h index 3838af8..9904609 100644 --- a/include/corosync/coroipcs.h +++ b/include/corosync/coroipcs.h @@ -116,6 +116,7 @@ struct coroipcs_init_state_v2 { ...) __attribute__((format(printf, 5, 6))); int log_subsys_id; void (*stats_decrement_value) (hdb_handle_t handle, const char* name); + void (*ipc_queue_state_change) (int queue_state); }; extern void coroipcs_ipc_init ( -- 1.6.3.4
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
