On 07/21/2010 05:05 AM, Angus Salkeld wrote:
> On Wed, Jul 21, 2010 at 12:33:15AM -0700, Steven Dake wrote:
>> Integrating comments from Angus and generally cleaning up the code a
>> bit for maintenance purposes.
>
> Much easier to read Steve.
>
> Two very minor comments:
> I am not convinced flow_control_state_set() needs to be inline.
ya i'll change that.
> Do we really need to assign enums values?
> BLA_BLA = 1,
> FOO_FOO = 2,
> ...
c standard doesn't require it, but I prefer to know exactly what values
each enum has in this case, since the sysv semaphore implementation is
dependent on the numbers ranging from 0..3.
>
> I'll have more of a look in the morning.
>
> CTS is underway at the time of writing, looks ok so far.
> (I'll let you know how it goes)
>
> Angus
>
>>
>> Regards
>> -steve
>
>> Index: include/corosync/coroipc_ipc.h
>> ===================================================================
>> --- include/corosync/coroipc_ipc.h (revision 3000)
>> +++ include/corosync/coroipc_ipc.h (working copy)
>> @@ -35,6 +35,9 @@
>> #define COROIPC_IPC_H_DEFINED
>>
>> #include<unistd.h>
>> +#include<poll.h>
>> +#include<time.h>
>> +#include "corotypes.h"
>> #include "config.h"
>>
>> /*
>> @@ -52,28 +55,40 @@
>>
>> #if _POSIX_THREAD_PROCESS_SHARED> 0
>> #include<semaphore.h>
>> +#else
>> +#include<sys/sem.h>
>> #endif
>>
>> +/*
>> + * Define sem_wait timeout (real timeout will be (n-1;n) )
>> + */
>> +#define IPC_SEMWAIT_TIMEOUT 2
>> +
>> enum req_init_types {
>> MESSAGE_REQ_RESPONSE_INIT = 0,
>> MESSAGE_REQ_DISPATCH_INIT = 1
>> };
>>
>> #define MESSAGE_REQ_CHANGE_EUID 1
>> -#define MESSAGE_REQ_OUTQ_FLUSH 2
>>
>> -#define MESSAGE_RES_OUTQ_EMPTY 0
>> -#define MESSAGE_RES_OUTQ_NOT_EMPTY 1
>> -#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
>> -#define MESSAGE_RES_OUTQ_FLUSH_NR 3
>> +enum ipc_semaphore_identifiers {
>> + SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT = 0,
>> + SEMAPHORE_REQUEST = 1,
>> + SEMAPHORE_RESPONSE = 2,
>> + SEMAPHORE_DISPATCH = 3
>> +};
>>
>> struct control_buffer {
>> unsigned int read;
>> unsigned int write;
>> + int flow_control_enabled;
>> #if _POSIX_THREAD_PROCESS_SHARED> 0
>> - sem_t sem0;
>> - sem_t sem1;
>> - sem_t sem2;
>> + sem_t sem_request_or_flush_or_exit;
>> + sem_t sem_response;
>> + sem_t sem_dispatch;
>> + sem_t sem_request;
>> +#else
>> + int semid;
>> #endif
>> };
>>
>> @@ -145,4 +160,172 @@
>> #define ZC_FREE_HEADER 0xFFFFFFFE
>> #define ZC_EXECUTE_HEADER 0xFFFFFFFD
>>
>> +static inline cs_error_t
>> +ipc_sem_wait (
>> + struct control_buffer *control_buffer,
>> + enum ipc_semaphore_identifiers sem_id)
>> +{
>> +#if _POSIX_THREAD_PROCESS_SHARED< 1
>> + struct sembuf sop;
>> +#else
>> + struct timespec timeout;
>> + sem_t *sem = NULL;
>> +#endif
>> + int res;
>> +
>> +#if _POSIX_THREAD_PROCESS_SHARED> 0
>> + switch (sem_id) {
>> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
>> + sem =&control_buffer->sem_request_or_flush_or_exit;
>> + break;
>> + case SEMAPHORE_RESPONSE:
>> + sem =&control_buffer->sem_request;
>> + break;
>> + case SEMAPHORE_DISPATCH:
>> + sem =&control_buffer->sem_response;
>> + break;
>> + case SEMAPHORE_REQUEST:
>> + sem =&control_buffer->sem_dispatch;
>> + break;
>> + }
>> +
>> + timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
>> + timeout.tv_nsec = 0;
>> +
>> +retry_sem_timedwait:
>> + res = sem_timedwait (sem,&timeout);
>> + if (res == -1&& errno == ETIMEDOUT) {
>> + return (CS_ERR_LIBRARY);
>> + } else
>> + if (res == -1&& errno == EINTR) {
>> + goto retry_sem_timedwait;
>> + } else
>> + if (res == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> +#else
>> + /*
>> + * Wait for semaphore indicating a new message from server
>> + * to client in queue
>> + */
>> + sop.sem_num = sem_id;
>> + sop.sem_op = -1;
>> + sop.sem_flg = 0;
>> +
>> +retry_semop:
>> + res = semop (control_buffer->semid,&sop, 1);
>> + if (res == -1&& errno == EINTR) {
>> + return (CS_ERR_TRY_AGAIN);
>> + goto retry_semop;
>> + } else
>> + if (res == -1&& errno == EACCES) {
>> + return (CS_ERR_TRY_AGAIN);
>> + } else
>> + if (res == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> +#endif
>> + return (CS_OK);
>> +}
>> +
>> +static inline cs_error_t
>> +ipc_sem_post (
>> + struct control_buffer *control_buffer,
>> + enum ipc_semaphore_identifiers sem_id)
>> +{
>> +#if _POSIX_THREAD_PROCESS_SHARED< 1
>> + struct sembuf sop;
>> +#else
>> + sem_t *sem = NULL;
>> +#endif
>> + int res;
>> +
>> +#if _POSIX_THREAD_PROCESS_SHARED> 0
>> + switch (sem_id) {
>> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
>> + sem =&control_buffer->sem_request_or_flush_or_exit;
>> + break;
>> + case SEMAPHORE_RESPONSE:
>> + sem =&control_buffer->sem_request;
>> + break;
>> + case SEMAPHORE_DISPATCH:
>> + sem =&control_buffer->sem_response;
>> + break;
>> + case SEMAPHORE_REQUEST:
>> + sem =&control_buffer->sem_dispatch;
>> + break;
>> + }
>> +
>> + res = sem_post (sem);
>> + if (res == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> +#else
>> + sop.sem_num = sem_id;
>> + sop.sem_op = 1;
>> + sop.sem_flg = 0;
>> +
>> +retry_semop:
>> + res = semop (control_buffer->semid,&sop, 1);
>> + if (res == -1&& errno == EINTR) {
>> + goto retry_semop;
>> + } else
>> + if (res == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> +#endif
>> + return (CS_OK);
>> +}
>> +
>> +static inline cs_error_t
>> +ipc_sem_getvalue (
>> + struct control_buffer *control_buffer,
>> + enum ipc_semaphore_identifiers sem_id,
>> + int *sem_value)
>> +{
>> +#if _POSIX_THREAD_PROCESS_SHARED< 1
>> + struct sembuf sop;
>> + int sem_value_hold;
>> +#else
>> + sem_t *sem = NULL;
>> +#endif
>> +
>> +#if _POSIX_THREAD_PROCESS_SHARED> 0
>> + switch (sem_id) {
>> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
>> + sem =&control_buffer->sem_request_or_flush_or_exit;
>> + break;
>> + case SEMAPHORE_RESPONSE:
>> + sem =&control_buffer->sem_request;
>> + break;
>> + case SEMAPHORE_DISPATCH:
>> + sem =&control_buffer->sem_response;
>> + break;
>> + case SEMAPHORE_REQUEST:
>> + sem =&control_buffer->sem_dispatch;
>> + break;
>> + }
>> +
>> + res = sem_getvalue (sem, sem_value);
>> + if (res == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> +#else
>> + sop.sem_num = sem_id;
>> + sop.sem_op = 1;
>> + sop.sem_flg = 0;
>> +
>> +retry_semctl:
>> + sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
>> + if (sem_value_hold == -1&& errno == EINTR) {
>> + goto retry_semctl;
>> + } else
>> + if (sem_value_hold == -1) {
>> + return (CS_ERR_LIBRARY);
>> + }
>> + *sem_value = sem_value_hold;
>> +#endif
>> + return (CS_OK);
>> +}
>> +
>> #endif /* COROIPC_IPC_H_DEFINED */
>> Index: exec/coroipcs.c
>> ===================================================================
>> --- exec/coroipcs.c (revision 3000)
>> +++ exec/coroipcs.c (working copy)
>> @@ -95,6 +95,9 @@
>> #define MSG_SEND_LOCKED 0
>> #define MSG_SEND_UNLOCKED 1
>>
>> +#define POLL_STATE_IN 1
>> +#define POLL_STATE_INOUT 2
>> +
>> static struct coroipcs_init_state_v2 *api = NULL;
>>
>> DECLARE_LIST_INIT (conn_info_list_head);
>> @@ -141,13 +144,10 @@
>> pthread_attr_t thread_attr;
>> unsigned int service;
>> enum conn_state state;
>> - int notify_flow_control_enabled;
>> - int flow_control_state;
>> int refcount;
>> hdb_handle_t stats_handle;
>> #if _POSIX_THREAD_PROCESS_SHARED< 1
>> key_t semkey;
>> - int semid;
>> #endif
>> unsigned int pending_semops;
>> pthread_mutex_t mutex;
>> @@ -166,6 +166,7 @@
>> unsigned int setup_bytes_read;
>> struct list_head zcb_mapped_list_head;
>> char *sending_allowed_private_data[64];
>> + int poll_state;
>> };
>>
>> static int shared_mem_dispatch_bytes_left (const struct conn_info
>> *conn_info);
>> @@ -221,34 +222,6 @@
>> {
>> }
>>
>> -static void sem_post_exit_thread (struct conn_info *conn_info)
>> -{
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> - int res;
>> -
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> -retry_semop:
>> - res = sem_post (&conn_info->control_buffer->sem0);
>> - if (res == -1&& errno == EINTR) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - }
>> -#else
>> - sop.sem_num = 0;
>> - sop.sem_op = 1;
>> - sop.sem_flg = 0;
>> -
>> -retry_semop:
>> - res = semop (conn_info->semid,&sop, 1);
>> - if ((res == -1)&& (errno == EINTR || errno == EAGAIN)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - }
>> -#endif
>> -}
>> -
>> static int
>> memory_map (
>> const char *path,
>> @@ -383,6 +356,34 @@
>> return (res);
>> }
>>
>> +static inline void flow_control_state_set (
>> + struct conn_info *conn_info,
>> + int flow_control_state)
>> +{
>> + if (conn_info->control_buffer->flow_control_enabled ==
>> flow_control_state) {
>> + return;
>> + }
>> + if (flow_control_state == 0) {
>> + log_printf (LOGSYS_LEVEL_DEBUG,
>> + "Disabling flow control for %d\n",
>> + conn_info->client_pid);
>> + } else
>> + if (flow_control_state == 1) {
>> + log_printf (LOGSYS_LEVEL_DEBUG,
>> + "Enabling flow control for %d\n",
>> + conn_info->client_pid);
>> + }
>> +
>> +
>> + conn_info->control_buffer->flow_control_enabled = flow_control_state;
>> + api->stats_update_value (conn_info->stats_handle,
>> + "flow_control",
>> + &flow_control_state,
>> + sizeof(flow_control_state));
>> + api->stats_increment_value (conn_info->stats_handle,
>> + "flow_control_count");
>> +}
>> +
>> static inline int zcb_free (struct zcb_mapped *zcb_mapped)
>> {
>> unsigned int res;
>> @@ -517,7 +518,7 @@
>> }
>>
>> if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
>> - sem_post_exit_thread (conn_info);
>> + ipc_sem_post (conn_info->control_buffer,
>> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>> return (0);
>> }
>>
>> @@ -546,11 +547,12 @@
>> pthread_mutex_unlock (&conn_info->mutex);
>>
>> #if _POSIX_THREAD_PROCESS_SHARED> 0
>> - sem_destroy (&conn_info->control_buffer->sem0);
>> - sem_destroy (&conn_info->control_buffer->sem1);
>> - sem_destroy (&conn_info->control_buffer->sem2);
>> + sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
>> + sem_destroy (&conn_info->control_buffer->sem_request);
>> + sem_destroy (&conn_info->control_buffer->sem_response);
>> + sem_destroy (&conn_info->control_buffer->sem_dispatch);
>> #else
>> - semctl (conn_info->semid, 0, IPC_RMID);
>> + semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
>> #endif
>> /*
>> * Destroy shared memory segment and semaphore
>> @@ -653,14 +655,12 @@
>> static void *pthread_ipc_consumer (void *conn)
>> {
>> struct conn_info *conn_info = (struct conn_info *)conn;
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> int res;
>> coroipc_request_header_t *header;
>> coroipc_response_header_t coroipc_response_header;
>> int send_ok;
>> unsigned int new_message;
>> + int sem_value = 0;
>>
>> #if defined(HAVE_PTHREAD_SETSCHEDPARAM)&&
>> defined(HAVE_SCHED_GET_PRIORITY_MAX)
>> if (api->sched_policy != 0) {
>> @@ -670,43 +670,28 @@
>> #endif
>>
>> for (;;) {
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> -retry_semwait:
>> - res = sem_wait (&conn_info->control_buffer->sem0);
>> + ipc_sem_wait (conn_info->control_buffer,
>> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>> if (ipc_thread_active (conn_info) == 0) {
>> coroipcs_refcount_dec (conn_info);
>> pthread_exit (0);
>> }
>> - if ((res == -1)&& (errno == EINTR)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semwait;
>> - }
>> -#else
>>
>> - sop.sem_num = 0;
>> - sop.sem_op = -1;
>> - sop.sem_flg = 0;
>> -retry_semop:
>> - res = semop (conn_info->semid,&sop, 1);
>> - if (ipc_thread_active (conn_info) == 0) {
>> - coroipcs_refcount_dec (conn_info);
>> - pthread_exit (0);
>> - }
>> - if ((res == -1)&& (errno == EINTR || errno == EAGAIN)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - } else
>> - if ((res == -1)&& (errno == EINVAL || errno == EIDRM)) {
>> - coroipcs_refcount_dec (conn_info);
>> - pthread_exit (0);
>> - }
>> -#endif
>> + outq_flush (conn_info);
>>
>> + ipc_sem_getvalue (conn_info->control_buffer,
>> SEMAPHORE_REQUEST,&sem_value);
>> + if (sem_value> 0) {
>> +
>> + res = ipc_sem_wait (conn_info->control_buffer,
>> SEMAPHORE_REQUEST);
>> + } else {
>> + continue;
>> + }
>> +
>> zerocopy_operations_process (conn_info,&header,&new_message);
>> /*
>> * There is no new message to process, continue for loop
>> */
>> if (new_message == 0) {
>> +printf ("continuing\n");
>> continue;
>> }
>>
>> @@ -738,7 +723,6 @@
>> /*
>> * Overload, tell library to retry
>> */
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> coroipc_response_header.size = sizeof
>> (coroipc_response_header_t);
>> coroipc_response_header.id = 0;
>> coroipc_response_header.error = CS_ERR_TRY_AGAIN;
>> @@ -928,7 +912,7 @@
>> conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
>> pthread_mutex_unlock (&conn_info->mutex);
>>
>> - sem_post_exit_thread (conn_info);
>> + ipc_sem_post (conn_info->control_buffer,
>> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>> }
>>
>> static int conn_info_create (int fd)
>> @@ -945,6 +929,7 @@
>> conn_info->client_pid = 0;
>> conn_info->service = SOCKET_SERVICE_INIT;
>> conn_info->state = CONN_STATE_THREAD_INACTIVE;
>> + conn_info->poll_state = POLL_STATE_IN;
>> list_init (&conn_info->outq_head);
>> list_init (&conn_info->list);
>> list_init (&conn_info->zcb_mapped_list_head);
>> @@ -1103,11 +1088,12 @@
>> ipc_disconnect (conn_info);
>>
>> #if _POSIX_THREAD_PROCESS_SHARED> 0
>> - sem_destroy (&conn_info->control_buffer->sem0);
>> - sem_destroy (&conn_info->control_buffer->sem1);
>> - sem_destroy (&conn_info->control_buffer->sem2);
>> + sem_destroy
>> (&conn_info->control_buffer->sem_request_or_flush_or_exit);
>> + sem_destroy (&conn_info->control_buffer->sem_request);
>> + sem_destroy (&conn_info->control_buffer->sem_response);
>> + sem_destroy (&conn_info->control_buffer->sem_dispatch);
>> #else
>> - semctl (conn_info->semid, 0, IPC_RMID);
>> + semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
>> #endif
>>
>> /*
>> @@ -1181,33 +1167,11 @@
>> int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
>> {
>> struct conn_info *conn_info = (struct conn_info *)conn;
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> - int res;
>>
>> memcpy (conn_info->response_buffer, msg, mlen);
>>
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> - res = sem_post (&conn_info->control_buffer->sem1);
>> - if (res == -1) {
>> - return (-1);
>> - }
>> -#else
>> - sop.sem_num = 1;
>> - sop.sem_op = 1;
>> - sop.sem_flg = 0;
>> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>>
>> -retry_semop:
>> - res = semop (conn_info->semid,&sop, 1);
>> - if ((res == -1)&& (errno == EINTR || errno == EAGAIN)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - } else
>> - if ((res == -1)&& (errno == EINVAL || errno == EIDRM)) {
>> - return (0);
>> - }
>> -#endif
>> api->stats_increment_value (conn_info->stats_handle, "responses");
>> return (0);
>> }
>> @@ -1215,10 +1179,6 @@
>> int coroipcs_response_iov_send (void *conn, const struct iovec *iov,
>> unsigned int iov_len)
>> {
>> struct conn_info *conn_info = (struct conn_info *)conn;
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> - int res;
>> int write_idx = 0;
>> int i;
>>
>> @@ -1228,26 +1188,8 @@
>> write_idx += iov[i].iov_len;
>> }
>>
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> - res = sem_post (&conn_info->control_buffer->sem1);
>> - if (res == -1) {
>> - return (-1);
>> - }
>> -#else
>> - sop.sem_num = 1;
>> - sop.sem_op = 1;
>> - sop.sem_flg = 0;
>> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>>
>> -retry_semop:
>> - res = semop (conn_info->semid,&sop, 1);
>> - if ((res == -1)&& (errno == EINTR || errno == EAGAIN)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - } else
>> - if ((res == -1)&& (errno == EINVAL || errno == EIDRM)) {
>> - return (0);
>> - }
>> -#endif
>> api->stats_increment_value (conn_info->stats_handle, "responses");
>> return (0);
>> }
>> @@ -1283,86 +1225,31 @@
>> conn_info->control_buffer->write = (write_idx + len) %
>> conn_info->dispatch_size;
>> }
>>
>> -/**
>> - * simulate the behaviour in coroipcc.c
>> - */
>> -static int flow_control_event_send (struct conn_info *conn_info, char event)
>> -{
>> - int new_fc = 0;
>> -
>> - if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
>> - event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
>> - new_fc = 1;
>> - }
>> -
>> - if (conn_info->flow_control_state != new_fc) {
>> - if (new_fc == 1) {
>> - log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control
>> for %d, event %d\n",
>> - conn_info->client_pid, event);
>> - } else {
>> - log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control
>> for %d, event %d\n",
>> - conn_info->client_pid, event);
>> - }
>> - conn_info->flow_control_state = new_fc;
>> - api->stats_update_value (conn_info->stats_handle,
>> "flow_control",
>> - &conn_info->flow_control_state,
>> - sizeof(conn_info->flow_control_state));
>> - api->stats_increment_value (conn_info->stats_handle,
>> "flow_control_count");
>> - }
>> -
>> - return send (conn_info->fd,&event, 1, MSG_NOSIGNAL);
>> -}
>> -
>> static void msg_send (void *conn, const struct iovec *iov, unsigned int
>> iov_len,
>> int locked)
>> {
>> struct conn_info *conn_info = (struct conn_info *)conn;
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> int res;
>> int i;
>> + char buf;
>>
>> for (i = 0; i< iov_len; i++) {
>> memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
>> }
>>
>> - if (list_empty (&conn_info->outq_head))
>> - res = flow_control_event_send (conn_info,
>> MESSAGE_RES_OUTQ_EMPTY);
>> - else
>> - res = flow_control_event_send (conn_info,
>> MESSAGE_RES_OUTQ_NOT_EMPTY);
>> -
>> - if (res == -1&& errno == EAGAIN) {
>> - if (locked == 0) {
>> - pthread_mutex_lock (&conn_info->mutex);
>> - }
>> + buf = list_empty (&conn_info->outq_head);
>> + res = send (conn_info->fd,&buf, 1, MSG_NOSIGNAL);
>> + if (res != 1) {
>> conn_info->pending_semops += 1;
>> - if (locked == 0) {
>> - pthread_mutex_unlock (&conn_info->mutex);
>> + if (conn_info->poll_state == POLL_STATE_IN) {
>> + conn_info->poll_state = POLL_STATE_INOUT;
>> + api->poll_dispatch_modify (conn_info->fd,
>> + POLLIN|POLLOUT|POLLNVAL);
>> }
>> - api->poll_dispatch_modify (conn_info->fd,
>> - POLLIN|POLLOUT|POLLNVAL);
>> - } else
>> - if (res == -1) {
>> - ipc_disconnect (conn_info);
>> }
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> - res = sem_post (&conn_info->control_buffer->sem2);
>> -#else
>> - sop.sem_num = 2;
>> - sop.sem_op = 1;
>> - sop.sem_flg = 0;
>>
>> -retry_semop:
>> - res = semop (conn_info->semid,&sop, 1);
>> - if ((res == -1)&& (errno == EINTR || errno == EAGAIN)) {
>> - api->stats_increment_value (conn_info->stats_handle,
>> "sem_retry_count");
>> - goto retry_semop;
>> - } else
>> - if ((res == -1)&& (errno == EINVAL || errno == EIDRM)) {
>> - return;
>> - }
>> -#endif
>> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
>> +
>> api->stats_increment_value (conn_info->stats_handle, "dispatched");
>> }
>>
>> @@ -1371,11 +1258,10 @@
>> struct outq_item *outq_item;
>> unsigned int bytes_left;
>> struct iovec iov;
>> - int res;
>>
>> pthread_mutex_lock (&conn_info->mutex);
>> if (list_empty (&conn_info->outq_head)) {
>> - res = flow_control_event_send (conn_info,
>> MESSAGE_RES_OUTQ_FLUSH_NR);
>> + flow_control_state_set (conn_info, 0);
>> pthread_mutex_unlock (&conn_info->mutex);
>> return;
>> }
>> @@ -1441,7 +1327,7 @@
>> semun.buf =&ipc_set;
>>
>> for (i = 0; i< 3; i++) {
>> - res = semctl (conn_info->semid, 0, IPC_SET, semun);
>> + res = semctl (conn_info->control_buffer->semid, 0, IPC_SET,
>> semun);
>> if (res == -1) {
>> return (-1);
>> }
>> @@ -1471,6 +1357,7 @@
>> bytes_msg += iov[i].iov_len;
>> }
>> if (bytes_left< bytes_msg || list_empty (&conn_info->outq_head) == 0) {
>> + flow_control_state_set (conn_info, 1);
>> outq_item = api->malloc (sizeof (struct outq_item));
>> if (outq_item == NULL) {
>> ipc_disconnect (conn);
>> @@ -1491,11 +1378,6 @@
>> outq_item->mlen = bytes_msg;
>> list_init (&outq_item->list);
>> pthread_mutex_lock (&conn_info->mutex);
>> - if (list_empty (&conn_info->outq_head)) {
>> - conn_info->notify_flow_control_enabled = 1;
>> - api->poll_dispatch_modify (conn_info->fd,
>> - POLLIN|POLLOUT|POLLNVAL);
>> - }
>> list_add_tail (&outq_item->list,&conn_info->outq_head);
>> pthread_mutex_unlock (&conn_info->mutex);
>> api->stats_increment_value (conn_info->stats_handle,
>> "queue_size");
>> @@ -1742,11 +1624,10 @@
>>
>> conn_info->service = req_setup->service;
>> conn_info->refcount = 0;
>> - conn_info->notify_flow_control_enabled = 0;
>> conn_info->setup_bytes_read = 0;
>>
>> #if _POSIX_THREAD_PROCESS_SHARED< 1
>> - conn_info->semid = semget (conn_info->semkey, 3, 0600);
>> + conn_info->control_buffer->semid = semget (conn_info->semkey,
>> 3, 0600);
>> #endif
>> conn_info->pending_semops = 0;
>>
>> @@ -1794,9 +1675,6 @@
>> res = recv (fd,&buf, 1, MSG_NOSIGNAL);
>> if (res == 1) {
>> switch (buf) {
>> - case MESSAGE_REQ_OUTQ_FLUSH:
>> - outq_flush (conn_info);
>> - break;
>> case MESSAGE_REQ_CHANGE_EUID:
>> if (priv_change (conn_info) == -1) {
>> ipc_disconnect (conn_info);
>> @@ -1820,37 +1698,24 @@
>> coroipcs_refcount_dec (conn_info);
>> }
>>
>> - coroipcs_refcount_inc (conn_info);
>> - pthread_mutex_lock (&conn_info->mutex);
>> - if ((conn_info->state == CONN_STATE_THREAD_ACTIVE)&& (revent&
>> POLLOUT)) {
>> - if (list_empty (&conn_info->outq_head))
>> - buf = MESSAGE_RES_OUTQ_EMPTY;
>> - else
>> - buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
>> + if (revent& POLLOUT) {
>> + int psop = conn_info->pending_semops;
>> + int i;
>>
>> - for (; conn_info->pending_semops;) {
>> - res = flow_control_event_send (conn_info, buf);
>> - if (res == 1) {
>> - conn_info->pending_semops--;
>> + assert (psop != 0);
>> + for (i = 0; i< psop; i++) {
>> + res = send (conn_info->fd,&buf, 1, MSG_NOSIGNAL);
>> + if (res != 1) {
>> + return (0);
>> } else {
>> - break;
>> + conn_info->pending_semops -= 1;
>> }
>> }
>> - if (conn_info->notify_flow_control_enabled) {
>> - res = flow_control_event_send (conn_info,
>> MESSAGE_RES_ENABLE_FLOWCONTROL);
>> - if (res == 1) {
>> - conn_info->notify_flow_control_enabled = 0;
>> - }
>> + if (conn_info->poll_state == POLL_STATE_INOUT) {
>> + conn_info->poll_state = POLL_STATE_IN;
>> + api->poll_dispatch_modify (conn_info->fd,
>> POLLIN|POLLNVAL);
>> }
>> - if (conn_info->notify_flow_control_enabled == 0&&
>> - conn_info->pending_semops == 0) {
>> -
>> - api->poll_dispatch_modify (conn_info->fd,
>> - POLLIN|POLLNVAL);
>> - }
>> }
>> - pthread_mutex_unlock (&conn_info->mutex);
>> - coroipcs_refcount_dec (conn_info);
>>
>> return (0);
>> }
>> Index: lib/coroipcc.c
>> ===================================================================
>> --- lib/coroipcc.c (revision 3000)
>> +++ lib/coroipcc.c (working copy)
>> @@ -72,17 +72,8 @@
>>
>> #include "util.h"
>>
>> -/*
>> - * Define sem_wait timeout (real timeout will be (n-1;n) )
>> - */
>> -#define IPC_SEMWAIT_TIMEOUT 2
>> -
>> struct ipc_instance {
>> int fd;
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - int semid;
>> -#endif
>> - int flow_control_state;
>> struct control_buffer *control_buffer;
>> char *request_buffer;
>> char *response_buffer;
>> @@ -117,6 +108,23 @@
>> #define MSG_NOSIGNAL 0
>> #endif
>>
>> +static inline int shared_mem_dispatch_bytes_left (struct ipc_instance
>> *context)
>> +{
>> + unsigned int n_read;
>> + unsigned int n_write;
>> + unsigned int bytes_left;
>> +
>> + n_read = context->control_buffer->read;
>> + n_write = context->control_buffer->write;
>> +
>> + if (n_read<= n_write) {
>> + bytes_left = context->dispatch_size - n_write + n_read;
>> + } else {
>> + bytes_left = n_read - n_write;
>> + }
>> + return (bytes_left);
>> +}
>> +
>> static cs_error_t
>> socket_send (
>> int s,
>> @@ -238,10 +246,10 @@
>> return (res);
>> }
>>
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> static int
>> priv_change_send (struct ipc_instance *ipc_instance)
>> {
>> +#if _POSIX_THREAD_PROCESS_SHARED< 1
>> char buf_req;
>> mar_req_priv_change req_priv_change;
>> unsigned int res;
>> @@ -268,19 +276,12 @@
>> }
>>
>> ipc_instance->euid = req_priv_change.euid;
>> +#else
>> + ipc_instance = NULL;
>> +#endif
>> return (0);
>> }
>>
>> -#if defined(_SEM_SEMUN_UNDEFINED)
>> -union semun {
>> - int val;
>> - struct semid_ds *buf;
>> - unsigned short int *array;
>> - struct seminfo *__buf;
>> -};
>> -#endif
>> -#endif
>> -
>> static int
>> circular_memory_map (char *path, const char *file, void **buf, size_t
>> bytes)
>> {
>> @@ -471,10 +472,6 @@
>> const struct iovec *iov,
>> unsigned int iov_len)
>> {
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#endif
>> -
>> int i;
>> int res;
>> int req_buffer_idx = 0;
>> @@ -490,117 +487,18 @@
>> req_buffer_idx += iov[i].iov_len;
>> }
>>
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> - res = sem_post (&ipc_instance->control_buffer->sem0);
>> - if (res == -1) {
>> - return (CS_ERR_LIBRARY);
>> - }
>> -#else
>> /*
>> - * Signal semaphore #0 indicting a new message from client
>> + * Signal semaphore #3 and #0 indicting a new message from client
>> * to server request queue
>> */
>> - sop.sem_num = 0;
>> - sop.sem_op = 1;
>> - sop.sem_flg = 0;
>> -
>> -retry_semop:
>> - res = semop (ipc_instance->semid,&sop, 1);
>> - if (res == -1&& errno == EINTR) {
>> - return (CS_ERR_TRY_AGAIN);
>> - } else
>> - if (res == -1&& errno == EACCES) {
>> - priv_change_send (ipc_instance);
>> - goto retry_semop;
>> - } else
>> - if (res == -1) {
>> + res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
>> + if (res != CS_OK) {
>> return (CS_ERR_LIBRARY);
>> }
>> -#endif
>> - return (CS_OK);
>> -}
>> -
>> -inline static cs_error_t
>> -ipc_sem_wait (
>> - struct ipc_instance *ipc_instance,
>> - int sem_num)
>> -{
>> -#if _POSIX_THREAD_PROCESS_SHARED< 1
>> - struct sembuf sop;
>> -#else
>> - struct timespec timeout;
>> - struct pollfd pfd;
>> - sem_t *sem = NULL;
>> -#endif
>> - int res;
>> -
>> -#if _POSIX_THREAD_PROCESS_SHARED> 0
>> - switch (sem_num) {
>> - case 0:
>> - sem =&ipc_instance->control_buffer->sem0;
>> - break;
>> - case 1:
>> - sem =&ipc_instance->control_buffer->sem1;
>> - break;
>> - case 2:
>> - sem =&ipc_instance->control_buffer->sem2;
>> - break;
>> - }
>> -
>> -retry_semwait:
>> - timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
>> - timeout.tv_nsec = 0;
>> -
>> - res = sem_timedwait (sem,&timeout);
>> - if (res == -1&& errno == ETIMEDOUT) {
>> - pfd.fd = ipc_instance->fd;
>> - pfd.events = 0;
>> -
>> - res = poll (&pfd, 1, 0);
>> -
>> - if (res == -1&& errno == EINTR) {
>> - return (CS_ERR_TRY_AGAIN);
>> - } else
>> - if (res == -1) {
>> - return (CS_ERR_LIBRARY);
>> - }
>> -
>> - if (res == 1) {
>> - if (pfd.revents == POLLERR || pfd.revents == POLLHUP ||
>> pfd.revents == POLLNVAL) {
>> - return (CS_ERR_LIBRARY);
>> - }
>> - }
>> -
>> - goto retry_semwait;
>> - } else
>> - if (res == -1&& errno == EINTR) {
>> - return (CS_ERR_TRY_AGAIN);
>> - } else
>> - if (res == -1) {
>> + res = ipc_sem_post (ipc_instance->control_buffer,
>> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>> + if (res != CS_OK) {
>> return (CS_ERR_LIBRARY);
>> }
>> -#else
>> - /*
>> - * Wait for semaphore indicating a new message from server
>> - * to client in queue
>> - */
>> - sop.sem_num = sem_num;
>> - sop.sem_op = -1;
>> - sop.sem_flg = 0;
>> -
>> -retry_semop:
>> - res = semop (ipc_instance->semid,&sop, 1);
>> - if (res == -1&& errno == EINTR) {
>> - return (CS_ERR_TRY_AGAIN);
>> - } else
>> - if (res == -1&& errno == EACCES) {
>> - priv_change_send (ipc_instance);
>> - goto retry_semop;
>> - } else
>> - if (res == -1) {
>> - return (CS_ERR_LIBRARY);
>> - }
>> -#endif
>> return (CS_OK);
>> }
>>
>> @@ -611,10 +509,17 @@
>> size_t res_len)
>> {
>> coroipc_response_header_t *response_header;
>> - cs_error_t err;
>> + cs_error_t res;
>>
>> - if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
>> - return (err);
>> +retry_ipc_sem_wait:
>> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
>> + if (res != CS_OK) {
>> + if (res == CS_ERR_TRY_AGAIN) {
>> + priv_change_send (ipc_instance);
>> + goto retry_ipc_sem_wait;
>> + } else {
>> + return (res);
>> + }
>> }
>>
>> response_header = (coroipc_response_header_t
>> *)ipc_instance->response_buffer;
>> @@ -631,10 +536,17 @@
>> struct ipc_instance *ipc_instance,
>> void **res_msg)
>> {
>> - cs_error_t err;
>> + cs_error_t res;
>>
>> - if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
>> - return (err);
>> +retry_ipc_sem_wait:
>> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
>> + if (res != CS_OK) {
>> + if (res == CS_ERR_TRY_AGAIN) {
>> + priv_change_send (ipc_instance);
>> + goto retry_ipc_sem_wait;
>> + } else {
>> + return (res);
>> + }
>> }
>>
>> *res_msg = (char *)ipc_instance->response_buffer;
>> @@ -754,18 +666,22 @@
>> }
>>
>> #if _POSIX_THREAD_PROCESS_SHARED> 0
>> - sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
>> - sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
>> - sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
>> + sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit,
>> 1, 0);
>> + sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
>> + sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
>> + sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
>> #else
>> +{
>> + int i;
>> +
>> /*
>> * Allocate a semaphore segment
>> */
>> while (1) {
>> semkey = random();
>> ipc_instance->euid = geteuid ();
>> - if ((ipc_instance->semid
>> - = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
>> + if ((ipc_instance->control_buffer->semid
>> + = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
>> break;
>> }
>> /*
>> @@ -781,18 +697,15 @@
>> }
>> }
>>
>> - semun.val = 0;
>> - sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
>> - if (sys_res != 0) {
>> - res = CS_ERR_LIBRARY;
>> - goto error_exit;
>> + for (i = 0; i< 4; i++) {
>> + semun.val = 0;
>> + sys_res = semctl (ipc_instance->control_buffer->semid, i,
>> SETVAL, semun);
>> + if (sys_res != 0) {
>> + res = CS_ERR_LIBRARY;
>> + goto error_exit;
>> + }
>> }
>> -
>> - sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
>> - if (sys_res != 0) {
>> - res = CS_ERR_LIBRARY;
>> - goto error_exit;
>> - }
>> +}
>> #endif
>>
>> /*
>> @@ -822,7 +735,6 @@
>> }
>>
>> ipc_instance->fd = request_fd;
>> - ipc_instance->flow_control_state = 0;
>>
>> if (res_setup.error == CS_ERR_TRY_AGAIN) {
>> res = res_setup.error;
>> @@ -842,8 +754,8 @@
>>
>> error_exit:
>> #if _POSIX_THREAD_PROCESS_SHARED< 1
>> - if (ipc_instance->semid> 0)
>> - semctl (ipc_instance->semid, 0, IPC_RMID);
>> + if (ipc_instance->control_buffer->semid> 0)
>> + semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
>> #endif
>> memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
>> error_dispatch_buffer:
>> @@ -893,7 +805,7 @@
>> return (res);
>> }
>>
>> - *flow_control_state = ipc_instance->flow_control_state;
>> + *flow_control_state =
>> ipc_instance->control_buffer->flow_control_enabled;
>>
>> hdb_handle_put (&ipc_hdb, handle);
>> return (res);
>> @@ -928,10 +840,9 @@
>> int poll_events;
>> char buf;
>> struct ipc_instance *ipc_instance;
>> - int res;
>> - char buf_two = 1;
>> char *data_addr;
>> cs_error_t error = CS_OK;
>> + int res;
>>
>> error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void
>> **)&ipc_instance));
>> if (error != CS_OK) {
>> @@ -962,47 +873,22 @@
>> goto error_put;
>> }
>>
>> - res = recv (ipc_instance->fd,&buf, 1, 0);
>> - if (res == -1&& errno == EINTR) {
>> - error = CS_ERR_TRY_AGAIN;
>> - goto error_put;
>> - } else
>> - if (res == -1) {
>> - error = CS_ERR_LIBRARY;
>> - goto error_put;
>> - } else
>> - if (res == 0) {
>> - /* Means that the peer closed cleanly the socket. However, it
>> should
>> - * happen only on BSD and Darwing systems since poll() returns a
>> - * POLLHUP event on other systems.
>> + error = socket_recv (ipc_instance->fd,&buf, 1);
>> + assert (error == CS_OK);
>> +
>> + if (shared_mem_dispatch_bytes_left (ipc_instance)> 500000) {
>> + /*
>> + * Notify coroipcs to flush any pending dispatch messages
>> */
>> - error = CS_ERR_LIBRARY;
>> - goto error_put;
>> +
>> + res = ipc_sem_post (ipc_instance->control_buffer,
>> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
>> + if (res != CS_OK) {
>> + error = CS_ERR_LIBRARY;
>> + goto error_put;
>> + }
>> +
>> +
>> }
>> - ipc_instance->flow_control_state = 0;
>> - if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf ==
>> MESSAGE_RES_ENABLE_FLOWCONTROL) {
>> - ipc_instance->flow_control_state = 1;
>> - }
>> - /*
>> - * Notify executive to flush any pending dispatch messages
>> - */
>> - if (ipc_instance->flow_control_state) {
>> - buf_two = MESSAGE_REQ_OUTQ_FLUSH;
>> - res = socket_send (ipc_instance->fd,&buf_two, 1);
>> - assert (res == CS_OK); /* TODO */
>> - }
>> - /*
>> - * This is just a notification of flow control starting at the addition
>> - * of a new pending message, not a message to dispatch
>> - */
>> - if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
>> - error = CS_ERR_TRY_AGAIN;
>> - goto error_put;
>> - }
>> - if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
>> - error = CS_ERR_TRY_AGAIN;
>> - goto error_put;
>> - }
>>
>> data_addr = ipc_instance->dispatch_buffer;
>>
>> @@ -1030,8 +916,15 @@
>> return (res);
>> }
>>
>> - if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
>> - goto error_exit;
>> +retry_ipc_sem_wait:
>> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
>> + if (res != CS_OK) {
>> + if (res == CS_ERR_TRY_AGAIN) {
>> + priv_change_send (ipc_instance);
>> + goto retry_ipc_sem_wait;
>> + } else {
>> + goto error_exit;
>> + }
>> }
>>
>> addr = ipc_instance->dispatch_buffer;
>> @@ -1078,8 +971,8 @@
>> res = reply_receive (ipc_instance, res_msg, res_len);
>>
>> error_exit:
>> - hdb_handle_put (&ipc_hdb, handle);
>> pthread_mutex_unlock (&ipc_instance->mutex);
>> + hdb_handle_put (&ipc_hdb, handle);
>>
>> return (res);
>> }
>
>> _______________________________________________
>> Openais mailing list
>> [email protected]
>> https://lists.linux-foundation.org/mailman/listinfo/openais
>
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais