Integrating comments from Angus and generally cleaning up the code a bit for maintenance purposes.

Regards
-steve
Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h      (revision 3000)
+++ include/corosync/coroipc_ipc.h      (working copy)
@@ -35,6 +35,9 @@
 #define COROIPC_IPC_H_DEFINED
 
 #include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include "corotypes.h"
 #include "config.h"
 
 /*
@@ -52,28 +55,40 @@
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
 #include <semaphore.h>
+#else
+#include <sys/sem.h>
 #endif
 
+/*
+ * Define sem_wait timeout (real timeout will be (n-1;n) )
+ */
+#define IPC_SEMWAIT_TIMEOUT 2
+
 enum req_init_types {
        MESSAGE_REQ_RESPONSE_INIT = 0,
        MESSAGE_REQ_DISPATCH_INIT = 1
 };
 
 #define MESSAGE_REQ_CHANGE_EUID                1
-#define MESSAGE_REQ_OUTQ_FLUSH         2
 
-#define MESSAGE_RES_OUTQ_EMPTY         0
-#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
-#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
-#define MESSAGE_RES_OUTQ_FLUSH_NR      3
+enum ipc_semaphore_identifiers {
+       SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT      = 0,
+       SEMAPHORE_REQUEST                       = 1,
+       SEMAPHORE_RESPONSE                      = 2,
+       SEMAPHORE_DISPATCH                      = 3
+};
 
 struct control_buffer {
        unsigned int read;
        unsigned int write;
+       int flow_control_enabled;
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-       sem_t sem0;
-       sem_t sem1;
-       sem_t sem2;
+       sem_t sem_request_or_flush_or_exit;
+       sem_t sem_response;
+       sem_t sem_dispatch;
+       sem_t sem_request;
+#else
+       int semid;
 #endif
 };
 
@@ -145,4 +160,172 @@
 #define ZC_FREE_HEADER         0xFFFFFFFE
 #define ZC_EXECUTE_HEADER      0xFFFFFFFD
 
+static inline cs_error_t
+ipc_sem_wait (
+       struct control_buffer *control_buffer,
+       enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+       struct sembuf sop;
+#else
+       struct timespec timeout;
+       sem_t *sem = NULL;
+#endif
+       int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       switch (sem_id) {
+       case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+               sem = &control_buffer->sem_request_or_flush_or_exit;
+               break;
+       case SEMAPHORE_RESPONSE:
+               sem = &control_buffer->sem_request;
+               break;
+       case SEMAPHORE_DISPATCH:
+               sem = &control_buffer->sem_response;
+               break;
+       case SEMAPHORE_REQUEST:
+               sem = &control_buffer->sem_dispatch;
+               break;
+       }
+
+       timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+       timeout.tv_nsec = 0;
+
+retry_sem_timedwait:
+       res = sem_timedwait (sem, &timeout);
+       if (res == -1 && errno == ETIMEDOUT) {
+               return (CS_ERR_LIBRARY);
+       } else
+       if (res == -1 && errno == EINTR) {
+               goto retry_sem_timedwait;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#else
+       /*
+        * Wait for semaphore indicating a new message from server
+        * to client in queue
+        */
+       sop.sem_num = sem_id;
+       sop.sem_op = -1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (control_buffer->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               return (CS_ERR_TRY_AGAIN);
+               goto retry_semop;
+       } else
+       if (res == -1 && errno == EACCES) {
+               return (CS_ERR_TRY_AGAIN);
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#endif
+       return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_post (
+       struct control_buffer *control_buffer,
+       enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+       struct sembuf sop;
+#else
+       sem_t *sem = NULL;
+#endif
+       int res;
+       
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       switch (sem_id) {
+       case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+               sem = &control_buffer->sem_request_or_flush_or_exit;
+               break;
+       case SEMAPHORE_RESPONSE:
+               sem = &control_buffer->sem_request;
+               break;
+       case SEMAPHORE_DISPATCH:
+               sem = &control_buffer->sem_response;
+               break;
+       case SEMAPHORE_REQUEST:
+               sem = &control_buffer->sem_dispatch;
+               break;
+       }
+
+       res = sem_post (sem);
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#else
+       sop.sem_num = sem_id;
+       sop.sem_op = 1;
+       sop.sem_flg = 0;
+
+retry_semop:
+       res = semop (control_buffer->semid, &sop, 1);
+       if (res == -1 && errno == EINTR) {
+               goto retry_semop;
+       } else
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#endif
+       return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_getvalue (
+       struct control_buffer *control_buffer,
+       enum ipc_semaphore_identifiers sem_id,
+       int *sem_value)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+       struct sembuf sop;
+       int sem_value_hold;
+#else
+       sem_t *sem = NULL;
+#endif
+       
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+       switch (sem_id) {
+       case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+               sem = &control_buffer->sem_request_or_flush_or_exit;
+               break;
+       case SEMAPHORE_RESPONSE:
+               sem = &control_buffer->sem_request;
+               break;
+       case SEMAPHORE_DISPATCH:
+               sem = &control_buffer->sem_response;
+               break;
+       case SEMAPHORE_REQUEST:
+               sem = &control_buffer->sem_dispatch;
+               break;
+       }
+
+       res = sem_getvalue (sem, sem_value);
+       if (res == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+#else
+       sop.sem_num = sem_id;
+       sop.sem_op = 1;
+       sop.sem_flg = 0;
+
+retry_semctl:
+       sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
+       if (sem_value_hold == -1 && errno == EINTR) {
+               goto retry_semctl;
+       } else
+       if (sem_value_hold == -1) {
+               return (CS_ERR_LIBRARY);
+       }
+       *sem_value = sem_value_hold;
+#endif
+       return (CS_OK);
+}
+
 #endif /* COROIPC_IPC_H_DEFINED */
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c     (revision 3000)
+++ exec/coroipcs.c     (working copy)
@@ -95,6 +95,9 @@
 #define MSG_SEND_LOCKED                0
 #define MSG_SEND_UNLOCKED      1
 
+#define POLL_STATE_IN          1
+#define POLL_STATE_INOUT       2
+
 static struct coroipcs_init_state_v2 *api = NULL;
 
 DECLARE_LIST_INIT (conn_info_list_head);
@@ -141,13 +144,10 @@
        pthread_attr_t thread_attr;
        unsigned int service;
        enum conn_state state;
-       int notify_flow_control_enabled;
-       int flow_control_state;
        int refcount;
        hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
        key_t semkey;
-       int semid;
 #endif
        unsigned int pending_semops;
        pthread_mutex_t mutex;
@@ -166,6 +166,7 @@
        unsigned int setup_bytes_read;
        struct list_head zcb_mapped_list_head;
        char *sending_allowed_private_data[64];
+       int poll_state;
 };
 
 static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -221,34 +222,6 @@
 {
 }
 
-static void sem_post_exit_thread (struct conn_info *conn_info)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
-       int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
-       res = sem_post (&conn_info->control_buffer->sem0);
-       if (res == -1 && errno == EINTR) {
-               api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-               goto retry_semop;
-       }
-#else
-       sop.sem_num = 0;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
-
-retry_semop:
-       res = semop (conn_info->semid, &sop, 1);
-       if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-               api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-               goto retry_semop;
-       }
-#endif
-}
-
 static int
 memory_map (
        const char *path,
@@ -383,6 +356,34 @@
        return (res);
 }
 
+static inline void flow_control_state_set (
+       struct conn_info *conn_info,
+       int flow_control_state)
+{
+       if (conn_info->control_buffer->flow_control_enabled == 
flow_control_state) {
+               return;
+       }
+       if (flow_control_state == 0) {
+               log_printf (LOGSYS_LEVEL_DEBUG,
+                       "Disabling flow control for %d\n",
+                       conn_info->client_pid);
+       } else
+       if (flow_control_state == 1) {
+               log_printf (LOGSYS_LEVEL_DEBUG,
+                       "Enabling flow control for %d\n",
+                       conn_info->client_pid);
+       }
+
+
+       conn_info->control_buffer->flow_control_enabled = flow_control_state;
+       api->stats_update_value (conn_info->stats_handle,
+               "flow_control",
+               &flow_control_state,
+               sizeof(flow_control_state));
+       api->stats_increment_value (conn_info->stats_handle,
+               "flow_control_count");
+}
+
 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
 {
        unsigned int res;
@@ -517,7 +518,7 @@
        }
 
        if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-               sem_post_exit_thread (conn_info);
+               ipc_sem_post (conn_info->control_buffer, 
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
                return (0);
        }
 
@@ -546,11 +547,12 @@
        pthread_mutex_unlock (&conn_info->mutex);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-       sem_destroy (&conn_info->control_buffer->sem0);
-       sem_destroy (&conn_info->control_buffer->sem1);
-       sem_destroy (&conn_info->control_buffer->sem2);
+       sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+       sem_destroy (&conn_info->control_buffer->sem_request);
+       sem_destroy (&conn_info->control_buffer->sem_response);
+       sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-       semctl (conn_info->semid, 0, IPC_RMID);
+       semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
        /*
         * Destroy shared memory segment and semaphore
@@ -653,14 +655,12 @@
 static void *pthread_ipc_consumer (void *conn)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
        int res;
        coroipc_request_header_t *header;
        coroipc_response_header_t coroipc_response_header;
        int send_ok;
        unsigned int new_message;
+       int sem_value = 0;
 
 #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
        if (api->sched_policy != 0) {
@@ -670,43 +670,28 @@
 #endif
 
        for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
-               res = sem_wait (&conn_info->control_buffer->sem0);
+               ipc_sem_wait (conn_info->control_buffer, 
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
                if (ipc_thread_active (conn_info) == 0) {
                        coroipcs_refcount_dec (conn_info);
                        pthread_exit (0);
                }
-               if ((res == -1) && (errno == EINTR)) {
-                       api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-                       goto retry_semwait;
-               }
-#else
 
-               sop.sem_num = 0;
-               sop.sem_op = -1;
-               sop.sem_flg = 0;
-retry_semop:
-               res = semop (conn_info->semid, &sop, 1);
-               if (ipc_thread_active (conn_info) == 0) {
-                       coroipcs_refcount_dec (conn_info);
-                       pthread_exit (0);
-               }
-               if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-                       api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-                       goto retry_semop;
-               } else
-               if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-                       coroipcs_refcount_dec (conn_info);
-                       pthread_exit (0);
-               }
-#endif
+               outq_flush (conn_info);
 
+               ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, 
&sem_value);
+               if (sem_value > 0) {
+               
+                       res = ipc_sem_wait (conn_info->control_buffer, 
SEMAPHORE_REQUEST);
+               } else {
+                       continue;
+               }
+       
                zerocopy_operations_process (conn_info, &header, &new_message);
                /*
                 * There is no new message to process, continue for loop
                 */
                if (new_message == 0) {
+printf ("continuing\n");
                        continue;
                }
 
@@ -738,7 +723,6 @@
                        /*
                         * Overload, tell library to retry
                         */
-                       api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
                        coroipc_response_header.size = sizeof 
(coroipc_response_header_t);
                        coroipc_response_header.id = 0;
                        coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -928,7 +912,7 @@
        conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
        pthread_mutex_unlock (&conn_info->mutex);
 
-       sem_post_exit_thread (conn_info);
+       ipc_sem_post (conn_info->control_buffer, 
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 }
 
 static int conn_info_create (int fd)
@@ -945,6 +929,7 @@
        conn_info->client_pid = 0;
        conn_info->service = SOCKET_SERVICE_INIT;
        conn_info->state = CONN_STATE_THREAD_INACTIVE;
+        conn_info->poll_state = POLL_STATE_IN;
        list_init (&conn_info->outq_head);
        list_init (&conn_info->list);
        list_init (&conn_info->zcb_mapped_list_head);
@@ -1103,11 +1088,12 @@
                ipc_disconnect (conn_info);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-               sem_destroy (&conn_info->control_buffer->sem0);
-               sem_destroy (&conn_info->control_buffer->sem1);
-               sem_destroy (&conn_info->control_buffer->sem2);
+               sem_destroy 
(&conn_info->control_buffer->sem_request_or_flush_or_exit);
+               sem_destroy (&conn_info->control_buffer->sem_request);
+               sem_destroy (&conn_info->control_buffer->sem_response);
+               sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-               semctl (conn_info->semid, 0, IPC_RMID);
+               semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
 
                /*
@@ -1181,33 +1167,11 @@
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
-       int res;
 
        memcpy (conn_info->response_buffer, msg, mlen);
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-       res = sem_post (&conn_info->control_buffer->sem1);
-       if (res == -1) {
-               return (-1);
-       }
-#else
-       sop.sem_num = 1;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
+       ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
 
-retry_semop:
-       res = semop (conn_info->semid, &sop, 1);
-       if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-               api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-               goto retry_semop;
-       } else
-       if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-               return (0);
-       }
-#endif
        api->stats_increment_value (conn_info->stats_handle, "responses");
        return (0);
 }
@@ -1215,10 +1179,6 @@
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned 
int iov_len)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
-       int res;
        int write_idx = 0;
        int i;
 
@@ -1228,26 +1188,8 @@
                write_idx += iov[i].iov_len;
        }
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-       res = sem_post (&conn_info->control_buffer->sem1);
-       if (res == -1) {
-               return (-1);
-       }
-#else
-       sop.sem_num = 1;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
+       ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
 
-retry_semop:
-       res = semop (conn_info->semid, &sop, 1);
-       if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-               api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-               goto retry_semop;
-       } else
-       if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-               return (0);
-       }
-#endif
        api->stats_increment_value (conn_info->stats_handle, "responses");
        return (0);
 }
@@ -1283,86 +1225,31 @@
        conn_info->control_buffer->write = (write_idx + len) % 
conn_info->dispatch_size;
 }
 
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
-       int new_fc = 0;
-
-       if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
-               event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               new_fc = 1;
-       }
-
-       if (conn_info->flow_control_state != new_fc) {
-               if (new_fc == 1) {
-                       log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control 
for %d, event %d\n",
-                               conn_info->client_pid, event);
-               } else {
-                       log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control 
for %d, event %d\n",
-                               conn_info->client_pid, event);
-               }
-               conn_info->flow_control_state = new_fc;
-               api->stats_update_value (conn_info->stats_handle, 
"flow_control",
-                       &conn_info->flow_control_state,
-                       sizeof(conn_info->flow_control_state));
-               api->stats_increment_value (conn_info->stats_handle, 
"flow_control_count");
-       }
-
-       return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
 static void msg_send (void *conn, const struct iovec *iov, unsigned int 
iov_len,
                      int locked)
 {
        struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
        int res;
        int i;
+       char buf;
 
        for (i = 0; i < iov_len; i++) {
                memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
        }
 
-       if (list_empty (&conn_info->outq_head))
-               res = flow_control_event_send (conn_info, 
MESSAGE_RES_OUTQ_EMPTY);
-       else
-               res = flow_control_event_send (conn_info, 
MESSAGE_RES_OUTQ_NOT_EMPTY);
-
-       if (res == -1 && errno == EAGAIN) {
-               if (locked == 0) {
-                       pthread_mutex_lock (&conn_info->mutex);
-               }
+       buf = list_empty (&conn_info->outq_head);
+       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+       if (res != 1) {
                conn_info->pending_semops += 1;
-               if (locked == 0) {
-                       pthread_mutex_unlock (&conn_info->mutex);
+               if (conn_info->poll_state == POLL_STATE_IN) {
+                       conn_info->poll_state = POLL_STATE_INOUT;
+                       api->poll_dispatch_modify (conn_info->fd,
+                               POLLIN|POLLOUT|POLLNVAL);
                }
-               api->poll_dispatch_modify (conn_info->fd,
-                       POLLIN|POLLOUT|POLLNVAL);
-       } else
-       if (res == -1) {
-               ipc_disconnect (conn_info);
        }
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-       res = sem_post (&conn_info->control_buffer->sem2);
-#else
-       sop.sem_num = 2;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
 
-retry_semop:
-       res = semop (conn_info->semid, &sop, 1);
-       if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-               api->stats_increment_value (conn_info->stats_handle, 
"sem_retry_count");
-               goto retry_semop;
-       } else
-       if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-               return;
-       }
-#endif
+       ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
+
        api->stats_increment_value (conn_info->stats_handle, "dispatched");
 }
 
@@ -1371,11 +1258,10 @@
        struct outq_item *outq_item;
        unsigned int bytes_left;
        struct iovec iov;
-       int res;
 
        pthread_mutex_lock (&conn_info->mutex);
        if (list_empty (&conn_info->outq_head)) {
-               res = flow_control_event_send (conn_info, 
MESSAGE_RES_OUTQ_FLUSH_NR);
+               flow_control_state_set (conn_info, 0);
                pthread_mutex_unlock (&conn_info->mutex);
                return;
        }
@@ -1441,7 +1327,7 @@
        semun.buf = &ipc_set;
 
        for (i = 0; i < 3; i++) {
-               res = semctl (conn_info->semid, 0, IPC_SET, semun);
+               res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, 
semun);
                if (res == -1) {
                        return (-1);
                }
@@ -1471,6 +1357,7 @@
                bytes_msg += iov[i].iov_len;
        }
        if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+               flow_control_state_set (conn_info, 1);
                outq_item = api->malloc (sizeof (struct outq_item));
                if (outq_item == NULL) {
                        ipc_disconnect (conn);
@@ -1491,11 +1378,6 @@
                outq_item->mlen = bytes_msg;
                list_init (&outq_item->list);
                pthread_mutex_lock (&conn_info->mutex);
-               if (list_empty (&conn_info->outq_head)) {
-                       conn_info->notify_flow_control_enabled = 1;
-                       api->poll_dispatch_modify (conn_info->fd,
-                               POLLIN|POLLOUT|POLLNVAL);
-               }
                list_add_tail (&outq_item->list, &conn_info->outq_head);
                pthread_mutex_unlock (&conn_info->mutex);
                api->stats_increment_value (conn_info->stats_handle, 
"queue_size");
@@ -1742,11 +1624,10 @@
 
                conn_info->service = req_setup->service;
                conn_info->refcount = 0;
-               conn_info->notify_flow_control_enabled = 0;
                conn_info->setup_bytes_read = 0;
 
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-               conn_info->semid = semget (conn_info->semkey, 3, 0600);
+               conn_info->control_buffer->semid = semget (conn_info->semkey, 
3, 0600);
 #endif
                conn_info->pending_semops = 0;
 
@@ -1794,9 +1675,6 @@
                res = recv (fd, &buf, 1, MSG_NOSIGNAL);
                if (res == 1) {
                        switch (buf) {
-                       case MESSAGE_REQ_OUTQ_FLUSH:
-                               outq_flush (conn_info);
-                               break;
                        case MESSAGE_REQ_CHANGE_EUID:
                                if (priv_change (conn_info) == -1) {
                                        ipc_disconnect (conn_info);
@@ -1820,37 +1698,24 @@
                coroipcs_refcount_dec (conn_info);
        }
 
-       coroipcs_refcount_inc (conn_info);
-       pthread_mutex_lock (&conn_info->mutex);
-       if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & 
POLLOUT)) {
-               if (list_empty (&conn_info->outq_head))
-                       buf = MESSAGE_RES_OUTQ_EMPTY;
-               else
-                       buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+       if (revent & POLLOUT) {
+               int psop = conn_info->pending_semops;
+               int i;
 
-               for (; conn_info->pending_semops;) {
-                       res = flow_control_event_send (conn_info, buf);
-                       if (res == 1) {
-                               conn_info->pending_semops--;
+               assert (psop != 0);
+               for (i = 0; i < psop; i++) {
+                       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+                       if (res != 1) {
+                               return (0);
                        } else {
-                               break;
+                               conn_info->pending_semops -= 1;
                        }
                }
-               if (conn_info->notify_flow_control_enabled) {
-                       res = flow_control_event_send (conn_info, 
MESSAGE_RES_ENABLE_FLOWCONTROL);
-                       if (res == 1) {
-                               conn_info->notify_flow_control_enabled = 0;
-                       }
+               if (conn_info->poll_state == POLL_STATE_INOUT) {
+                       conn_info->poll_state = POLL_STATE_IN;
+                       api->poll_dispatch_modify (conn_info->fd, 
POLLIN|POLLNVAL);
                }
-               if (conn_info->notify_flow_control_enabled == 0 &&
-                       conn_info->pending_semops == 0) {
-
-                       api->poll_dispatch_modify (conn_info->fd,
-                               POLLIN|POLLNVAL);
-               }
        }
-       pthread_mutex_unlock (&conn_info->mutex);
-       coroipcs_refcount_dec (conn_info);
 
        return (0);
 }
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c      (revision 3000)
+++ lib/coroipcc.c      (working copy)
@@ -72,17 +72,8 @@
 
 #include "util.h"
 
-/*
- * Define sem_wait timeout (real timeout will be (n-1;n) )
- */
-#define IPC_SEMWAIT_TIMEOUT 2
-
 struct ipc_instance {
        int fd;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       int semid;
-#endif
-       int flow_control_state;
        struct control_buffer *control_buffer;
        char *request_buffer;
        char *response_buffer;
@@ -117,6 +108,23 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+static inline int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+       unsigned int n_read;
+       unsigned int n_write;
+       unsigned int bytes_left;
+
+       n_read = context->control_buffer->read;
+       n_write = context->control_buffer->write;
+
+       if (n_read <= n_write) {
+               bytes_left = context->dispatch_size - n_write + n_read;
+       } else {
+               bytes_left = n_read - n_write;
+       }
+       return (bytes_left);
+}
+
 static cs_error_t
 socket_send (
        int s,
@@ -238,10 +246,10 @@
        return (res);
 }
 
-#if _POSIX_THREAD_PROCESS_SHARED < 1
 static int
 priv_change_send (struct ipc_instance *ipc_instance)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
        char buf_req;
        mar_req_priv_change req_priv_change;
        unsigned int res;
@@ -268,19 +276,12 @@
        }
 
        ipc_instance->euid = req_priv_change.euid;
+#else
+       ipc_instance = NULL;
+#endif
        return (0);
 }
 
-#if defined(_SEM_SEMUN_UNDEFINED)
-union semun {
-        int val;
-        struct semid_ds *buf;
-        unsigned short int *array;
-        struct seminfo *__buf;
-};
-#endif
-#endif
-
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 {
@@ -471,10 +472,6 @@
        const struct iovec *iov,
        unsigned int iov_len)
 {
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#endif
-
        int i;
        int res;
        int req_buffer_idx = 0;
@@ -490,117 +487,18 @@
                req_buffer_idx += iov[i].iov_len;
        }
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-       res = sem_post (&ipc_instance->control_buffer->sem0);
-       if (res == -1) {
-               return (CS_ERR_LIBRARY);
-       }
-#else 
        /*
-        * Signal semaphore #0 indicting a new message from client
+        * Signal semaphore #3 and #0 indicting a new message from client
         * to server request queue
         */
-       sop.sem_num = 0;
-       sop.sem_op = 1;
-       sop.sem_flg = 0;
-
-retry_semop:
-       res = semop (ipc_instance->semid, &sop, 1);
-       if (res == -1 && errno == EINTR) {
-               return (CS_ERR_TRY_AGAIN);
-       } else
-       if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_instance);
-               goto retry_semop;
-       } else
-       if (res == -1) {
+       res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
+       if (res != CS_OK) {
                return (CS_ERR_LIBRARY);
        }
-#endif
-       return (CS_OK);
-}
-
-inline static cs_error_t
-ipc_sem_wait (
-       struct ipc_instance *ipc_instance,
-       int sem_num)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-       struct sembuf sop;
-#else
-       struct timespec timeout;
-       struct pollfd pfd;
-       sem_t *sem = NULL;
-#endif
-       int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-       switch (sem_num) {
-       case 0:
-               sem = &ipc_instance->control_buffer->sem0;
-               break;
-       case 1:
-               sem = &ipc_instance->control_buffer->sem1;
-               break;
-       case 2:
-               sem = &ipc_instance->control_buffer->sem2;
-               break;
-       }
-
-retry_semwait:
-       timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
-       timeout.tv_nsec = 0;
-
-       res = sem_timedwait (sem, &timeout);
-       if (res == -1 && errno == ETIMEDOUT) {
-               pfd.fd = ipc_instance->fd;
-               pfd.events = 0;
-
-               res = poll (&pfd, 1, 0);
-
-               if (res == -1 && errno == EINTR) {
-                       return (CS_ERR_TRY_AGAIN);
-               } else
-               if (res == -1) {
-                       return (CS_ERR_LIBRARY);
-               }
-
-               if (res == 1) {
-                       if (pfd.revents == POLLERR || pfd.revents == POLLHUP || 
pfd.revents == POLLNVAL) {
-                               return (CS_ERR_LIBRARY);
-                       }
-               }
-
-               goto retry_semwait;
-       } else
-       if (res == -1 && errno == EINTR) {
-               return (CS_ERR_TRY_AGAIN);
-       } else
-       if (res == -1) {
+       res = ipc_sem_post (ipc_instance->control_buffer, 
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+       if (res != CS_OK) {
                return (CS_ERR_LIBRARY);
        }
-#else
-       /*
-        * Wait for semaphore indicating a new message from server
-        * to client in queue
-        */
-       sop.sem_num = sem_num;
-       sop.sem_op = -1;
-       sop.sem_flg = 0;
-
-retry_semop:
-       res = semop (ipc_instance->semid, &sop, 1);
-       if (res == -1 && errno == EINTR) {
-               return (CS_ERR_TRY_AGAIN);
-       } else
-       if (res == -1 && errno == EACCES) {
-               priv_change_send (ipc_instance);
-               goto retry_semop;
-       } else
-       if (res == -1) {
-               return (CS_ERR_LIBRARY);
-       }
-#endif
        return (CS_OK);
 }
 
@@ -611,10 +509,17 @@
        size_t res_len)
 {
        coroipc_response_header_t *response_header;
-       cs_error_t err;
+       cs_error_t res;
 
-       if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-               return (err);
+retry_ipc_sem_wait:
+       res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+       if (res != CS_OK) {
+               if (res == CS_ERR_TRY_AGAIN) {
+                       priv_change_send (ipc_instance);
+                       goto retry_ipc_sem_wait;
+               } else {
+                       return (res);
+               }
        }
 
        response_header = (coroipc_response_header_t 
*)ipc_instance->response_buffer;
@@ -631,10 +536,17 @@
        struct ipc_instance *ipc_instance,
        void **res_msg)
 {
-       cs_error_t err;
+       cs_error_t res;
 
-       if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-               return (err);
+retry_ipc_sem_wait:
+       res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+       if (res != CS_OK) {
+               if (res == CS_ERR_TRY_AGAIN) {
+                       priv_change_send (ipc_instance);
+                       goto retry_ipc_sem_wait;
+               } else {
+                       return (res);
+               }
        }
 
        *res_msg = (char *)ipc_instance->response_buffer;
@@ -754,18 +666,22 @@
        }
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-       sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
-       sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
-       sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit, 
1, 0);
+       sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
+       sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
 #else
+{
+       int i;
+
        /*
         * Allocate a semaphore segment
         */
        while (1) {
                semkey = random();
                ipc_instance->euid = geteuid ();
-               if ((ipc_instance->semid
-                    = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+               if ((ipc_instance->control_buffer->semid
+                    = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
                      break;
                }
                /*
@@ -781,18 +697,15 @@
                }
        }
 
-       semun.val = 0;
-       sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
-       if (sys_res != 0) {
-               res = CS_ERR_LIBRARY;
-               goto error_exit;
+       for (i = 0; i < 4; i++) {
+               semun.val = 0;
+               sys_res = semctl (ipc_instance->control_buffer->semid, i, 
SETVAL, semun);
+               if (sys_res != 0) {
+                       res = CS_ERR_LIBRARY;
+                       goto error_exit;
+               }
        }
-
-       sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
-       if (sys_res != 0) {
-               res = CS_ERR_LIBRARY;
-               goto error_exit;
-       }
+}
 #endif
 
        /*
@@ -822,7 +735,6 @@
        }
 
        ipc_instance->fd = request_fd;
-       ipc_instance->flow_control_state = 0;
 
        if (res_setup.error == CS_ERR_TRY_AGAIN) {
                res = res_setup.error;
@@ -842,8 +754,8 @@
 
 error_exit:
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-       if (ipc_instance->semid > 0)
-               semctl (ipc_instance->semid, 0, IPC_RMID);
+       if (ipc_instance->control_buffer->semid > 0)
+               semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
 #endif
        memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
 error_dispatch_buffer:
@@ -893,7 +805,7 @@
                return (res);
        }
 
-       *flow_control_state = ipc_instance->flow_control_state;
+       *flow_control_state = 
ipc_instance->control_buffer->flow_control_enabled;
 
        hdb_handle_put (&ipc_hdb, handle);
        return (res);
@@ -928,10 +840,9 @@
        int poll_events;
        char buf;
        struct ipc_instance *ipc_instance;
-       int res;
-       char buf_two = 1;
        char *data_addr;
        cs_error_t error = CS_OK;
+       int res;
 
        error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void 
**)&ipc_instance));
        if (error != CS_OK) {
@@ -962,47 +873,22 @@
                goto error_put;
        }
 
-       res = recv (ipc_instance->fd, &buf, 1, 0);
-       if (res == -1 && errno == EINTR) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       } else
-       if (res == -1) {
-               error = CS_ERR_LIBRARY;
-               goto error_put;
-       } else
-       if (res == 0) {
-               /* Means that the peer closed cleanly the socket. However, it 
should
-                * happen only on BSD and Darwing systems since poll() returns a
-                * POLLHUP event on other systems.
+       error = socket_recv (ipc_instance->fd, &buf, 1);
+       assert (error == CS_OK);
+
+       if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+               /*
+                * Notify coroipcs to flush any pending dispatch messages
                 */
-               error = CS_ERR_LIBRARY;
-               goto error_put;
+               
+               res = ipc_sem_post (ipc_instance->control_buffer, 
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+               if (res != CS_OK) {
+                       error = CS_ERR_LIBRARY;
+                       goto error_put;
+               }
+
+
        }
-       ipc_instance->flow_control_state = 0;
-       if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == 
MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               ipc_instance->flow_control_state = 1;
-       }
-       /*
-        * Notify executive to flush any pending dispatch messages
-        */
-       if (ipc_instance->flow_control_state) {
-               buf_two = MESSAGE_REQ_OUTQ_FLUSH;
-               res = socket_send (ipc_instance->fd, &buf_two, 1);
-               assert (res == CS_OK); /* TODO */
-       }
-       /*
-        * This is just a notification of flow control starting at the addition
-        * of a new pending message, not a message to dispatch
-        */
-       if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       }
-       if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
-               error = CS_ERR_TRY_AGAIN;
-               goto error_put;
-       }
 
        data_addr = ipc_instance->dispatch_buffer;
 
@@ -1030,8 +916,15 @@
                return (res);
        }
 
-       if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
-               goto error_exit;
+retry_ipc_sem_wait:
+       res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+       if (res != CS_OK) {
+               if (res == CS_ERR_TRY_AGAIN) {
+                       priv_change_send (ipc_instance);
+                       goto retry_ipc_sem_wait;
+               } else {
+                       goto error_exit;
+               }
        }
 
        addr = ipc_instance->dispatch_buffer;
@@ -1078,8 +971,8 @@
        res = reply_receive (ipc_instance, res_msg, res_len);
 
 error_exit:
-       hdb_handle_put (&ipc_hdb, handle);
        pthread_mutex_unlock (&ipc_instance->mutex);
+       hdb_handle_put (&ipc_hdb, handle);
 
        return (res);
 }
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to