This is take 1 of this patch. It still needs a BSD port, which I will work on
soon. I wanted to get some feedback on the code first.

This fixes a problem where flow control and dispatch are intermixed in the
same stream of data, resulting in a lockup of the IPC system under really
heavy load. The lockup can't really be reproduced with corosync today unless
running with some experimental patches that make cpg_mcast_joined
asynchronous while toggling flow control on and off.
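For anyone trying this out, the client-side pattern the updated cpgbench moves
to is: run cpg_dispatch() with CS_DISPATCH_BLOCKING in its own thread, and have
the sending thread simply retry cpg_mcast_joined() whenever flow control makes
it return CS_ERR_TRY_AGAIN. A rough standalone sketch of that pattern is below;
it is illustration only, not part of the patch, and the group name, payload,
and error handling are placeholders.

#include <pthread.h>
#include <sys/uio.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>

/*
 * Sketch only: dispatch runs in a dedicated thread so delivery callbacks
 * keep draining the dispatch buffer while the main thread multicasts.
 */
static void my_deliver_fn (
    cpg_handle_t handle,
    const struct cpg_name *group_name,
    uint32_t nodeid,
    uint32_t pid,
    void *msg,
    size_t msg_len)
{
    /* consume the delivered message here */
}

static cpg_callbacks_t callbacks = {
    .cpg_deliver_fn = my_deliver_fn,
    .cpg_confchg_fn = NULL
};

static void *dispatch_thread (void *param)
{
    cpg_handle_t handle = *(cpg_handle_t *)param;

    /* returns when the handle is finalized or the connection drops */
    cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
    return NULL;
}

int main (void)
{
    cpg_handle_t handle;
    struct cpg_name group = { .length = 4, .value = "demo" };  /* placeholder */
    char payload[128] = "hello";                               /* placeholder */
    struct iovec iov = { .iov_base = payload, .iov_len = sizeof (payload) };
    pthread_t thread;
    cs_error_t res;

    if (cpg_initialize (&handle, &callbacks) != CS_OK) {
        return 1;
    }
    pthread_create (&thread, NULL, dispatch_thread, &handle);
    if (cpg_join (handle, &group) != CS_OK) {
        return 1;
    }

    do {
        res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
    } while (res == CS_ERR_TRY_AGAIN);  /* flow control engaged; just retry */

    cpg_finalize (handle);
    return 0;
}

Link with -lcpg -lpthread. The real benchmark in the patch below has the same
shape, just with the alarm-driven timing loop around the send.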
Regards
-steve
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c (revision 2971)
+++ test/cpgbench.c (working copy)
@@ -50,6 +50,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
+#include <pthread.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
@@ -79,6 +80,7 @@
static unsigned int write_count;
+static int my_write = 0;
static void cpg_bm_deliver_fn (
cpg_handle_t handle,
const struct cpg_name *group_name,
@@ -97,6 +99,16 @@
static char data[500000];
+static void *dispatch_thread (void *param)
+{
+ int res;
+ cpg_handle_t handle = *(cpg_handle_t *)param;
+
+ res = cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+printf ("thread exiting\n");
+ return (0);
+}
+
static void cpg_benchmark (
cpg_handle_t handle,
int write_size)
@@ -104,7 +116,6 @@
struct timeval tv1, tv2, tv_elapsed;
struct iovec iov;
unsigned int res;
- cpg_flow_control_state_t flow_control_state;
alarm_notice = 0;
iov.iov_base = data;
@@ -118,19 +129,11 @@
/*
* Test checkpoint write
*/
- cpg_flow_control_state_get (handle, &flow_control_state);
- if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
retry:
- res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
- if (res == CS_ERR_TRY_AGAIN) {
- goto retry;
- }
+ res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+ if (res == CS_ERR_TRY_AGAIN) {
+ goto retry;
}
- res = cpg_dispatch (handle, CS_DISPATCH_ALL);
- if (res != CS_OK) {
- printf ("cpg dispatch returned error %d\n", res);
- exit (1);
- }
} while (alarm_notice == 0);
gettimeofday (&tv2, NULL);
timersub (&tv2, &tv1, &tv_elapsed);
@@ -160,14 +163,17 @@
unsigned int size;
int i;
unsigned int res;
+ pthread_t thread;
- size = 1000;
+
+ size = 1;
signal (SIGALRM, sigalrm_handler);
res = cpg_initialize (&handle, &callbacks);
if (res != CS_OK) {
printf ("cpg_initialize failed with result %d\n", res);
exit (1);
}
+ pthread_create (&thread, NULL, dispatch_thread, &handle);
res = cpg_join (handle, &group_name);
if (res != CS_OK) {
@@ -183,7 +189,7 @@
res = cpg_finalize (handle);
if (res != CS_OK) {
- printf ("cpg_join failed with result %d\n", res);
+ printf ("cpg_finalize failed with result %d\n", res);
exit (1);
}
return (0);
Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h (revision 2971)
+++ include/corosync/coroipc_ipc.h (working copy)
@@ -74,7 +74,9 @@
sem_t sem0;
sem_t sem1;
sem_t sem2;
+ sem_t sem3;
#endif
+ int flow_control_enabled;
};
enum res_init_types {
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c (revision 2971)
+++ exec/coroipcs.c (working copy)
@@ -94,6 +94,9 @@
#define MSG_SEND_LOCKED 0
#define MSG_SEND_UNLOCKED 1
+#define POLL_STATE_IN 1
+#define POLL_STATE_INOUT 2
+
static struct coroipcs_init_state_v2 *api = NULL;
DECLARE_LIST_INIT (conn_info_list_head);
@@ -140,7 +143,6 @@
pthread_attr_t thread_attr;
unsigned int service;
enum conn_state state;
- int notify_flow_control_enabled;
int flow_control_state;
int refcount;
hdb_handle_t stats_handle;
@@ -165,6 +167,7 @@
unsigned int setup_bytes_read;
struct list_head zcb_mapped_list_head;
char *sending_allowed_private_data[64];
+ int poll_state;
};
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -659,6 +662,8 @@
#endif
for (;;) {
+ int sem_value;
+
#if _POSIX_THREAD_PROCESS_SHARED > 0
retry_semwait:
res = sem_wait (&conn_info->control_buffer->sem0);
@@ -670,6 +675,13 @@
api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
goto retry_semwait;
}
+ outq_flush (conn_info);
+ sem_getvalue (&conn_info->control_buffer->sem3, &sem_value);
+ if (sem_value > 0) {
+ sem_wait (&conn_info->control_buffer->sem3);
+ } else {
+ continue;
+ }
#else
sop.sem_num = 0;
@@ -934,6 +946,7 @@
conn_info->client_pid = 0;
conn_info->service = SOCKET_SERVICE_INIT;
conn_info->state = CONN_STATE_THREAD_INACTIVE;
+ conn_info->poll_state = POLL_STATE_IN;
list_init (&conn_info->outq_head);
list_init (&conn_info->list);
list_init (&conn_info->zcb_mapped_list_head);
@@ -1272,36 +1285,6 @@
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
}
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
- int new_fc = 0;
-
- if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
- event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- new_fc = 1;
- }
-
- if (conn_info->flow_control_state != new_fc) {
- if (new_fc == 1) {
- log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control
for %d, event %d\n",
- conn_info->client_pid, event);
- } else {
- log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control
for %d, event %d\n",
- conn_info->client_pid, event);
- }
- conn_info->flow_control_state = new_fc;
- api->stats_update_value (conn_info->stats_handle, "flow_control",
- &conn_info->flow_control_state,
- sizeof(conn_info->flow_control_state));
- api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
- }
-
- return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked)
{
@@ -1311,30 +1294,23 @@
#endif
int res;
int i;
+ char buf;
for (i = 0; i < iov_len; i++) {
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
}
- if (list_empty (&conn_info->outq_head))
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
- else
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
-
- if (res == -1 && errno == EAGAIN) {
- if (locked == 0) {
- pthread_mutex_lock (&conn_info->mutex);
- }
+ buf = list_empty (&conn_info->outq_head);
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
conn_info->pending_semops += 1;
- if (locked == 0) {
- pthread_mutex_unlock (&conn_info->mutex);
+ if (conn_info->poll_state == POLL_STATE_IN) {
+ conn_info->poll_state = POLL_STATE_INOUT;
+ api->poll_dispatch_modify (conn_info->fd,
+ POLLIN|POLLOUT|POLLNVAL);
}
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- } else
- if (res == -1) {
- ipc_disconnect (conn_info);
}
+
#if _POSIX_THREAD_PROCESS_SHARED > 0
res = sem_post (&conn_info->control_buffer->sem2);
#else
@@ -1364,8 +1340,8 @@
pthread_mutex_lock (&conn_info->mutex);
if (list_empty (&conn_info->outq_head)) {
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
pthread_mutex_unlock (&conn_info->mutex);
+ conn_info->control_buffer->flow_control_enabled = 0;
return;
}
for (list = conn_info->outq_head.next;
@@ -1375,6 +1351,7 @@
outq_item = list_entry (list, struct outq_item, list);
bytes_left = shared_mem_dispatch_bytes_left (conn_info);
if (bytes_left > outq_item->mlen) {
+printf ("flushing a message\n");
iov.iov_base = outq_item->msg;
iov.iov_len = outq_item->mlen;
msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
@@ -1460,6 +1437,7 @@
bytes_msg += iov[i].iov_len;
}
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+ conn_info->control_buffer->flow_control_enabled = 1;
outq_item = api->malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
ipc_disconnect (conn);
@@ -1480,11 +1458,6 @@
outq_item->mlen = bytes_msg;
list_init (&outq_item->list);
pthread_mutex_lock (&conn_info->mutex);
- if (list_empty (&conn_info->outq_head)) {
- conn_info->notify_flow_control_enabled = 1;
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- }
list_add_tail (&outq_item->list, &conn_info->outq_head);
pthread_mutex_unlock (&conn_info->mutex);
api->stats_increment_value (conn_info->stats_handle, "queue_size");
@@ -1731,7 +1704,6 @@
conn_info->service = req_setup->service;
conn_info->refcount = 0;
- conn_info->notify_flow_control_enabled = 0;
conn_info->setup_bytes_read = 0;
#if _POSIX_THREAD_PROCESS_SHARED < 1
@@ -1783,9 +1755,6 @@
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
if (res == 1) {
switch (buf) {
- case MESSAGE_REQ_OUTQ_FLUSH:
- outq_flush (conn_info);
- break;
case MESSAGE_REQ_CHANGE_EUID:
if (priv_change (conn_info) == -1) {
ipc_disconnect (conn_info);
@@ -1809,37 +1778,24 @@
coroipcs_refcount_dec (conn_info);
}
- coroipcs_refcount_inc (conn_info);
- pthread_mutex_lock (&conn_info->mutex);
- if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
- if (list_empty (&conn_info->outq_head))
- buf = MESSAGE_RES_OUTQ_EMPTY;
- else
- buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+ if (revent & POLLOUT) {
+ int psop = conn_info->pending_semops;
+ int i;
- for (; conn_info->pending_semops;) {
- res = flow_control_event_send (conn_info, buf);
- if (res == 1) {
- conn_info->pending_semops--;
+ assert (psop != 0);
+ for (i = 0; i < psop; i++) {
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
+ return (0);
} else {
- break;
+ conn_info->pending_semops -= 1;
}
}
- if (conn_info->notify_flow_control_enabled) {
- res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
- if (res == 1) {
- conn_info->notify_flow_control_enabled = 0;
- }
+ if (conn_info->poll_state == POLL_STATE_INOUT) {
+ conn_info->poll_state = POLL_STATE_IN;
+ api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
}
- if (conn_info->notify_flow_control_enabled == 0 &&
- conn_info->pending_semops == 0) {
-
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLNVAL);
- }
}
- pthread_mutex_unlock (&conn_info->mutex);
- coroipcs_refcount_dec (conn_info);
return (0);
}
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c (revision 2971)
+++ lib/coroipcc.c (working copy)
@@ -116,6 +116,23 @@
#define MSG_NOSIGNAL 0
#endif
+static int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+ unsigned int n_read;
+ unsigned int n_write;
+ unsigned int bytes_left;
+
+ n_read = context->control_buffer->read;
+ n_write = context->control_buffer->write;
+
+ if (n_read <= n_write) {
+ bytes_left = context->dispatch_size - n_write + n_read;
+ } else {
+ bytes_left = n_read - n_write;
+ }
+ return (bytes_left);
+}
+
static cs_error_t
socket_send (
int s,
@@ -440,6 +457,10 @@
}
#if _POSIX_THREAD_PROCESS_SHARED > 0
+ res = sem_post (&ipc_instance->control_buffer->sem3);
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
res = sem_post (&ipc_instance->control_buffer->sem0);
if (res == -1) {
return (CS_ERR_LIBRARY);
@@ -449,18 +470,35 @@
* Signal semaphore #0 indicting a new message from client
* to server request queue
*/
+ sop.sem_num = 4;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop_four:
+ res = semop (ipc_instance->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ return (CS_ERR_TRY_AGAIN);
+ } else
+ if (res == -1 && errno == EACCES) {
+ priv_change_send (ipc_instance);
+ goto retry_semop_four;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+
sop.sem_num = 0;
sop.sem_op = 1;
sop.sem_flg = 0;
-retry_semop:
+retry_semop_zero:
res = semop (ipc_instance->semid, &sop, 1);
if (res == -1 && errno == EINTR) {
return (CS_ERR_TRY_AGAIN);
} else
if (res == -1 && errno == EACCES) {
priv_change_send (ipc_instance);
- goto retry_semop;
+ goto retry_semop_zero;
} else
if (res == -1) {
return (CS_ERR_LIBRARY);
@@ -706,6 +744,7 @@
sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem3, 1, 0);
#else
/*
* Allocate a semaphore segment
@@ -714,7 +753,7 @@
semkey = random();
ipc_instance->euid = geteuid ();
if ((ipc_instance->semid
- = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+ = semget (semkey, 5, IPC_CREAT|IPC_EXCL|0600)) != -1) {
break;
}
/*
@@ -730,18 +769,14 @@
}
}
- semun.val = 0;
- sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
+ for (i = 0; i < 5; i++) {
+ semun.val = 0;
+ sys_res = semctl (ipc_instance->semid, i, SETVAL, semun);
+ if (sys_res != 0) {
+ res = CS_ERR_LIBRARY;
+ goto error_exit;
+ }
}
-
- sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
- }
#endif
/*
@@ -877,8 +912,6 @@
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
- int res;
- char buf_two = 1;
char *data_addr;
cs_error_t error = CS_OK;
@@ -911,47 +944,15 @@
goto error_put;
}
- res = recv (ipc_instance->fd, &buf, 1, 0);
- if (res == -1 && errno == EINTR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- } else
- if (res == -1) {
- error = CS_ERR_LIBRARY;
- goto error_put;
- } else
- if (res == 0) {
- /* Means that the peer closed cleanly the socket. However, it
should
- * happen only on BSD and Darwing systems since poll() returns a
- * POLLHUP event on other systems.
+ error = socket_recv (ipc_instance->fd, &buf, 1);
+ assert (error == CS_OK);
+
+ if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+ /*
+ * Notify coroipcs to flush any pending dispatch messages
*/
- error = CS_ERR_LIBRARY;
- goto error_put;
+ sem_post (&ipc_instance->control_buffer->sem0);
}
- ipc_instance->flow_control_state = 0;
- if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- ipc_instance->flow_control_state = 1;
- }
- /*
- * Notify executive to flush any pending dispatch messages
- */
- if (ipc_instance->flow_control_state) {
- buf_two = MESSAGE_REQ_OUTQ_FLUSH;
- res = socket_send (ipc_instance->fd, &buf_two, 1);
- assert (res == CS_OK); /* TODO */
- }
- /*
- * This is just a notification of flow control starting at the addition
- * of a new pending message, not a message to dispatch
- */
- if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- }
- if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- }
data_addr = ipc_instance->dispatch_buffer;
@@ -1017,6 +1018,11 @@
return (res);
}
+ if (ipc_instance->control_buffer->flow_control_enabled == 1) {
+ res = CS_ERR_TRY_AGAIN;
+ goto put_exit;
+ }
+
pthread_mutex_lock (&ipc_instance->mutex);
res = msg_send (ipc_instance, iov, iov_len);
@@ -1027,8 +1033,9 @@
res = reply_receive (ipc_instance, res_msg, res_len);
error_exit:
+ pthread_mutex_unlock (&ipc_instance->mutex);
+put_exit:
hdb_handle_put (&ipc_hdb, handle);
- pthread_mutex_unlock (&ipc_instance->mutex);
return (res);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais