On Mon, Jun 28, 2010 at 05:47:55PM -0700, Steven Dake wrote:
> This is take 1 of this patch. It needs a BSD port; I will work on that
> soon. Wanted to get some feedback on the code.
>
> This fixes a problem where flow control and dispatch are intermixed
> into the same stream of data, resulting in a lockup of the IPC
> system under really heavy load. It can't really be reproduced in
> corosync today unless running with some experimental patches which
> make cpg_mcast_joined async while toggling flow control on and off.
Please put all this (OS-specific) sem code somewhere common; it's
getting messy:
#if _POSIX_THREAD_PROCESS_SHARED > 0
#else
#endif
When reading the code it is not obvious what each
sem is for (sem 1, 2, 3, 4? How about real names), e.g.:
cs_sem_wait(&request_msg_available);
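
Something like the following could live in a common header (just a
sketch - cs_sem_t and cs_sem_wait are invented names, and the SysV
struct layout is an assumption):

/*
 * One place for the POSIX vs SysV semaphore differences.
 * Sketch only; cs_sem_t and cs_sem_wait are invented names.
 */
#include <errno.h>

#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>

typedef sem_t cs_sem_t;

static inline int cs_sem_wait (cs_sem_t *sem)
{
	int res;

	/* retry on signal interruption */
	do {
		res = sem_wait (sem);
	} while (res == -1 && errno == EINTR);
	return (res);
}
#else
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

typedef struct {
	int semid;
	unsigned short sem_num;
} cs_sem_t;

static inline int cs_sem_wait (cs_sem_t *sem)
{
	struct sembuf sop;
	int res;

	sop.sem_num = sem->sem_num;
	sop.sem_op = -1;
	sop.sem_flg = 0;
	/* retry on signal interruption */
	do {
		res = semop (sem->semid, &sop, 1);
	} while (res == -1 && errno == EINTR);
	return (res);
}
#endif

Then the control buffer can declare request_msg_available,
response_msg_available, dispatch_msg_available and flush_request
instead of sem0..sem3, and the #ifdefs disappear from the call sites.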
It is becoming more and more difficult to figure out what is going
on in this file - not good for bug fixing.
The flow control diagnostics were useful, can we have
them back? Both the log messages and the objdb stats.
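
Something along these lines at the places where the patch now flips
flow_control_enabled would do (a sketch only - flow_control_state_set
is an invented name; log_printf and the stats calls are the ones the
patch removes):

static void flow_control_state_set (struct conn_info *conn_info, int enabled)
{
	if (conn_info->control_buffer->flow_control_enabled == enabled) {
		return;
	}
	conn_info->control_buffer->flow_control_enabled = enabled;
	log_printf (LOGSYS_LEVEL_DEBUG, "%s flow control for %d\n",
		enabled ? "Enabling" : "Disabling", conn_info->client_pid);
	api->stats_update_value (conn_info->stats_handle, "flow_control",
		&enabled, sizeof (enabled));
	api->stats_increment_value (conn_info->stats_handle,
		"flow_control_count");
}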
-Angus
>
> Regards
> -steve
> Index: test/cpgbench.c
> ===================================================================
> --- test/cpgbench.c (revision 2971)
> +++ test/cpgbench.c (working copy)
> @@ -50,6 +50,7 @@
> #include <sys/socket.h>
> #include <netinet/in.h>
> #include <arpa/inet.h>
> +#include <pthread.h>
>
> #include <corosync/corotypes.h>
> #include <corosync/cpg.h>
> @@ -79,6 +80,7 @@
>
> static unsigned int write_count;
>
> +static int my_write = 0;
> static void cpg_bm_deliver_fn (
> cpg_handle_t handle,
> const struct cpg_name *group_name,
> @@ -97,6 +99,16 @@
>
> static char data[500000];
>
> +static void *dispatch_thread (void *param)
> +{
> + int res;
> + cpg_handle_t handle = *(cpg_handle_t *)param;
> +
> + res = cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
> +printf ("thread exiting\n");
> + return (0);
> +}
> +
> static void cpg_benchmark (
> cpg_handle_t handle,
> int write_size)
> @@ -104,7 +116,6 @@
> struct timeval tv1, tv2, tv_elapsed;
> struct iovec iov;
> unsigned int res;
> - cpg_flow_control_state_t flow_control_state;
>
> alarm_notice = 0;
> iov.iov_base = data;
> @@ -118,19 +129,11 @@
> /*
> * Test checkpoint write
> */
> - cpg_flow_control_state_get (handle, &flow_control_state);
> - if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
> retry:
> - res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
> - if (res == CS_ERR_TRY_AGAIN) {
> - goto retry;
> - }
> + res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
> + if (res == CS_ERR_TRY_AGAIN) {
> + goto retry;
> }
> - res = cpg_dispatch (handle, CS_DISPATCH_ALL);
> - if (res != CS_OK) {
> - printf ("cpg dispatch returned error %d\n", res);
> - exit (1);
> - }
> } while (alarm_notice == 0);
> gettimeofday (&tv2, NULL);
> timersub (&tv2, &tv1, &tv_elapsed);
> @@ -160,14 +163,17 @@
> unsigned int size;
> int i;
> unsigned int res;
> + pthread_t thread;
>
> - size = 1000;
> +
> + size = 1;
> signal (SIGALRM, sigalrm_handler);
> res = cpg_initialize (&handle, &callbacks);
> if (res != CS_OK) {
> printf ("cpg_initialize failed with result %d\n", res);
> exit (1);
> }
> + pthread_create (&thread, NULL, dispatch_thread, &handle);
>
> res = cpg_join (handle, &group_name);
> if (res != CS_OK) {
> @@ -183,7 +189,7 @@
>
> res = cpg_finalize (handle);
> if (res != CS_OK) {
> - printf ("cpg_join failed with result %d\n", res);
> + printf ("cpg_finalize failed with result %d\n", res);
> exit (1);
> }
> return (0);
> Index: include/corosync/coroipc_ipc.h
> ===================================================================
> --- include/corosync/coroipc_ipc.h (revision 2971)
> +++ include/corosync/coroipc_ipc.h (working copy)
> @@ -74,7 +74,9 @@
> sem_t sem0;
> sem_t sem1;
> sem_t sem2;
> + sem_t sem3;
> #endif
> + int flow_control_enabled;
> };
>
> enum res_init_types {
> Index: exec/coroipcs.c
> ===================================================================
> --- exec/coroipcs.c (revision 2971)
> +++ exec/coroipcs.c (working copy)
> @@ -94,6 +94,9 @@
> #define MSG_SEND_LOCKED 0
> #define MSG_SEND_UNLOCKED 1
>
> +#define POLL_STATE_IN 1
> +#define POLL_STATE_INOUT 2
> +
> static struct coroipcs_init_state_v2 *api = NULL;
>
> DECLARE_LIST_INIT (conn_info_list_head);
> @@ -140,7 +143,6 @@
> pthread_attr_t thread_attr;
> unsigned int service;
> enum conn_state state;
> - int notify_flow_control_enabled;
> int flow_control_state;
> int refcount;
> hdb_handle_t stats_handle;
> @@ -165,6 +167,7 @@
> unsigned int setup_bytes_read;
> struct list_head zcb_mapped_list_head;
> char *sending_allowed_private_data[64];
> + int poll_state;
> };
>
> static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
> @@ -659,6 +662,8 @@
> #endif
>
> for (;;) {
> + int sem_value;
> +
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> retry_semwait:
> res = sem_wait (&conn_info->control_buffer->sem0);
> @@ -670,6 +675,13 @@
> api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
> goto retry_semwait;
> }
> + outq_flush (conn_info);
> + sem_getvalue (&conn_info->control_buffer->sem3, &sem_value);
> + if (sem_value > 0) {
> + sem_wait (&conn_info->control_buffer->sem3);
> + } else {
> + continue;
> + }
> #else
>
> sop.sem_num = 0;
> @@ -934,6 +946,7 @@
> conn_info->client_pid = 0;
> conn_info->service = SOCKET_SERVICE_INIT;
> conn_info->state = CONN_STATE_THREAD_INACTIVE;
> + conn_info->poll_state = POLL_STATE_IN;
> list_init (&conn_info->outq_head);
> list_init (&conn_info->list);
> list_init (&conn_info->zcb_mapped_list_head);
> @@ -1272,36 +1285,6 @@
> conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
> }
>
> -/**
> - * simulate the behaviour in coroipcc.c
> - */
> -static int flow_control_event_send (struct conn_info *conn_info, char event)
> -{
> - int new_fc = 0;
> -
> - if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
> - event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - new_fc = 1;
> - }
> -
> - if (conn_info->flow_control_state != new_fc) {
> - if (new_fc == 1) {
> - log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control
> for %d, event %d\n",
> - conn_info->client_pid, event);
> - } else {
> - log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control
> for %d, event %d\n",
> - conn_info->client_pid, event);
> - }
> - conn_info->flow_control_state = new_fc;
> - api->stats_update_value (conn_info->stats_handle, "flow_control",
> - &conn_info->flow_control_state,
> - sizeof(conn_info->flow_control_state));
> - api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
> - }
> -
> - return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
> -}
> -
> static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
> int locked)
> {
> @@ -1311,30 +1294,23 @@
> #endif
> int res;
> int i;
> + char buf;
>
> for (i = 0; i < iov_len; i++) {
> memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
> }
>
> - if (list_empty (&conn_info->outq_head))
> - res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
> - else
> - res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
> -
> - if (res == -1 && errno == EAGAIN) {
> - if (locked == 0) {
> - pthread_mutex_lock (&conn_info->mutex);
> - }
> + buf = list_empty (&conn_info->outq_head);
> + res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> + if (res != 1) {
> conn_info->pending_semops += 1;
> - if (locked == 0) {
> - pthread_mutex_unlock (&conn_info->mutex);
> + if (conn_info->poll_state == POLL_STATE_IN) {
> + conn_info->poll_state = POLL_STATE_INOUT;
> + api->poll_dispatch_modify (conn_info->fd,
> + POLLIN|POLLOUT|POLLNVAL);
> }
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLOUT|POLLNVAL);
> - } else
> - if (res == -1) {
> - ipc_disconnect (conn_info);
> }
> +
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> res = sem_post (&conn_info->control_buffer->sem2);
> #else
> @@ -1364,8 +1340,8 @@
>
> pthread_mutex_lock (&conn_info->mutex);
> if (list_empty (&conn_info->outq_head)) {
> - res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
> pthread_mutex_unlock (&conn_info->mutex);
> + conn_info->control_buffer->flow_control_enabled = 0;
> return;
> }
> for (list = conn_info->outq_head.next;
> @@ -1375,6 +1351,7 @@
> outq_item = list_entry (list, struct outq_item, list);
> bytes_left = shared_mem_dispatch_bytes_left (conn_info);
> if (bytes_left > outq_item->mlen) {
> +printf ("flushing a message\n");
> iov.iov_base = outq_item->msg;
> iov.iov_len = outq_item->mlen;
> msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
> @@ -1460,6 +1437,7 @@
> bytes_msg += iov[i].iov_len;
> }
> if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
> + conn_info->control_buffer->flow_control_enabled = 1;
> outq_item = api->malloc (sizeof (struct outq_item));
> if (outq_item == NULL) {
> ipc_disconnect (conn);
> @@ -1480,11 +1458,6 @@
> outq_item->mlen = bytes_msg;
> list_init (&outq_item->list);
> pthread_mutex_lock (&conn_info->mutex);
> - if (list_empty (&conn_info->outq_head)) {
> - conn_info->notify_flow_control_enabled = 1;
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLOUT|POLLNVAL);
> - }
> list_add_tail (&outq_item->list, &conn_info->outq_head);
> pthread_mutex_unlock (&conn_info->mutex);
> api->stats_increment_value (conn_info->stats_handle, "queue_size");
> @@ -1731,7 +1704,6 @@
>
> conn_info->service = req_setup->service;
> conn_info->refcount = 0;
> - conn_info->notify_flow_control_enabled = 0;
> conn_info->setup_bytes_read = 0;
>
> #if _POSIX_THREAD_PROCESS_SHARED < 1
> @@ -1783,9 +1755,6 @@
> res = recv (fd, &buf, 1, MSG_NOSIGNAL);
> if (res == 1) {
> switch (buf) {
> - case MESSAGE_REQ_OUTQ_FLUSH:
> - outq_flush (conn_info);
> - break;
> case MESSAGE_REQ_CHANGE_EUID:
> if (priv_change (conn_info) == -1) {
> ipc_disconnect (conn_info);
> @@ -1809,37 +1778,24 @@
> coroipcs_refcount_dec (conn_info);
> }
>
> - coroipcs_refcount_inc (conn_info);
> - pthread_mutex_lock (&conn_info->mutex);
> - if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
> - if (list_empty (&conn_info->outq_head))
> - buf = MESSAGE_RES_OUTQ_EMPTY;
> - else
> - buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
> + if (revent & POLLOUT) {
> + int psop = conn_info->pending_semops;
> + int i;
>
> - for (; conn_info->pending_semops;) {
> - res = flow_control_event_send (conn_info, buf);
> - if (res == 1) {
> - conn_info->pending_semops--;
> + assert (psop != 0);
> + for (i = 0; i < psop; i++) {
> + res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> + if (res != 1) {
> + return (0);
> } else {
> - break;
> + conn_info->pending_semops -= 1;
> }
> }
> - if (conn_info->notify_flow_control_enabled) {
> - res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
> - if (res == 1) {
> - conn_info->notify_flow_control_enabled = 0;
> - }
> + if (conn_info->poll_state == POLL_STATE_INOUT) {
> + conn_info->poll_state = POLL_STATE_IN;
> + api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
> }
> - if (conn_info->notify_flow_control_enabled == 0 &&
> - conn_info->pending_semops == 0) {
> -
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLNVAL);
> - }
> }
> - pthread_mutex_unlock (&conn_info->mutex);
> - coroipcs_refcount_dec (conn_info);
>
> return (0);
> }
> Index: lib/coroipcc.c
> ===================================================================
> --- lib/coroipcc.c (revision 2971)
> +++ lib/coroipcc.c (working copy)
> @@ -116,6 +116,23 @@
> #define MSG_NOSIGNAL 0
> #endif
>
> +static int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
> +{
> + unsigned int n_read;
> + unsigned int n_write;
> + unsigned int bytes_left;
> +
> + n_read = context->control_buffer->read;
> + n_write = context->control_buffer->write;
> +
> + if (n_read <= n_write) {
> + bytes_left = context->dispatch_size - n_write + n_read;
> + } else {
> + bytes_left = n_read - n_write;
> + }
> + return (bytes_left);
> +}
> +
> static cs_error_t
> socket_send (
> int s,
> @@ -440,6 +457,10 @@
> }
>
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> + res = sem_post (&ipc_instance->control_buffer->sem3);
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> res = sem_post (&ipc_instance->control_buffer->sem0);
> if (res == -1) {
> return (CS_ERR_LIBRARY);
> @@ -449,18 +470,35 @@
> * Signal semaphore #0 indicting a new message from client
> * to server request queue
> */
> + sop.sem_num = 4;
> + sop.sem_op = 1;
> + sop.sem_flg = 0;
> +
> +retry_semop_four:
> + res = semop (ipc_instance->semid, &sop, 1);
> + if (res == -1 && errno == EINTR) {
> + return (CS_ERR_TRY_AGAIN);
> + } else
> + if (res == -1 && errno == EACCES) {
> + priv_change_send (ipc_instance);
> + goto retry_semop_four;
> + } else
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +
> sop.sem_num = 0;
> sop.sem_op = 1;
> sop.sem_flg = 0;
>
> -retry_semop:
> +retry_semop_zero:
> res = semop (ipc_instance->semid, &sop, 1);
> if (res == -1 && errno == EINTR) {
> return (CS_ERR_TRY_AGAIN);
> } else
> if (res == -1 && errno == EACCES) {
> priv_change_send (ipc_instance);
> - goto retry_semop;
> + goto retry_semop_zero;
> } else
> if (res == -1) {
> return (CS_ERR_LIBRARY);
> @@ -706,6 +744,7 @@
> sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
> sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
> sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
> + sem_init (&ipc_instance->control_buffer->sem3, 1, 0);
> #else
> /*
> * Allocate a semaphore segment
> @@ -714,7 +753,7 @@
> semkey = random();
> ipc_instance->euid = geteuid ();
> if ((ipc_instance->semid
> - = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> + = semget (semkey, 5, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> break;
> }
> /*
> @@ -730,18 +769,14 @@
> }
> }
>
> - semun.val = 0;
> - sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
> - if (sys_res != 0) {
> - res = CS_ERR_LIBRARY;
> - goto error_exit;
> + for (i = 0; i < 5; i++) {
> + semun.val = 0;
> + sys_res = semctl (ipc_instance->semid, i, SETVAL, semun);
> + if (sys_res != 0) {
> + res = CS_ERR_LIBRARY;
> + goto error_exit;
> + }
> }
> -
> - sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
> - if (sys_res != 0) {
> - res = CS_ERR_LIBRARY;
> - goto error_exit;
> - }
> #endif
>
> /*
> @@ -877,8 +912,6 @@
> int poll_events;
> char buf;
> struct ipc_instance *ipc_instance;
> - int res;
> - char buf_two = 1;
> char *data_addr;
> cs_error_t error = CS_OK;
>
> @@ -911,47 +944,15 @@
> goto error_put;
> }
>
> - res = recv (ipc_instance->fd, &buf, 1, 0);
> - if (res == -1 && errno == EINTR) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - } else
> - if (res == -1) {
> - error = CS_ERR_LIBRARY;
> - goto error_put;
> - } else
> - if (res == 0) {
> - /* Means that the peer closed cleanly the socket. However, it should
> - * happen only on BSD and Darwing systems since poll() returns a
> - * POLLHUP event on other systems.
> + error = socket_recv (ipc_instance->fd, &buf, 1);
> + assert (error == CS_OK);
> +
> + if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
> + /*
> + * Notify coroipcs to flush any pending dispatch messages
> */
> - error = CS_ERR_LIBRARY;
> - goto error_put;
> + sem_post (&ipc_instance->control_buffer->sem0);
> }
> - ipc_instance->flow_control_state = 0;
> - if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - ipc_instance->flow_control_state = 1;
> - }
> - /*
> - * Notify executive to flush any pending dispatch messages
> - */
> - if (ipc_instance->flow_control_state) {
> - buf_two = MESSAGE_REQ_OUTQ_FLUSH;
> - res = socket_send (ipc_instance->fd, &buf_two, 1);
> - assert (res == CS_OK); /* TODO */
> - }
> - /*
> - * This is just a notification of flow control starting at the addition
> - * of a new pending message, not a message to dispatch
> - */
> - if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - }
> - if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - }
>
> data_addr = ipc_instance->dispatch_buffer;
>
> @@ -1017,6 +1018,11 @@
> return (res);
> }
>
> + if (ipc_instance->control_buffer->flow_control_enabled == 1) {
> + res = CS_ERR_TRY_AGAIN;
> + goto put_exit;
> + }
> +
> pthread_mutex_lock (&ipc_instance->mutex);
>
> res = msg_send (ipc_instance, iov, iov_len);
> @@ -1027,8 +1033,9 @@
> res = reply_receive (ipc_instance, res_msg, res_len);
>
> error_exit:
> + pthread_mutex_unlock (&ipc_instance->mutex);
> +put_exit:
> hdb_handle_put (&ipc_hdb, handle);
> - pthread_mutex_unlock (&ipc_instance->mutex);
>
> return (res);
> }
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais