On Wed, Jul 21, 2010 at 12:33:15AM -0700, Steven Dake wrote:
> Integrating comments from Angus and generally cleaning up the code a
> bit for maintenance purposes.

Much easier to read Steve.

Two very minor comments:
I am not convinced flow_control_state_set() needs to be inline.
Do we really need to assign enums values?
 BLA_BLA = 1,
 FOO_FOO = 2,
 ...

I'll have more of a look in the morning.

CTS is underway at the time of writing, looks ok so far.
(I'll let you know how it goes)

Angus

> 
> Regards
> -steve

> Index: include/corosync/coroipc_ipc.h
> ===================================================================
> --- include/corosync/coroipc_ipc.h    (revision 3000)
> +++ include/corosync/coroipc_ipc.h    (working copy)
> @@ -35,6 +35,9 @@
>  #define COROIPC_IPC_H_DEFINED
>  
>  #include <unistd.h>
> +#include <poll.h>
> +#include <time.h>
> +#include "corotypes.h"
>  #include "config.h"
>  
>  /*
> @@ -52,28 +55,40 @@
>  
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
>  #include <semaphore.h>
> +#else
> +#include <sys/sem.h>
>  #endif
>  
> +/*
> + * Define sem_wait timeout (real timeout will be (n-1;n) )
> + */
> +#define IPC_SEMWAIT_TIMEOUT 2
> +
>  enum req_init_types {
>       MESSAGE_REQ_RESPONSE_INIT = 0,
>       MESSAGE_REQ_DISPATCH_INIT = 1
>  };
>  
>  #define MESSAGE_REQ_CHANGE_EUID              1
> -#define MESSAGE_REQ_OUTQ_FLUSH               2
>  
> -#define MESSAGE_RES_OUTQ_EMPTY         0
> -#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
> -#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
> -#define MESSAGE_RES_OUTQ_FLUSH_NR      3
> +enum ipc_semaphore_identifiers {
> +     SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT      = 0,
> +     SEMAPHORE_REQUEST                       = 1,
> +     SEMAPHORE_RESPONSE                      = 2,
> +     SEMAPHORE_DISPATCH                      = 3
> +};
>  
>  struct control_buffer {
>       unsigned int read;
>       unsigned int write;
> +     int flow_control_enabled;
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
> -     sem_t sem0;
> -     sem_t sem1;
> -     sem_t sem2;
> +     sem_t sem_request_or_flush_or_exit;
> +     sem_t sem_response;
> +     sem_t sem_dispatch;
> +     sem_t sem_request;
> +#else
> +     int semid;
>  #endif
>  };
>  
> @@ -145,4 +160,172 @@
>  #define ZC_FREE_HEADER               0xFFFFFFFE
>  #define ZC_EXECUTE_HEADER    0xFFFFFFFD
>  
> +static inline cs_error_t
> +ipc_sem_wait (
> +     struct control_buffer *control_buffer,
> +     enum ipc_semaphore_identifiers sem_id)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> +     struct sembuf sop;
> +#else
> +     struct timespec timeout;
> +     sem_t *sem = NULL;
> +#endif
> +     int res;
> +
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> +     switch (sem_id) {
> +     case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> +             sem = &control_buffer->sem_request_or_flush_or_exit;
> +             break;
> +     case SEMAPHORE_RESPONSE:
> +             sem = &control_buffer->sem_request;
> +             break;
> +     case SEMAPHORE_DISPATCH:
> +             sem = &control_buffer->sem_response;
> +             break;
> +     case SEMAPHORE_REQUEST:
> +             sem = &control_buffer->sem_dispatch;
> +             break;
> +     }
> +
> +     timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
> +     timeout.tv_nsec = 0;
> +
> +retry_sem_timedwait:
> +     res = sem_timedwait (sem, &timeout);
> +     if (res == -1 && errno == ETIMEDOUT) {
> +             return (CS_ERR_LIBRARY);
> +     } else
> +     if (res == -1 && errno == EINTR) {
> +             goto retry_sem_timedwait;
> +     } else
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +#else
> +     /*
> +      * Wait for semaphore indicating a new message from server
> +      * to client in queue
> +      */
> +     sop.sem_num = sem_id;
> +     sop.sem_op = -1;
> +     sop.sem_flg = 0;
> +
> +retry_semop:
> +     res = semop (control_buffer->semid, &sop, 1);
> +     if (res == -1 && errno == EINTR) {
> +             return (CS_ERR_TRY_AGAIN);
> +             goto retry_semop;
> +     } else
> +     if (res == -1 && errno == EACCES) {
> +             return (CS_ERR_TRY_AGAIN);
> +     } else
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +#endif
> +     return (CS_OK);
> +}
> +
> +static inline cs_error_t
> +ipc_sem_post (
> +     struct control_buffer *control_buffer,
> +     enum ipc_semaphore_identifiers sem_id)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> +     struct sembuf sop;
> +#else
> +     sem_t *sem = NULL;
> +#endif
> +     int res;
> +     
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> +     switch (sem_id) {
> +     case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> +             sem = &control_buffer->sem_request_or_flush_or_exit;
> +             break;
> +     case SEMAPHORE_RESPONSE:
> +             sem = &control_buffer->sem_request;
> +             break;
> +     case SEMAPHORE_DISPATCH:
> +             sem = &control_buffer->sem_response;
> +             break;
> +     case SEMAPHORE_REQUEST:
> +             sem = &control_buffer->sem_dispatch;
> +             break;
> +     }
> +
> +     res = sem_post (sem);
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +#else
> +     sop.sem_num = sem_id;
> +     sop.sem_op = 1;
> +     sop.sem_flg = 0;
> +
> +retry_semop:
> +     res = semop (control_buffer->semid, &sop, 1);
> +     if (res == -1 && errno == EINTR) {
> +             goto retry_semop;
> +     } else
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +#endif
> +     return (CS_OK);
> +}
> +
> +static inline cs_error_t
> +ipc_sem_getvalue (
> +     struct control_buffer *control_buffer,
> +     enum ipc_semaphore_identifiers sem_id,
> +     int *sem_value)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> +     struct sembuf sop;
> +     int sem_value_hold;
> +#else
> +     sem_t *sem = NULL;
> +#endif
> +     
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> +     switch (sem_id) {
> +     case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> +             sem = &control_buffer->sem_request_or_flush_or_exit;
> +             break;
> +     case SEMAPHORE_RESPONSE:
> +             sem = &control_buffer->sem_request;
> +             break;
> +     case SEMAPHORE_DISPATCH:
> +             sem = &control_buffer->sem_response;
> +             break;
> +     case SEMAPHORE_REQUEST:
> +             sem = &control_buffer->sem_dispatch;
> +             break;
> +     }
> +
> +     res = sem_getvalue (sem, sem_value);
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +#else
> +     sop.sem_num = sem_id;
> +     sop.sem_op = 1;
> +     sop.sem_flg = 0;
> +
> +retry_semctl:
> +     sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
> +     if (sem_value_hold == -1 && errno == EINTR) {
> +             goto retry_semctl;
> +     } else
> +     if (sem_value_hold == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +     *sem_value = sem_value_hold;
> +#endif
> +     return (CS_OK);
> +}
> +
>  #endif /* COROIPC_IPC_H_DEFINED */
> Index: exec/coroipcs.c
> ===================================================================
> --- exec/coroipcs.c   (revision 3000)
> +++ exec/coroipcs.c   (working copy)
> @@ -95,6 +95,9 @@
>  #define MSG_SEND_LOCKED              0
>  #define MSG_SEND_UNLOCKED    1
>  
> +#define POLL_STATE_IN                1
> +#define POLL_STATE_INOUT     2
> +
>  static struct coroipcs_init_state_v2 *api = NULL;
>  
>  DECLARE_LIST_INIT (conn_info_list_head);
> @@ -141,13 +144,10 @@
>       pthread_attr_t thread_attr;
>       unsigned int service;
>       enum conn_state state;
> -     int notify_flow_control_enabled;
> -     int flow_control_state;
>       int refcount;
>       hdb_handle_t stats_handle;
>  #if _POSIX_THREAD_PROCESS_SHARED < 1
>       key_t semkey;
> -     int semid;
>  #endif
>       unsigned int pending_semops;
>       pthread_mutex_t mutex;
> @@ -166,6 +166,7 @@
>       unsigned int setup_bytes_read;
>       struct list_head zcb_mapped_list_head;
>       char *sending_allowed_private_data[64];
> +     int poll_state;
>  };
>  
>  static int shared_mem_dispatch_bytes_left (const struct conn_info 
> *conn_info);
> @@ -221,34 +222,6 @@
>  {
>  }
>  
> -static void sem_post_exit_thread (struct conn_info *conn_info)
> -{
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
> -     int res;
> -
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -retry_semop:
> -     res = sem_post (&conn_info->control_buffer->sem0);
> -     if (res == -1 && errno == EINTR) {
> -             api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -             goto retry_semop;
> -     }
> -#else
> -     sop.sem_num = 0;
> -     sop.sem_op = 1;
> -     sop.sem_flg = 0;
> -
> -retry_semop:
> -     res = semop (conn_info->semid, &sop, 1);
> -     if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> -             api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -             goto retry_semop;
> -     }
> -#endif
> -}
> -
>  static int
>  memory_map (
>       const char *path,
> @@ -383,6 +356,34 @@
>       return (res);
>  }
>  
> +static inline void flow_control_state_set (
> +     struct conn_info *conn_info,
> +     int flow_control_state)
> +{
> +     if (conn_info->control_buffer->flow_control_enabled == 
> flow_control_state) {
> +             return;
> +     }
> +     if (flow_control_state == 0) {
> +             log_printf (LOGSYS_LEVEL_DEBUG,
> +                     "Disabling flow control for %d\n",
> +                     conn_info->client_pid);
> +     } else
> +     if (flow_control_state == 1) {
> +             log_printf (LOGSYS_LEVEL_DEBUG,
> +                     "Enabling flow control for %d\n",
> +                     conn_info->client_pid);
> +     }
> +
> +
> +     conn_info->control_buffer->flow_control_enabled = flow_control_state;
> +     api->stats_update_value (conn_info->stats_handle,
> +             "flow_control",
> +             &flow_control_state,
> +             sizeof(flow_control_state));
> +     api->stats_increment_value (conn_info->stats_handle,
> +             "flow_control_count");
> +}
> +
>  static inline int zcb_free (struct zcb_mapped *zcb_mapped)
>  {
>       unsigned int res;
> @@ -517,7 +518,7 @@
>       }
>  
>       if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
> -             sem_post_exit_thread (conn_info);
> +             ipc_sem_post (conn_info->control_buffer, 
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>               return (0);
>       }
>  
> @@ -546,11 +547,12 @@
>       pthread_mutex_unlock (&conn_info->mutex);
>  
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
> -     sem_destroy (&conn_info->control_buffer->sem0);
> -     sem_destroy (&conn_info->control_buffer->sem1);
> -     sem_destroy (&conn_info->control_buffer->sem2);
> +     sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
> +     sem_destroy (&conn_info->control_buffer->sem_request);
> +     sem_destroy (&conn_info->control_buffer->sem_response);
> +     sem_destroy (&conn_info->control_buffer->sem_dispatch);
>  #else
> -     semctl (conn_info->semid, 0, IPC_RMID);
> +     semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
>  #endif
>       /*
>        * Destroy shared memory segment and semaphore
> @@ -653,14 +655,12 @@
>  static void *pthread_ipc_consumer (void *conn)
>  {
>       struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
>       int res;
>       coroipc_request_header_t *header;
>       coroipc_response_header_t coroipc_response_header;
>       int send_ok;
>       unsigned int new_message;
> +     int sem_value = 0;
>  
>  #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && 
> defined(HAVE_SCHED_GET_PRIORITY_MAX)
>       if (api->sched_policy != 0) {
> @@ -670,43 +670,28 @@
>  #endif
>  
>       for (;;) {
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -retry_semwait:
> -             res = sem_wait (&conn_info->control_buffer->sem0);
> +             ipc_sem_wait (conn_info->control_buffer, 
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>               if (ipc_thread_active (conn_info) == 0) {
>                       coroipcs_refcount_dec (conn_info);
>                       pthread_exit (0);
>               }
> -             if ((res == -1) && (errno == EINTR)) {
> -                     api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -                     goto retry_semwait;
> -             }
> -#else
>  
> -             sop.sem_num = 0;
> -             sop.sem_op = -1;
> -             sop.sem_flg = 0;
> -retry_semop:
> -             res = semop (conn_info->semid, &sop, 1);
> -             if (ipc_thread_active (conn_info) == 0) {
> -                     coroipcs_refcount_dec (conn_info);
> -                     pthread_exit (0);
> -             }
> -             if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> -                     api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -                     goto retry_semop;
> -             } else
> -             if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> -                     coroipcs_refcount_dec (conn_info);
> -                     pthread_exit (0);
> -             }
> -#endif
> +             outq_flush (conn_info);
>  
> +             ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, 
> &sem_value);
> +             if (sem_value > 0) {
> +             
> +                     res = ipc_sem_wait (conn_info->control_buffer, 
> SEMAPHORE_REQUEST);
> +             } else {
> +                     continue;
> +             }
> +     
>               zerocopy_operations_process (conn_info, &header, &new_message);
>               /*
>                * There is no new message to process, continue for loop
>                */
>               if (new_message == 0) {
> +printf ("continuing\n");
>                       continue;
>               }
>  
> @@ -738,7 +723,6 @@
>                       /*
>                        * Overload, tell library to retry
>                        */
> -                     api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
>                       coroipc_response_header.size = sizeof 
> (coroipc_response_header_t);
>                       coroipc_response_header.id = 0;
>                       coroipc_response_header.error = CS_ERR_TRY_AGAIN;
> @@ -928,7 +912,7 @@
>       conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
>       pthread_mutex_unlock (&conn_info->mutex);
>  
> -     sem_post_exit_thread (conn_info);
> +     ipc_sem_post (conn_info->control_buffer, 
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>  }
>  
>  static int conn_info_create (int fd)
> @@ -945,6 +929,7 @@
>       conn_info->client_pid = 0;
>       conn_info->service = SOCKET_SERVICE_INIT;
>       conn_info->state = CONN_STATE_THREAD_INACTIVE;
> +        conn_info->poll_state = POLL_STATE_IN;
>       list_init (&conn_info->outq_head);
>       list_init (&conn_info->list);
>       list_init (&conn_info->zcb_mapped_list_head);
> @@ -1103,11 +1088,12 @@
>               ipc_disconnect (conn_info);
>  
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
> -             sem_destroy (&conn_info->control_buffer->sem0);
> -             sem_destroy (&conn_info->control_buffer->sem1);
> -             sem_destroy (&conn_info->control_buffer->sem2);
> +             sem_destroy 
> (&conn_info->control_buffer->sem_request_or_flush_or_exit);
> +             sem_destroy (&conn_info->control_buffer->sem_request);
> +             sem_destroy (&conn_info->control_buffer->sem_response);
> +             sem_destroy (&conn_info->control_buffer->sem_dispatch);
>  #else
> -             semctl (conn_info->semid, 0, IPC_RMID);
> +             semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
>  #endif
>  
>               /*
> @@ -1181,33 +1167,11 @@
>  int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
>  {
>       struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
> -     int res;
>  
>       memcpy (conn_info->response_buffer, msg, mlen);
>  
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -     res = sem_post (&conn_info->control_buffer->sem1);
> -     if (res == -1) {
> -             return (-1);
> -     }
> -#else
> -     sop.sem_num = 1;
> -     sop.sem_op = 1;
> -     sop.sem_flg = 0;
> +     ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>  
> -retry_semop:
> -     res = semop (conn_info->semid, &sop, 1);
> -     if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> -             api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -             goto retry_semop;
> -     } else
> -     if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> -             return (0);
> -     }
> -#endif
>       api->stats_increment_value (conn_info->stats_handle, "responses");
>       return (0);
>  }
> @@ -1215,10 +1179,6 @@
>  int coroipcs_response_iov_send (void *conn, const struct iovec *iov, 
> unsigned int iov_len)
>  {
>       struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
> -     int res;
>       int write_idx = 0;
>       int i;
>  
> @@ -1228,26 +1188,8 @@
>               write_idx += iov[i].iov_len;
>       }
>  
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -     res = sem_post (&conn_info->control_buffer->sem1);
> -     if (res == -1) {
> -             return (-1);
> -     }
> -#else
> -     sop.sem_num = 1;
> -     sop.sem_op = 1;
> -     sop.sem_flg = 0;
> +     ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>  
> -retry_semop:
> -     res = semop (conn_info->semid, &sop, 1);
> -     if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> -             api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -             goto retry_semop;
> -     } else
> -     if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> -             return (0);
> -     }
> -#endif
>       api->stats_increment_value (conn_info->stats_handle, "responses");
>       return (0);
>  }
> @@ -1283,86 +1225,31 @@
>       conn_info->control_buffer->write = (write_idx + len) % 
> conn_info->dispatch_size;
>  }
>  
> -/**
> - * simulate the behaviour in coroipcc.c
> - */
> -static int flow_control_event_send (struct conn_info *conn_info, char event)
> -{
> -     int new_fc = 0;
> -
> -     if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
> -             event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             new_fc = 1;
> -     }
> -
> -     if (conn_info->flow_control_state != new_fc) {
> -             if (new_fc == 1) {
> -                     log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control 
> for %d, event %d\n",
> -                             conn_info->client_pid, event);
> -             } else {
> -                     log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control 
> for %d, event %d\n",
> -                             conn_info->client_pid, event);
> -             }
> -             conn_info->flow_control_state = new_fc;
> -             api->stats_update_value (conn_info->stats_handle, 
> "flow_control",
> -                     &conn_info->flow_control_state,
> -                     sizeof(conn_info->flow_control_state));
> -             api->stats_increment_value (conn_info->stats_handle, 
> "flow_control_count");
> -     }
> -
> -     return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
> -}
> -
>  static void msg_send (void *conn, const struct iovec *iov, unsigned int 
> iov_len,
>                     int locked)
>  {
>       struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
>       int res;
>       int i;
> +     char buf;
>  
>       for (i = 0; i < iov_len; i++) {
>               memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
>       }
>  
> -     if (list_empty (&conn_info->outq_head))
> -             res = flow_control_event_send (conn_info, 
> MESSAGE_RES_OUTQ_EMPTY);
> -     else
> -             res = flow_control_event_send (conn_info, 
> MESSAGE_RES_OUTQ_NOT_EMPTY);
> -
> -     if (res == -1 && errno == EAGAIN) {
> -             if (locked == 0) {
> -                     pthread_mutex_lock (&conn_info->mutex);
> -             }
> +     buf = list_empty (&conn_info->outq_head);
> +     res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> +     if (res != 1) {
>               conn_info->pending_semops += 1;
> -             if (locked == 0) {
> -                     pthread_mutex_unlock (&conn_info->mutex);
> +             if (conn_info->poll_state == POLL_STATE_IN) {
> +                     conn_info->poll_state = POLL_STATE_INOUT;
> +                     api->poll_dispatch_modify (conn_info->fd,
> +                             POLLIN|POLLOUT|POLLNVAL);
>               }
> -             api->poll_dispatch_modify (conn_info->fd,
> -                     POLLIN|POLLOUT|POLLNVAL);
> -     } else
> -     if (res == -1) {
> -             ipc_disconnect (conn_info);
>       }
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -     res = sem_post (&conn_info->control_buffer->sem2);
> -#else
> -     sop.sem_num = 2;
> -     sop.sem_op = 1;
> -     sop.sem_flg = 0;
>  
> -retry_semop:
> -     res = semop (conn_info->semid, &sop, 1);
> -     if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> -             api->stats_increment_value (conn_info->stats_handle, 
> "sem_retry_count");
> -             goto retry_semop;
> -     } else
> -     if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> -             return;
> -     }
> -#endif
> +     ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
> +
>       api->stats_increment_value (conn_info->stats_handle, "dispatched");
>  }
>  
> @@ -1371,11 +1258,10 @@
>       struct outq_item *outq_item;
>       unsigned int bytes_left;
>       struct iovec iov;
> -     int res;
>  
>       pthread_mutex_lock (&conn_info->mutex);
>       if (list_empty (&conn_info->outq_head)) {
> -             res = flow_control_event_send (conn_info, 
> MESSAGE_RES_OUTQ_FLUSH_NR);
> +             flow_control_state_set (conn_info, 0);
>               pthread_mutex_unlock (&conn_info->mutex);
>               return;
>       }
> @@ -1441,7 +1327,7 @@
>       semun.buf = &ipc_set;
>  
>       for (i = 0; i < 3; i++) {
> -             res = semctl (conn_info->semid, 0, IPC_SET, semun);
> +             res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, 
> semun);
>               if (res == -1) {
>                       return (-1);
>               }
> @@ -1471,6 +1357,7 @@
>               bytes_msg += iov[i].iov_len;
>       }
>       if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
> +             flow_control_state_set (conn_info, 1);
>               outq_item = api->malloc (sizeof (struct outq_item));
>               if (outq_item == NULL) {
>                       ipc_disconnect (conn);
> @@ -1491,11 +1378,6 @@
>               outq_item->mlen = bytes_msg;
>               list_init (&outq_item->list);
>               pthread_mutex_lock (&conn_info->mutex);
> -             if (list_empty (&conn_info->outq_head)) {
> -                     conn_info->notify_flow_control_enabled = 1;
> -                     api->poll_dispatch_modify (conn_info->fd,
> -                             POLLIN|POLLOUT|POLLNVAL);
> -             }
>               list_add_tail (&outq_item->list, &conn_info->outq_head);
>               pthread_mutex_unlock (&conn_info->mutex);
>               api->stats_increment_value (conn_info->stats_handle, 
> "queue_size");
> @@ -1742,11 +1624,10 @@
>  
>               conn_info->service = req_setup->service;
>               conn_info->refcount = 0;
> -             conn_info->notify_flow_control_enabled = 0;
>               conn_info->setup_bytes_read = 0;
>  
>  #if _POSIX_THREAD_PROCESS_SHARED < 1
> -             conn_info->semid = semget (conn_info->semkey, 3, 0600);
> +             conn_info->control_buffer->semid = semget (conn_info->semkey, 
> 3, 0600);
>  #endif
>               conn_info->pending_semops = 0;
>  
> @@ -1794,9 +1675,6 @@
>               res = recv (fd, &buf, 1, MSG_NOSIGNAL);
>               if (res == 1) {
>                       switch (buf) {
> -                     case MESSAGE_REQ_OUTQ_FLUSH:
> -                             outq_flush (conn_info);
> -                             break;
>                       case MESSAGE_REQ_CHANGE_EUID:
>                               if (priv_change (conn_info) == -1) {
>                                       ipc_disconnect (conn_info);
> @@ -1820,37 +1698,24 @@
>               coroipcs_refcount_dec (conn_info);
>       }
>  
> -     coroipcs_refcount_inc (conn_info);
> -     pthread_mutex_lock (&conn_info->mutex);
> -     if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & 
> POLLOUT)) {
> -             if (list_empty (&conn_info->outq_head))
> -                     buf = MESSAGE_RES_OUTQ_EMPTY;
> -             else
> -                     buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
> +     if (revent & POLLOUT) {
> +             int psop = conn_info->pending_semops;
> +             int i;
>  
> -             for (; conn_info->pending_semops;) {
> -                     res = flow_control_event_send (conn_info, buf);
> -                     if (res == 1) {
> -                             conn_info->pending_semops--;
> +             assert (psop != 0);
> +             for (i = 0; i < psop; i++) {
> +                     res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> +                     if (res != 1) {
> +                             return (0);
>                       } else {
> -                             break;
> +                             conn_info->pending_semops -= 1;
>                       }
>               }
> -             if (conn_info->notify_flow_control_enabled) {
> -                     res = flow_control_event_send (conn_info, 
> MESSAGE_RES_ENABLE_FLOWCONTROL);
> -                     if (res == 1) {
> -                             conn_info->notify_flow_control_enabled = 0;
> -                     }
> +             if (conn_info->poll_state == POLL_STATE_INOUT) {
> +                     conn_info->poll_state = POLL_STATE_IN;
> +                     api->poll_dispatch_modify (conn_info->fd, 
> POLLIN|POLLNVAL);
>               }
> -             if (conn_info->notify_flow_control_enabled == 0 &&
> -                     conn_info->pending_semops == 0) {
> -
> -                     api->poll_dispatch_modify (conn_info->fd,
> -                             POLLIN|POLLNVAL);
> -             }
>       }
> -     pthread_mutex_unlock (&conn_info->mutex);
> -     coroipcs_refcount_dec (conn_info);
>  
>       return (0);
>  }
> Index: lib/coroipcc.c
> ===================================================================
> --- lib/coroipcc.c    (revision 3000)
> +++ lib/coroipcc.c    (working copy)
> @@ -72,17 +72,8 @@
>  
>  #include "util.h"
>  
> -/*
> - * Define sem_wait timeout (real timeout will be (n-1;n) )
> - */
> -#define IPC_SEMWAIT_TIMEOUT 2
> -
>  struct ipc_instance {
>       int fd;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     int semid;
> -#endif
> -     int flow_control_state;
>       struct control_buffer *control_buffer;
>       char *request_buffer;
>       char *response_buffer;
> @@ -117,6 +108,23 @@
>  #define MSG_NOSIGNAL 0
>  #endif
>  
> +static inline int shared_mem_dispatch_bytes_left (struct ipc_instance 
> *context)
> +{
> +     unsigned int n_read;
> +     unsigned int n_write;
> +     unsigned int bytes_left;
> +
> +     n_read = context->control_buffer->read;
> +     n_write = context->control_buffer->write;
> +
> +     if (n_read <= n_write) {
> +             bytes_left = context->dispatch_size - n_write + n_read;
> +     } else {
> +             bytes_left = n_read - n_write;
> +     }
> +     return (bytes_left);
> +}
> +
>  static cs_error_t
>  socket_send (
>       int s,
> @@ -238,10 +246,10 @@
>       return (res);
>  }
>  
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
>  static int
>  priv_change_send (struct ipc_instance *ipc_instance)
>  {
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
>       char buf_req;
>       mar_req_priv_change req_priv_change;
>       unsigned int res;
> @@ -268,19 +276,12 @@
>       }
>  
>       ipc_instance->euid = req_priv_change.euid;
> +#else
> +     ipc_instance = NULL;
> +#endif
>       return (0);
>  }
>  
> -#if defined(_SEM_SEMUN_UNDEFINED)
> -union semun {
> -        int val;
> -        struct semid_ds *buf;
> -        unsigned short int *array;
> -        struct seminfo *__buf;
> -};
> -#endif
> -#endif
> -
>  static int
>  circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
>  {
> @@ -471,10 +472,6 @@
>       const struct iovec *iov,
>       unsigned int iov_len)
>  {
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#endif
> -
>       int i;
>       int res;
>       int req_buffer_idx = 0;
> @@ -490,117 +487,18 @@
>               req_buffer_idx += iov[i].iov_len;
>       }
>  
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -     res = sem_post (&ipc_instance->control_buffer->sem0);
> -     if (res == -1) {
> -             return (CS_ERR_LIBRARY);
> -     }
> -#else 
>       /*
> -      * Signal semaphore #0 indicting a new message from client
> +      * Signal semaphore #3 and #0 indicting a new message from client
>        * to server request queue
>        */
> -     sop.sem_num = 0;
> -     sop.sem_op = 1;
> -     sop.sem_flg = 0;
> -
> -retry_semop:
> -     res = semop (ipc_instance->semid, &sop, 1);
> -     if (res == -1 && errno == EINTR) {
> -             return (CS_ERR_TRY_AGAIN);
> -     } else
> -     if (res == -1 && errno == EACCES) {
> -             priv_change_send (ipc_instance);
> -             goto retry_semop;
> -     } else
> -     if (res == -1) {
> +     res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
> +     if (res != CS_OK) {
>               return (CS_ERR_LIBRARY);
>       }
> -#endif
> -     return (CS_OK);
> -}
> -
> -inline static cs_error_t
> -ipc_sem_wait (
> -     struct ipc_instance *ipc_instance,
> -     int sem_num)
> -{
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> -     struct sembuf sop;
> -#else
> -     struct timespec timeout;
> -     struct pollfd pfd;
> -     sem_t *sem = NULL;
> -#endif
> -     int res;
> -
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -     switch (sem_num) {
> -     case 0:
> -             sem = &ipc_instance->control_buffer->sem0;
> -             break;
> -     case 1:
> -             sem = &ipc_instance->control_buffer->sem1;
> -             break;
> -     case 2:
> -             sem = &ipc_instance->control_buffer->sem2;
> -             break;
> -     }
> -
> -retry_semwait:
> -     timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
> -     timeout.tv_nsec = 0;
> -
> -     res = sem_timedwait (sem, &timeout);
> -     if (res == -1 && errno == ETIMEDOUT) {
> -             pfd.fd = ipc_instance->fd;
> -             pfd.events = 0;
> -
> -             res = poll (&pfd, 1, 0);
> -
> -             if (res == -1 && errno == EINTR) {
> -                     return (CS_ERR_TRY_AGAIN);
> -             } else
> -             if (res == -1) {
> -                     return (CS_ERR_LIBRARY);
> -             }
> -
> -             if (res == 1) {
> -                     if (pfd.revents == POLLERR || pfd.revents == POLLHUP || 
> pfd.revents == POLLNVAL) {
> -                             return (CS_ERR_LIBRARY);
> -                     }
> -             }
> -
> -             goto retry_semwait;
> -     } else
> -     if (res == -1 && errno == EINTR) {
> -             return (CS_ERR_TRY_AGAIN);
> -     } else
> -     if (res == -1) {
> +     res = ipc_sem_post (ipc_instance->control_buffer, 
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> +     if (res != CS_OK) {
>               return (CS_ERR_LIBRARY);
>       }
> -#else
> -     /*
> -      * Wait for semaphore indicating a new message from server
> -      * to client in queue
> -      */
> -     sop.sem_num = sem_num;
> -     sop.sem_op = -1;
> -     sop.sem_flg = 0;
> -
> -retry_semop:
> -     res = semop (ipc_instance->semid, &sop, 1);
> -     if (res == -1 && errno == EINTR) {
> -             return (CS_ERR_TRY_AGAIN);
> -     } else
> -     if (res == -1 && errno == EACCES) {
> -             priv_change_send (ipc_instance);
> -             goto retry_semop;
> -     } else
> -     if (res == -1) {
> -             return (CS_ERR_LIBRARY);
> -     }
> -#endif
>       return (CS_OK);
>  }
>  
> @@ -611,10 +509,17 @@
>       size_t res_len)
>  {
>       coroipc_response_header_t *response_header;
> -     cs_error_t err;
> +     cs_error_t res;
>  
> -     if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
> -             return (err);
> +retry_ipc_sem_wait:
> +     res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
> +     if (res != CS_OK) {
> +             if (res == CS_ERR_TRY_AGAIN) {
> +                     priv_change_send (ipc_instance);
> +                     goto retry_ipc_sem_wait;
> +             } else {
> +                     return (res);
> +             }
>       }
>  
>       response_header = (coroipc_response_header_t 
> *)ipc_instance->response_buffer;
> @@ -631,10 +536,17 @@
>       struct ipc_instance *ipc_instance,
>       void **res_msg)
>  {
> -     cs_error_t err;
> +     cs_error_t res;
>  
> -     if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
> -             return (err);
> +retry_ipc_sem_wait:
> +     res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
> +     if (res != CS_OK) {
> +             if (res == CS_ERR_TRY_AGAIN) {
> +                     priv_change_send (ipc_instance);
> +                     goto retry_ipc_sem_wait;
> +             } else {
> +                     return (res);
> +             }
>       }
>  
>       *res_msg = (char *)ipc_instance->response_buffer;
> @@ -754,18 +666,22 @@
>       }
>  
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
> -     sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
> -     sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
> -     sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
> +     sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit, 
> 1, 0);
> +     sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
> +     sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
> +     sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
>  #else
> +{
> +     int i;
> +
>       /*
>        * Allocate a semaphore segment
>        */
>       while (1) {
>               semkey = random();
>               ipc_instance->euid = geteuid ();
> -             if ((ipc_instance->semid
> -                  = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> +             if ((ipc_instance->control_buffer->semid
> +                  = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
>                     break;
>               }
>               /*
> @@ -781,18 +697,15 @@
>               }
>       }
>  
> -     semun.val = 0;
> -     sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
> -     if (sys_res != 0) {
> -             res = CS_ERR_LIBRARY;
> -             goto error_exit;
> +     for (i = 0; i < 4; i++) {
> +             semun.val = 0;
> +             sys_res = semctl (ipc_instance->control_buffer->semid, i, 
> SETVAL, semun);
> +             if (sys_res != 0) {
> +                     res = CS_ERR_LIBRARY;
> +                     goto error_exit;
> +             }
>       }
> -
> -     sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
> -     if (sys_res != 0) {
> -             res = CS_ERR_LIBRARY;
> -             goto error_exit;
> -     }
> +}
>  #endif
>  
>       /*
> @@ -822,7 +735,6 @@
>       }
>  
>       ipc_instance->fd = request_fd;
> -     ipc_instance->flow_control_state = 0;
>  
>       if (res_setup.error == CS_ERR_TRY_AGAIN) {
>               res = res_setup.error;
> @@ -842,8 +754,8 @@
>  
>  error_exit:
>  #if _POSIX_THREAD_PROCESS_SHARED < 1
> -     if (ipc_instance->semid > 0)
> -             semctl (ipc_instance->semid, 0, IPC_RMID);
> +     if (ipc_instance->control_buffer->semid > 0)
> +             semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
>  #endif
>       memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
>  error_dispatch_buffer:
> @@ -893,7 +805,7 @@
>               return (res);
>       }
>  
> -     *flow_control_state = ipc_instance->flow_control_state;
> +     *flow_control_state = 
> ipc_instance->control_buffer->flow_control_enabled;
>  
>       hdb_handle_put (&ipc_hdb, handle);
>       return (res);
> @@ -928,10 +840,9 @@
>       int poll_events;
>       char buf;
>       struct ipc_instance *ipc_instance;
> -     int res;
> -     char buf_two = 1;
>       char *data_addr;
>       cs_error_t error = CS_OK;
> +     int res;
>  
>       error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void 
> **)&ipc_instance));
>       if (error != CS_OK) {
> @@ -962,47 +873,22 @@
>               goto error_put;
>       }
>  
> -     res = recv (ipc_instance->fd, &buf, 1, 0);
> -     if (res == -1 && errno == EINTR) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     } else
> -     if (res == -1) {
> -             error = CS_ERR_LIBRARY;
> -             goto error_put;
> -     } else
> -     if (res == 0) {
> -             /* Means that the peer closed cleanly the socket. However, it 
> should
> -              * happen only on BSD and Darwing systems since poll() returns a
> -              * POLLHUP event on other systems.
> +     error = socket_recv (ipc_instance->fd, &buf, 1);
> +     assert (error == CS_OK);
> +
> +     if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
> +             /*
> +              * Notify coroipcs to flush any pending dispatch messages
>                */
> -             error = CS_ERR_LIBRARY;
> -             goto error_put;
> +             
> +             res = ipc_sem_post (ipc_instance->control_buffer, 
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> +             if (res != CS_OK) {
> +                     error = CS_ERR_LIBRARY;
> +                     goto error_put;
> +             }
> +
> +
>       }
> -     ipc_instance->flow_control_state = 0;
> -     if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == 
> MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             ipc_instance->flow_control_state = 1;
> -     }
> -     /*
> -      * Notify executive to flush any pending dispatch messages
> -      */
> -     if (ipc_instance->flow_control_state) {
> -             buf_two = MESSAGE_REQ_OUTQ_FLUSH;
> -             res = socket_send (ipc_instance->fd, &buf_two, 1);
> -             assert (res == CS_OK); /* TODO */
> -     }
> -     /*
> -      * This is just a notification of flow control starting at the addition
> -      * of a new pending message, not a message to dispatch
> -      */
> -     if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     }
> -     if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     }
>  
>       data_addr = ipc_instance->dispatch_buffer;
>  
> @@ -1030,8 +916,15 @@
>               return (res);
>       }
>  
> -     if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
> -             goto error_exit;
> +retry_ipc_sem_wait:
> +     res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
> +     if (res != CS_OK) {
> +             if (res == CS_ERR_TRY_AGAIN) {
> +                     priv_change_send (ipc_instance);
> +                     goto retry_ipc_sem_wait;
> +             } else {
> +                     goto error_exit;
> +             }
>       }
>  
>       addr = ipc_instance->dispatch_buffer;
> @@ -1078,8 +971,8 @@
>       res = reply_receive (ipc_instance, res_msg, res_len);
>  
>  error_exit:
> -     hdb_handle_put (&ipc_hdb, handle);
>       pthread_mutex_unlock (&ipc_instance->mutex);
> +     hdb_handle_put (&ipc_hdb, handle);
>  
>       return (res);
>  }

> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to