Hi Steve,
> Could you send me your backlog backoff calculation code (or preferably
> tarball of source tree)? I'd like to see what you have.
Attached is a patch that should apply to trunk, which contains the changes
we've made. It's based off Angus's patch you referred to, which holds onto the
token for longer (plus a reworked patch Angus did to use a poll mechanism).
My fudge tinkers with the token backlog in fcc_token_update() and
fcc_calculate(), and also uses a smaller MESSAGE_QUEUE_MAX.
Also attached is a log of the commit descriptions. If there are any problems
with the patch then I can send you a tarball.
One problem is that when you get more than one node that's congested, you get
lost tokens. We're only using up to 4 node clusters, so this shouldn't be a
huge problem for us.
The problem I'm looking into currently is that when the token is lost, it
doesn't recover that well (the recovery token keeps getting lost). I've added
extra checks so the backlog_calc/wait_for_delivery() changes are only done when
the ring is OPERATIONAL, but the problem's still happening.
Regards,
Tim
diff -Naur corosync-orig//exec/coroipcs.c corosync//exec/coroipcs.c
--- corosync-orig//exec/coroipcs.c 2010-04-01 13:26:05.000000000 +1300
+++ corosync//exec/coroipcs.c 2010-04-01 13:20:53.000000000 +1300
@@ -688,13 +683,14 @@
* parameter, such as an invalid size
*/
if (send_ok == -1) {
+ api->stats_increment_value (conn_info->stats_handle, "invalid_request");
coroipc_response_header.size = sizeof (coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_INVALID_PARAM;
coroipcs_response_send (conn_info,
&coroipc_response_header,
sizeof (coroipc_response_header_t));
- } else
+ } else
if (send_ok) {
api->serialize_lock();
api->stats_increment_value (conn_info->stats_handle, "requests");
@@ -704,7 +700,7 @@
/*
* Overload, tell library to retry
*/
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
+ api->stats_increment_value (conn_info->stats_handle, "overload");
coroipc_response_header.size = sizeof (coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -1248,6 +1191,21 @@
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
}
+static int are_all_out_queues_empty (void)
+{
+ struct list_head *list;
+ struct conn_info *conn_info;
+
+ for (list = conn_info_list_head.next; list != &conn_info_list_head;
+ list = list->next) {
+
+ conn_info = list_entry (list, struct conn_info, list);
+ if (!list_empty (&conn_info->outq_head))
+ return CS_FALSE;
+ }
+ return CS_TRUE;
+}
+
/**
* simulate the behaviour in coroipcc.c
*/
@@ -1261,14 +1219,19 @@
}
if (conn_info->flow_control_state != new_fc) {
+ conn_info->flow_control_state = new_fc;
if (new_fc == 1) {
- log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
+ log_printf (LOGSYS_LEVEL_INFO,
+ "Enabling flow control for %d, event %d\n",
conn_info->client_pid, event);
+ api->ipc_queue_state_change (1 /* == congested */);
} else {
- log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
+ log_printf (LOGSYS_LEVEL_INFO,
+ "Disabling flow control for %d, event %d\n",
conn_info->client_pid, event);
+ if (are_all_out_queues_empty ())
+ api->ipc_queue_state_change (0 /* == flushed*/);
}
- conn_info->flow_control_state = new_fc;
api->stats_update_value (conn_info->stats_handle, "flow_control",
&conn_info->flow_control_state,
sizeof(conn_info->flow_control_state));
@@ -1458,6 +1421,7 @@
pthread_mutex_lock (&conn_info->mutex);
if (list_empty (&conn_info->outq_head)) {
conn_info->notify_flow_control_enabled = 1;
+ api->ipc_queue_state_change (1 /* == congested */);
api->poll_dispatch_modify (conn_info->fd,
POLLIN|POLLOUT|POLLNVAL);
}
diff -Naur corosync-orig//exec/main.c corosync//exec/main.c
--- corosync-orig//exec/main.c 2010-04-01 13:26:05.000000000 +1300
+++ corosync//exec/main.c 2010-04-01 12:35:33.000000000 +1300
@@ -475,6 +445,12 @@
stats = api->totem_get_stats();
+ objdb->object_key_replace (stats->hdr.handle,
+ "msg_reserved", strlen("msg_reserved"),
+ &stats->msg_reserved, sizeof (stats->msg_reserved));
+ objdb->object_key_replace (stats->hdr.handle,
+ "msg_queue_avail", strlen("msg_queue_avail"),
+ &stats->msg_queue_avail, sizeof (stats->msg_queue_avail));
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"orf_token_tx", strlen("orf_token_tx"),
&stats->mrp->srp->orf_token_tx, sizeof (stats->mrp->srp->orf_token_tx));
@@ -544,6 +520,9 @@
objdb->object_key_replace (stats->mrp->srp->hdr.handle,
"rx_msg_dropped", strlen("rx_msg_dropped"),
&stats->mrp->srp->rx_msg_dropped, sizeof (stats->mrp->srp->rx_msg_dropped));
+ objdb->object_key_replace (stats->mrp->srp->hdr.handle,
+ "token_retransmit", strlen("rx_msg_dropped"),
+ &stats->mrp->srp->token_retransmit, sizeof (stats->mrp->srp->token_retransmit));
total_mtt_rx_token = 0;
total_token_holdtime = 0;
@@ -619,7 +598,12 @@
objdb->object_create (stats->mrp->hdr.handle,
&stats->mrp->srp->hdr.handle,
"srp", strlen ("srp"));
-
+ objdb->object_key_create_typed (stats->hdr.handle,
+ "msg_reserved", &stats->msg_reserved,
+ sizeof (stats->msg_reserved), OBJDB_VALUETYPE_UINT32);
+ objdb->object_key_create_typed (stats->hdr.handle,
+ "msg_queue_avail", &stats->msg_queue_avail,
+ sizeof (stats->msg_queue_avail), OBJDB_VALUETYPE_UINT32);
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"orf_token_tx", &stats->mrp->srp->orf_token_tx,
sizeof (stats->mrp->srp->orf_token_tx), OBJDB_VALUETYPE_UINT64);
@@ -698,6 +682,9 @@
objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
"rx_msg_dropped", &zero_64,
sizeof (zero_64), OBJDB_VALUETYPE_UINT64);
+ objdb->object_key_create_typed (stats->mrp->srp->hdr.handle,
+ "token_retransmit", &zero_64,
+ sizeof (zero_64), OBJDB_VALUETYPE_UINT64);
}
/* start stats timer */
@@ -880,6 +867,10 @@
reserve_iovec.iov_base = (char *)header;
reserve_iovec.iov_len = header->size;
+ if (service == CONFDB_SERVICE) {
+ return 1;
+ }
+
pd->reserved_msgs = totempg_groups_joined_reserve (
corosync_group_handle,
&reserve_iovec, 1);
@@ -1044,6 +1029,16 @@
&zero_64, sizeof (zero_64),
OBJDB_VALUETYPE_UINT64);
+ objdb->object_key_create_typed (object_handle,
+ "invalid_request",
+ &zero_64, sizeof (zero_64),
+ OBJDB_VALUETYPE_UINT64);
+
+ objdb->object_key_create_typed (object_handle,
+ "overload",
+ &zero_64, sizeof (zero_64),
+ OBJDB_VALUETYPE_UINT64);
+
return object_handle;
}
@@ -1089,6 +1084,11 @@
&key_incr_dummy);
}
+static void corosync_queue_state_change (int queue_state)
+{
+ totempg_event_signal (TOTEM_EVENT_DELIVERY_CONGESTED, queue_state);
+}
+
static struct coroipcs_init_state_v2 ipc_init_state_v2 = {
.socket_name = COROSYNC_SOCKET_NAME,
.sched_policy = SCHED_OTHER,
@@ -1116,6 +1115,7 @@
.stats_update_value = corosync_stats_update_value,
.stats_increment_value = corosync_stats_increment_value,
.stats_decrement_value = corosync_stats_decrement_value,
+ .ipc_queue_state_change = corosync_queue_state_change,
};
static void corosync_setscheduler (void)
diff -Naur corosync-orig//exec/tlist.h corosync//exec/tlist.h
--- corosync-orig//exec/tlist.h 2009-12-15 12:45:03.000000000 +1300
+++ corosync//exec/tlist.h 2010-04-01 13:21:07.000000000 +1300
@@ -206,6 +206,9 @@
{
struct timerlist_timer *timer = (struct timerlist_timer *)_timer_handle;
+ if (timer == NULL)
+ return;
+
memset (timer->handle_addr, 0, sizeof (struct timerlist_timer *));
/*
* If the next timer after the currently expiring timer because
diff -Naur corosync-orig//exec/totempg.c corosync//exec/totempg.c
--- corosync-orig//exec/totempg.c 2009-12-15 12:45:03.000000000 +1300
+++ corosync//exec/totempg.c 2009-12-11 11:30:35.000000000 +1300
@@ -933,6 +933,7 @@
int avail = 0;
avail = totemmrp_avail ();
+ totempg_stats.msg_queue_avail = avail;
return ((avail - totempg_reserved) > msg_count);
}
@@ -957,6 +958,7 @@
msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
totempg_reserved += msg_count;
+ totempg_stats.msg_reserved = totempg_reserved;
return (msg_count);
}
@@ -965,6 +967,7 @@
int msg_count)
{
totempg_reserved -= msg_count;
+ totempg_stats.msg_reserved = totempg_reserved;
}
int totempg_callback_token_create (
diff -Naur corosync-orig//exec/totemsrp.c corosync//exec/totemsrp.c
--- corosync-orig//exec/totemsrp.c 2010-04-01 13:26:05.000000000 +1300
+++ corosync//exec/totemsrp.c 2010-04-01 13:14:36.000000000 +1300
@@ -499,6 +501,9 @@
struct memb_commit_token *commit_token;
+ int signal_fds[2];
+ int delivery_congested;
+
totemsrp_stats_t stats;
void * token_recv_event_handle;
void * token_sent_event_handle;
@@ -934,6 +939,12 @@
0,
token_event_stats_collector,
instance);
+
+ instance->delivery_congested = 0;
+ assert(pipe (instance->signal_fds) != -1);
+ fcntl (instance->signal_fds[0], F_SETFL, O_NONBLOCK);
+ fcntl (instance->signal_fds[1], F_SETFL, O_NONBLOCK);
+
*srp_context = instance;
return (0);
@@ -2092,9 +2089,19 @@
{
struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
- token_hold_cancel_send (instance);
+ switch (type) {
+ case TOTEM_EVENT_NEW_MSG:
+ token_hold_cancel_send (instance);
+ break;
- return;
+ case TOTEM_EVENT_DELIVERY_CONGESTED:
+ write (instance->signal_fds[1], &value, sizeof(int));
+ break;
+ default:
+ log_printf (instance->totemsrp_log_level_debug,
+ "totem: unknown event %d signaled\n", type);
+ break;
+ }
}
int totemsrp_mcast (
@@ -2110,7 +2117,7 @@
unsigned int addr_idx;
if (cs_queue_is_full (&instance->new_message_queue)) {
- log_printf (instance->totemsrp_log_level_debug, "queue full\n");
+ log_printf (instance->totemsrp_log_level_error, "queue full\n");
return (-1);
}
@@ -2519,6 +2526,7 @@
static void token_retransmit (struct totemsrp_instance *instance)
{
+ instance->stats.token_retransmit++;
totemrrp_token_send (instance->totemrrp_context,
instance->orf_token_retransmit,
instance->orf_token_retransmit_size);
@@ -3243,6 +3246,8 @@
{
unsigned int transmits_allowed;
unsigned int backlog_calc;
+ unsigned int backlog_sum;
+ unsigned int my_cbl;
transmits_allowed = instance->totem_config->max_messages;
@@ -3252,18 +3257,19 @@
instance->my_cbl = backlog_get (instance);
- /*
- * Only do backlog calculation if there is a backlog otherwise
- * we would result in div by zero
- */
- if (token->backlog + instance->my_cbl - instance->my_pbl) {
- backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
- (token->backlog + instance->my_cbl - instance->my_pbl);
- if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
+ backlog_sum = token->backlog + instance->my_cbl - instance->my_pbl;
+
+ /* guard against divide by zero */
+ if (backlog_sum != 0) {
+ my_cbl = MIN(instance->my_cbl, transmits_allowed);
+ backlog_calc = (instance->totem_config->window_size * my_cbl) / backlog_sum;
+ if (backlog_calc == 0) {
+ backlog_calc = 1; /* if < 1 (e.g. 0.5), round up */
+ }
+ if (transmits_allowed > backlog_calc) {
transmits_allowed = backlog_calc;
}
}
-
return (transmits_allowed);
}
@@ -3292,10 +3298,58 @@
unsigned int msgs_transmitted)
{
token->fcc += msgs_transmitted - instance->my_trc;
- token->backlog += instance->my_cbl - instance->my_pbl;
- assert (token->backlog >= 0);
instance->my_trc = msgs_transmitted;
- instance->my_pbl = instance->my_cbl;
+ if (instance->delivery_congested) {
+ token->backlog += instance->totem_config->window_size;
+ instance->my_pbl += instance->totem_config->window_size;
+ } else {
+ token->backlog += instance->my_cbl - instance->my_pbl;
+ instance->my_pbl = instance->my_cbl;
+ }
+ assert (token->backlog >= 0);
+}
+
+static void wait_for_delivery (struct totemsrp_instance *instance)
+{
+ struct pollfd signal_in;
+ int poll_res;
+ int congested[100];
+ int new_state;
+ size_t read_size;
+ int waited = 0;
+
+ signal_in.fd = instance->signal_fds[0];
+ signal_in.events = POLLIN;
+
+ poll_res = poll (&signal_in, 1, 0);
+ if (poll_res != 1 && instance->delivery_congested == 0) {
+ /* no message and we are not congested*/
+ return;
+ }
+
+ /* only poll again if there is not a message waiting */
+ if (poll_res == 0) {
+poll_wait_some_more:
+ waited = (2 * instance->totem_config->token_retransmit_timeout / 3);
+ poll_res = poll (&signal_in, 1, waited);
+ }
+ if (poll_res == 1 && signal_in.revents == POLLIN) {
+ read_size = read (signal_in.fd, congested,
+ sizeof (congested));
+ /* only get the last message */
+ new_state = congested[(read_size/sizeof (int))-1];
+
+ if (new_state != instance->delivery_congested) {
+ instance->delivery_congested = new_state;
+ log_printf (instance->totemsrp_log_level_notice,
+ "new congestion state:%d\n", instance->delivery_congested);
+ }
+ if (new_state == 1 && waited == 0) {
+ /* after the initial poll/read
+ * we are still congested */
+ goto poll_wait_some_more;
+ }
+ }
}
/*
@@ -3570,6 +3624,9 @@
}
}
+
+ wait_for_delivery (instance);
+
totemrrp_send_flush (instance->totemrrp_context);
token_send (instance, token, forward_token);
diff -Naur corosync-orig//include/corosync/coroipcs.h corosync//include/corosync/coroipcs.h
--- corosync-orig//include/corosync/coroipcs.h 2010-04-01 13:26:05.000000000 +1300
+++ corosync//include/corosync/coroipcs.h 2009-12-15 14:21:59.000000000 +1300
@@ -116,6 +116,7 @@
...) __attribute__((format(printf, 5, 6)));
int log_subsys_id;
void (*stats_decrement_value) (hdb_handle_t handle, const char* name);
+ void (*ipc_queue_state_change) (int queue_state);
};
extern void coroipcs_ipc_init (
diff -Naur corosync-orig//include/corosync/engine/coroapi.h corosync//include/corosync/engine/coroapi.h
--- corosync-orig//include/corosync/engine/coroapi.h 2010-04-01 13:26:05.000000000 +1300
+++ corosync//include/corosync/engine/coroapi.h 2010-04-01 13:13:01.000000000 +1300
@@ -70,7 +70,7 @@
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define PROCESSOR_COUNT_MAX 16
#define MESSAGE_SIZE_MAX 1024*64
-#define MESSAGE_QUEUE_MAX 512
+#define MESSAGE_QUEUE_MAX 64
#else
#define PROCESSOR_COUNT_MAX 384
#define MESSAGE_SIZE_MAX 1024*1024 /* (1MB) */
diff -Naur corosync-orig//include/corosync/totem/totem.h corosync//include/corosync/totem/totem.h
--- corosync-orig//include/corosync/totem/totem.h 2009-12-15 12:45:03.000000000 +1300
+++ corosync//include/corosync/totem/totem.h 2010-04-01 13:12:57.000000000 +1300
@@ -40,7 +40,7 @@
#ifdef HAVE_SMALL_MEMORY_FOOTPRINT
#define PROCESSOR_COUNT_MAX 16
#define MESSAGE_SIZE_MAX 1024*64
-#define MESSAGE_QUEUE_MAX 512
+#define MESSAGE_QUEUE_MAX 64
#else
#define PROCESSOR_COUNT_MAX 384
#define MESSAGE_SIZE_MAX 1024*1024 /* (1MB) */
@@ -240,6 +240,7 @@
uint64_t recovery_token_lost;
uint64_t consensus_timeouts;
uint64_t rx_msg_dropped;
+ uint64_t token_retransmit;
int earliest_token;
int latest_token;
@@ -256,6 +257,8 @@
typedef struct {
totem_stats_header_t hdr;
totemmrp_stats_t *mrp;
+ uint32_t msg_reserved;
+ uint32_t msg_queue_avail;
} totempg_stats_t;
#endif /* TOTEM_H_DEFINED */
Improve corosync performance underheavy congestion
Corosync CPG flow control isn't end-to-end. So a slow node could get a massive
queue of CPG messages backed up and the faster node wouldn't back-off (eventually
the process on the slow node would healthcheck).
Force backoff of other nodes in the cluster by incrementing the token backlog
on each token rotation until congestion eases. This gradually reduces how much
other nodes can transmit on each token rotation until they can only transmit one
message per rotation (note corosync bundles messages, so this one message may
still contain several CPG messages). When the congestion eases, the token gets
reset back to what it was previously.
Tweak the backlog calc so that it uses the current backlog insetad of the previous
backlog (as per totem spec). This should mean it's always non-zero if there's
something to send (it was just skipped if the node sent no messages on the last
token rotation). Also if backlog_calc is between 0 and 1, round it up to 1 rather
than ignoring the backlog_calc completely.
----------------------------------------------------------------------------------
Reduce the max queue size (MESSAGE_QUEUE_MAX) for transmitting messages
This define is still quite high relative to how much the other defines were
decreased for low memory footprints. This is the number of totem messages (each
totem message can contain several CPG messages) that can get queued for
transmission before the app sending gets a TRY_AGAIN error. A shorter queue means
the app will get feedback quicker that a node is congested and backoff (however
it also means that no more totem/CPG messages can be transmitted, no matter how
important they are).
The long queue length meant that a faster node could queue up several hundred CPG
messages and then really bombard a slow node with them as soon as congestion eased.
----------------------------------------------------------------------------------
Alternative approach to token flowcontrol.
Rework Angus's patch ('totem: allow orf token reception to be delayed') so that when
a node is congested totem will hold the token until congestion has eased (or for max
2/3s of the token retransmit timeout, so the token doesn't get lost). This is done
using a pipe to poll() until the threads delivering the messages have finished.
This does not completely solve the flow control problem, because the other nodes
will still be queueing CPG messages even though they don't have the token, and
they'll then send max_messages the next time they get the token. (So the slow node
still needs to process max_messages plus clear some of its backlog in the time
that it holds the token).
----------------------------------------------------------------------------------
Add more stats to help with flow control.
----------------------------------------------------------------------------------
totem: allow orf token reception to be delayed
(as at http://www.mail-archive.com/[email protected]/msg03811.html)
Delay token processing for nodes with delivery congestion.
If a node is processing it's delivered messages slower than
other nodes (or one app is slower to receive than another app)
then our delivery queues will get congested. The solution here
is to delay the reception of the token so that the applications
get time to process their new messages.
Results: much improved but ...
If you really throttle the dispatch then the "token holding"
times out and ipc queues keep filling up (but much slower).
----------------------------------------------------------------------------------_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais