This is take 1 of this patch. It still needs a BSD port, which I'll work on soon. In the meantime I wanted to get some feedback on the code.

This fixes a problem where flow-control notifications and dispatch messages are intermixed in the same socket stream, which can lock up the IPC system under very heavy load. The lockup can't really be reproduced in corosync today unless running with some experimental patches that make cpg_mcast_joined asynchronous while toggling flow control on and off.
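
In short: the flow control state moves out of the socket stream and into a
flag in the shared memory control buffer (flow_control_enabled), so the
library can check it directly before sending instead of parsing event bytes
off the dispatch socket. A new semaphore (sem3 in the posix case, semaphore
#4 in the sysv case) lets the server tell real requests apart from wakeups
that only ask it to flush the dispatch out-queue. Roughly, the server-side
wakeup loop becomes something like this (a simplified sketch of the
coroipcs.c change below; names match the patch, error handling omitted):

	for (;;) {
		int sem_value;

		/* posted once per request and once per flush kick */
		sem_wait (&control_buffer->sem0);

		/* always drain any queued dispatch messages first */
		outq_flush (conn_info);

		/*
		 * sem3 is posted only for real requests, so a zero value
		 * means this wakeup was just a flush kick from the library
		 */
		sem_getvalue (&control_buffer->sem3, &sem_value);
		if (sem_value > 0) {
			sem_wait (&control_buffer->sem3);
		} else {
			continue;
		}

		/* ... handle the request in the request buffer ... */
	}

On the library side, coroipcc_msg_send_reply_receive returns CS_ERR_TRY_AGAIN
while flow_control_enabled is set, and the dispatch path posts sem0 (with no
matching sem3 post) once enough dispatch buffer space has been freed, so the
server flushes its out-queue without mistaking the wakeup for a request.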

Regards
-steve
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c     (revision 2971)
+++ test/cpgbench.c     (working copy)
@@ -50,6 +50,7 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
+#include <pthread.h>
 
 #include <corosync/corotypes.h>
 #include <corosync/cpg.h>
@@ -79,6 +80,7 @@
 
 static unsigned int write_count;
 
+static int my_write = 0;
 static void cpg_bm_deliver_fn (
         cpg_handle_t handle,
         const struct cpg_name *group_name,
@@ -97,6 +99,16 @@
 
 static char data[500000];
 
+static void *dispatch_thread (void *param)
+{
+       int res;
+       cpg_handle_t handle = *(cpg_handle_t *)param;
+
+       res = cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+printf ("thread exiting\n");
+       return (0);
+}
+
 static void cpg_benchmark (
        cpg_handle_t handle,
        int write_size)
@@ -104,7 +116,6 @@
        struct timeval tv1, tv2, tv_elapsed;
        struct iovec iov;
        unsigned int res;
-       cpg_flow_control_state_t flow_control_state;
 
        alarm_notice = 0;
        iov.iov_base = data;
@@ -118,19 +129,11 @@
                /*
                 * Test checkpoint write
                 */
-               cpg_flow_control_state_get (handle, &flow_control_state);
-               if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
 retry:
-                       res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
-                       if (res == CS_ERR_TRY_AGAIN) {
-                               goto retry;
-                       }
+               res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+               if (res == CS_ERR_TRY_AGAIN) {
+                       goto retry;
                }
-               res = cpg_dispatch (handle, CS_DISPATCH_ALL);
-               if (res != CS_OK) {
-                       printf ("cpg dispatch returned error %d\n", res);
-                       exit (1);
-               }
        } while (alarm_notice == 0);
        gettimeofday (&tv2, NULL);
        timersub (&tv2, &tv1, &tv_elapsed);
@@ -160,14 +163,17 @@
        unsigned int size;
        int i;
        unsigned int res;
+       pthread_t thread;
 
-       size = 1000;
+
+       size = 1;
        signal (SIGALRM, sigalrm_handler);
        res = cpg_initialize (&handle, &callbacks);
        if (res != CS_OK) {
                printf ("cpg_initialize failed with result %d\n", res);
                exit (1);
        }
+       pthread_create (&thread, NULL, dispatch_thread, &handle);
 
        res = cpg_join (handle, &group_name);
        if (res != CS_OK) {
@@ -183,7 +189,7 @@
 
        res = cpg_finalize (handle);
        if (res != CS_OK) {
-               printf ("cpg_join failed with result %d\n", res);
+               printf ("cpg_finalize failed with result %d\n", res);
                exit (1);
        }
        return (0);
Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h      (revision 2971)
+++ include/corosync/coroipc_ipc.h      (working copy)
@@ -74,7 +74,9 @@
        sem_t sem0;
        sem_t sem1;
        sem_t sem2;
+       sem_t sem3;
 #endif
+       int flow_control_enabled;
 };
 
 enum res_init_types {
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c     (revision 2971)
+++ exec/coroipcs.c     (working copy)
@@ -94,6 +94,9 @@
 #define MSG_SEND_LOCKED                0
 #define MSG_SEND_UNLOCKED      1
 
+#define POLL_STATE_IN          1
+#define POLL_STATE_INOUT       2
+
 static struct coroipcs_init_state_v2 *api = NULL;
 
 DECLARE_LIST_INIT (conn_info_list_head);
@@ -140,7 +143,6 @@
        pthread_attr_t thread_attr;
        unsigned int service;
        enum conn_state state;
-       int notify_flow_control_enabled;
        int flow_control_state;
        int refcount;
        hdb_handle_t stats_handle;
@@ -165,6 +167,7 @@
        unsigned int setup_bytes_read;
        struct list_head zcb_mapped_list_head;
        char *sending_allowed_private_data[64];
+       int poll_state;
 };
 
 static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -659,6 +662,8 @@
 #endif
 
        for (;;) {
+               int sem_value;
+
 #if _POSIX_THREAD_PROCESS_SHARED > 0
 retry_semwait:
                res = sem_wait (&conn_info->control_buffer->sem0);
@@ -670,6 +675,13 @@
                        api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
                        goto retry_semwait;
                }
+               outq_flush (conn_info);
+               sem_getvalue (&conn_info->control_buffer->sem3, &sem_value);
+               if (sem_value > 0) {
+                       sem_wait (&conn_info->control_buffer->sem3);
+               } else {
+                       continue;
+               }
 #else
 
                sop.sem_num = 0;
@@ -934,6 +946,7 @@
        conn_info->client_pid = 0;
        conn_info->service = SOCKET_SERVICE_INIT;
        conn_info->state = CONN_STATE_THREAD_INACTIVE;
+        conn_info->poll_state = POLL_STATE_IN;
        list_init (&conn_info->outq_head);
        list_init (&conn_info->list);
        list_init (&conn_info->zcb_mapped_list_head);
@@ -1272,36 +1285,6 @@
        conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 }
 
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
-       int new_fc = 0;
-
-       if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
-               event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               new_fc = 1;
-       }
-
-       if (conn_info->flow_control_state != new_fc) {
-               if (new_fc == 1) {
-                       log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
-                               conn_info->client_pid, event);
-               } else {
-                       log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
-                               conn_info->client_pid, event);
-               }
-               conn_info->flow_control_state = new_fc;
-               api->stats_update_value (conn_info->stats_handle, "flow_control",
-                       &conn_info->flow_control_state,
-                       sizeof(conn_info->flow_control_state));
-               api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
-       }
-
-       return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
                      int locked)
 {
@@ -1311,30 +1294,23 @@
 #endif
        int res;
        int i;
+       char buf;
 
        for (i = 0; i < iov_len; i++) {
                memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
        }
 
-       if (list_empty (&conn_info->outq_head))
-               res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
-       else
-               res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
-
-       if (res == -1 && errno == EAGAIN) {
-               if (locked == 0) {
-                       pthread_mutex_lock (&conn_info->mutex);
-               }
+       buf = list_empty (&conn_info->outq_head);
+       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+       if (res != 1) {
                conn_info->pending_semops += 1;
-               if (locked == 0) {
-                       pthread_mutex_unlock (&conn_info->mutex);
+               if (conn_info->poll_state == POLL_STATE_IN) {
+                       conn_info->poll_state = POLL_STATE_INOUT;
+                       api->poll_dispatch_modify (conn_info->fd,
+                               POLLIN|POLLOUT|POLLNVAL);
                }
-               api->poll_dispatch_modify (conn_info->fd,
-                       POLLIN|POLLOUT|POLLNVAL);
-       } else
-       if (res == -1) {
-               ipc_disconnect (conn_info);
        }
+
 #if _POSIX_THREAD_PROCESS_SHARED > 0
        res = sem_post (&conn_info->control_buffer->sem2);
 #else
@@ -1364,8 +1340,8 @@
 
        pthread_mutex_lock (&conn_info->mutex);
        if (list_empty (&conn_info->outq_head)) {
-               res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
                pthread_mutex_unlock (&conn_info->mutex);
+               conn_info->control_buffer->flow_control_enabled = 0;
                return;
        }
        for (list = conn_info->outq_head.next;
@@ -1375,6 +1351,7 @@
                outq_item = list_entry (list, struct outq_item, list);
                bytes_left = shared_mem_dispatch_bytes_left (conn_info);
                if (bytes_left > outq_item->mlen) {
+printf ("flushing a message\n");
                        iov.iov_base = outq_item->msg;
                        iov.iov_len = outq_item->mlen;
                        msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
@@ -1460,6 +1437,7 @@
                bytes_msg += iov[i].iov_len;
        }
        if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+               conn_info->control_buffer->flow_control_enabled = 1;
                outq_item = api->malloc (sizeof (struct outq_item));
                if (outq_item == NULL) {
                        ipc_disconnect (conn);
@@ -1480,11 +1458,6 @@
                outq_item->mlen = bytes_msg;
                list_init (&outq_item->list);
                pthread_mutex_lock (&conn_info->mutex);
-               if (list_empty (&conn_info->outq_head)) {
-                       conn_info->notify_flow_control_enabled = 1;
-                       api->poll_dispatch_modify (conn_info->fd,
-                               POLLIN|POLLOUT|POLLNVAL);
-               }
                list_add_tail (&outq_item->list, &conn_info->outq_head);
                pthread_mutex_unlock (&conn_info->mutex);
                api->stats_increment_value (conn_info->stats_handle, "queue_size");
@@ -1731,7 +1704,6 @@
 
                conn_info->service = req_setup->service;
                conn_info->refcount = 0;
-               conn_info->notify_flow_control_enabled = 0;
                conn_info->setup_bytes_read = 0;
 
 #if _POSIX_THREAD_PROCESS_SHARED < 1
@@ -1783,9 +1755,6 @@
                res = recv (fd, &buf, 1, MSG_NOSIGNAL);
                if (res == 1) {
                        switch (buf) {
-                       case MESSAGE_REQ_OUTQ_FLUSH:
-                               outq_flush (conn_info);
-                               break;
                        case MESSAGE_REQ_CHANGE_EUID:
                                if (priv_change (conn_info) == -1) {
                                        ipc_disconnect (conn_info);
@@ -1809,37 +1778,24 @@
                coroipcs_refcount_dec (conn_info);
        }
 
-       coroipcs_refcount_inc (conn_info);
-       pthread_mutex_lock (&conn_info->mutex);
-       if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
-               if (list_empty (&conn_info->outq_head))
-                       buf = MESSAGE_RES_OUTQ_EMPTY;
-               else
-                       buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+       if (revent & POLLOUT) {
+               int psop = conn_info->pending_semops;
+               int i;
 
-               for (; conn_info->pending_semops;) {
-                       res = flow_control_event_send (conn_info, buf);
-                       if (res == 1) {
-                               conn_info->pending_semops--;
+               assert (psop != 0);
+               for (i = 0; i < psop; i++) {
+                       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+                       if (res != 1) {
+                               return (0);
                        } else {
-                               break;
+                               conn_info->pending_semops -= 1;
                        }
                }
-               if (conn_info->notify_flow_control_enabled) {
-                       res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
-                       if (res == 1) {
-                               conn_info->notify_flow_control_enabled = 0;
-                       }
+               if (conn_info->poll_state == POLL_STATE_INOUT) {
+                       conn_info->poll_state = POLL_STATE_IN;
+                       api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
                }
-               if (conn_info->notify_flow_control_enabled == 0 &&
-                       conn_info->pending_semops == 0) {
-
-                       api->poll_dispatch_modify (conn_info->fd,
-                               POLLIN|POLLNVAL);
-               }
        }
-       pthread_mutex_unlock (&conn_info->mutex);
-       coroipcs_refcount_dec (conn_info);
 
        return (0);
 }
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c      (revision 2971)
+++ lib/coroipcc.c      (working copy)
@@ -116,6 +116,23 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+static int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+       unsigned int n_read;
+       unsigned int n_write;
+       unsigned int bytes_left;
+
+       n_read = context->control_buffer->read;
+       n_write = context->control_buffer->write;
+
+       if (n_read <= n_write) {
+               bytes_left = context->dispatch_size - n_write + n_read;
+       } else {
+               bytes_left = n_read - n_write;
+       }
+       return (bytes_left);
+}
+
 static cs_error_t
 socket_send (
        int s,
@@ -440,6 +457,10 @@
        }
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
+       res = sem_post (&ipc_instance->control_buffer->sem3);
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
        res = sem_post (&ipc_instance->control_buffer->sem0);
        if (res == -1) {
                return (CS_ERR_LIBRARY);
@@ -449,18 +470,35 @@
         * Signal semaphore #0 indicting a new message from client
         * to server request queue
         */
+       sop.sem_num = 4;
+       sop.sem_op = 1;
+       sop.sem_flg = 0;
+
+retry_semop_four:
+       res = semop (ipc_instance->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               return (CS_ERR_TRY_AGAIN);
+       } else
+       if (res == -1 && errno == EACCES) {
+               priv_change_send (ipc_instance);
+               goto retry_semop_four;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+
        sop.sem_num = 0;
        sop.sem_op = 1;
        sop.sem_flg = 0;
 
-retry_semop:
+retry_semop_zero:
        res = semop (ipc_instance->semid, &sop, 1);
        if (res == -1 && errno == EINTR) {
                return (CS_ERR_TRY_AGAIN);
        } else
        if (res == -1 && errno == EACCES) {
                priv_change_send (ipc_instance);
-               goto retry_semop;
+               goto retry_semop_zero;
        } else
        if (res == -1) {
                return (CS_ERR_LIBRARY);
@@ -706,6 +744,7 @@
        sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
        sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
        sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem3, 1, 0);
 #else
        /*
         * Allocate a semaphore segment
@@ -714,7 +753,7 @@
                semkey = random();
                ipc_instance->euid = geteuid ();
                if ((ipc_instance->semid
-                    = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+                    = semget (semkey, 5, IPC_CREAT|IPC_EXCL|0600)) != -1) {
                      break;
                }
                /*
@@ -730,18 +769,14 @@
                }
        }
 
-       semun.val = 0;
-       sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
-       if (sys_res != 0) {
-               res = CS_ERR_LIBRARY;
-               goto error_exit;
+       for (i = 0; i < 5; i++) {
+               semun.val = 0;
+               sys_res = semctl (ipc_instance->semid, i, SETVAL, semun);
+               if (sys_res != 0) {
+                       res = CS_ERR_LIBRARY;
+                       goto error_exit;
+               }
        }
-
-       sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
-       if (sys_res != 0) {
-               res = CS_ERR_LIBRARY;
-               goto error_exit;
-       }
 #endif
 
        /*
@@ -877,8 +912,6 @@
        int poll_events;
        char buf;
        struct ipc_instance *ipc_instance;
-       int res;
-       char buf_two = 1;
        char *data_addr;
        cs_error_t error = CS_OK;
 
@@ -911,47 +944,15 @@
                goto error_put;
        }
 
-       res = recv (ipc_instance->fd, &buf, 1, 0);
-       if (res == -1 && errno == EINTR) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       } else
-       if (res == -1) {
-               error = CS_ERR_LIBRARY;
-               goto error_put;
-       } else
-       if (res == 0) {
-               /* Means that the peer closed cleanly the socket. However, it should
-                * happen only on BSD and Darwing systems since poll() returns a
-                * POLLHUP event on other systems.
+       error = socket_recv (ipc_instance->fd, &buf, 1);
+       assert (error == CS_OK);
+
+       if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+               /*
+                * Notify coroipcs to flush any pending dispatch messages
                 */
-               error = CS_ERR_LIBRARY;
-               goto error_put;
+               sem_post (&ipc_instance->control_buffer->sem0);
        }
-       ipc_instance->flow_control_state = 0;
-       if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               ipc_instance->flow_control_state = 1;
-       }
-       /*
-        * Notify executive to flush any pending dispatch messages
-        */
-       if (ipc_instance->flow_control_state) {
-               buf_two = MESSAGE_REQ_OUTQ_FLUSH;
-               res = socket_send (ipc_instance->fd, &buf_two, 1);
-               assert (res == CS_OK); /* TODO */
-       }
-       /*
-        * This is just a notification of flow control starting at the addition
-        * of a new pending message, not a message to dispatch
-        */
-       if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       }
-       if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       }
 
        data_addr = ipc_instance->dispatch_buffer;
 
@@ -1017,6 +1018,11 @@
                return (res);
        }
 
+       if (ipc_instance->control_buffer->flow_control_enabled == 1) {
+               res = CS_ERR_TRY_AGAIN;
+               goto put_exit;
+       }
+
        pthread_mutex_lock (&ipc_instance->mutex);
 
        res = msg_send (ipc_instance, iov, iov_len);
@@ -1027,8 +1033,9 @@
        res = reply_receive (ipc_instance, res_msg, res_len);
 
 error_exit:
+       pthread_mutex_unlock (&ipc_instance->mutex);
+put_exit:
        hdb_handle_put (&ipc_hdb, handle);
-       pthread_mutex_unlock (&ipc_instance->mutex);
 
        return (res);
 }