On Wed, Jul 21, 2010 at 12:33:15AM -0700, Steven Dake wrote:
> Integrating comments from Angus and generally cleaning up the code a
> bit for maintenance purposes.
Much easier to read Steve.
Two very minor comments:
I am not convinced flow_control_state_set() needs to be inline.
Do we really need to assign enums values?
BLA_BLA = 1,
FOO_FOO = 2,
...
I'll have more of a look in the morning.
CTS is underway at the time of writing, looks ok so far.
(I'll let you know how it goes)
Angus
>
> Regards
> -steve
> Index: include/corosync/coroipc_ipc.h
> ===================================================================
> --- include/corosync/coroipc_ipc.h (revision 3000)
> +++ include/corosync/coroipc_ipc.h (working copy)
> @@ -35,6 +35,9 @@
> #define COROIPC_IPC_H_DEFINED
>
> #include <unistd.h>
> +#include <poll.h>
> +#include <time.h>
> +#include "corotypes.h"
> #include "config.h"
>
> /*
> @@ -52,28 +55,40 @@
>
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> #include <semaphore.h>
> +#else
> +#include <sys/sem.h>
> #endif
>
> +/*
> + * Define sem_wait timeout (real timeout will be (n-1;n) )
> + */
> +#define IPC_SEMWAIT_TIMEOUT 2
> +
> enum req_init_types {
> MESSAGE_REQ_RESPONSE_INIT = 0,
> MESSAGE_REQ_DISPATCH_INIT = 1
> };
>
> #define MESSAGE_REQ_CHANGE_EUID 1
> -#define MESSAGE_REQ_OUTQ_FLUSH 2
>
> -#define MESSAGE_RES_OUTQ_EMPTY 0
> -#define MESSAGE_RES_OUTQ_NOT_EMPTY 1
> -#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
> -#define MESSAGE_RES_OUTQ_FLUSH_NR 3
> +enum ipc_semaphore_identifiers {
> + SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT = 0,
> + SEMAPHORE_REQUEST = 1,
> + SEMAPHORE_RESPONSE = 2,
> + SEMAPHORE_DISPATCH = 3
> +};
>
> struct control_buffer {
> unsigned int read;
> unsigned int write;
> + int flow_control_enabled;
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> - sem_t sem0;
> - sem_t sem1;
> - sem_t sem2;
> + sem_t sem_request_or_flush_or_exit;
> + sem_t sem_response;
> + sem_t sem_dispatch;
> + sem_t sem_request;
> +#else
> + int semid;
> #endif
> };
>
> @@ -145,4 +160,172 @@
> #define ZC_FREE_HEADER 0xFFFFFFFE
> #define ZC_EXECUTE_HEADER 0xFFFFFFFD
>
> +static inline cs_error_t
> +ipc_sem_wait (
> + struct control_buffer *control_buffer,
> + enum ipc_semaphore_identifiers sem_id)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> + struct sembuf sop;
> +#else
> + struct timespec timeout;
> + sem_t *sem = NULL;
> +#endif
> + int res;
> +
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> + switch (sem_id) {
> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> + sem = &control_buffer->sem_request_or_flush_or_exit;
> + break;
> + case SEMAPHORE_RESPONSE:
> + sem = &control_buffer->sem_request;
> + break;
> + case SEMAPHORE_DISPATCH:
> + sem = &control_buffer->sem_response;
> + break;
> + case SEMAPHORE_REQUEST:
> + sem = &control_buffer->sem_dispatch;
> + break;
> + }
> +
> + timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
> + timeout.tv_nsec = 0;
> +
> +retry_sem_timedwait:
> + res = sem_timedwait (sem, &timeout);
> + if (res == -1 && errno == ETIMEDOUT) {
> + return (CS_ERR_LIBRARY);
> + } else
> + if (res == -1 && errno == EINTR) {
> + goto retry_sem_timedwait;
> + } else
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +#else
> + /*
> + * Wait for semaphore indicating a new message from server
> + * to client in queue
> + */
> + sop.sem_num = sem_id;
> + sop.sem_op = -1;
> + sop.sem_flg = 0;
> +
> +retry_semop:
> + res = semop (control_buffer->semid, &sop, 1);
> + if (res == -1 && errno == EINTR) {
> + return (CS_ERR_TRY_AGAIN);
> + goto retry_semop;
> + } else
> + if (res == -1 && errno == EACCES) {
> + return (CS_ERR_TRY_AGAIN);
> + } else
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +#endif
> + return (CS_OK);
> +}
> +
> +static inline cs_error_t
> +ipc_sem_post (
> + struct control_buffer *control_buffer,
> + enum ipc_semaphore_identifiers sem_id)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> + struct sembuf sop;
> +#else
> + sem_t *sem = NULL;
> +#endif
> + int res;
> +
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> + switch (sem_id) {
> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> + sem = &control_buffer->sem_request_or_flush_or_exit;
> + break;
> + case SEMAPHORE_RESPONSE:
> + sem = &control_buffer->sem_request;
> + break;
> + case SEMAPHORE_DISPATCH:
> + sem = &control_buffer->sem_response;
> + break;
> + case SEMAPHORE_REQUEST:
> + sem = &control_buffer->sem_dispatch;
> + break;
> + }
> +
> + res = sem_post (sem);
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +#else
> + sop.sem_num = sem_id;
> + sop.sem_op = 1;
> + sop.sem_flg = 0;
> +
> +retry_semop:
> + res = semop (control_buffer->semid, &sop, 1);
> + if (res == -1 && errno == EINTR) {
> + goto retry_semop;
> + } else
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +#endif
> + return (CS_OK);
> +}
> +
> +static inline cs_error_t
> +ipc_sem_getvalue (
> + struct control_buffer *control_buffer,
> + enum ipc_semaphore_identifiers sem_id,
> + int *sem_value)
> +{
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> + struct sembuf sop;
> + int sem_value_hold;
> +#else
> + sem_t *sem = NULL;
> +#endif
> +
> +#if _POSIX_THREAD_PROCESS_SHARED > 0
> + switch (sem_id) {
> + case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
> + sem = &control_buffer->sem_request_or_flush_or_exit;
> + break;
> + case SEMAPHORE_RESPONSE:
> + sem = &control_buffer->sem_request;
> + break;
> + case SEMAPHORE_DISPATCH:
> + sem = &control_buffer->sem_response;
> + break;
> + case SEMAPHORE_REQUEST:
> + sem = &control_buffer->sem_dispatch;
> + break;
> + }
> +
> + res = sem_getvalue (sem, sem_value);
> + if (res == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> +#else
> + sop.sem_num = sem_id;
> + sop.sem_op = 1;
> + sop.sem_flg = 0;
> +
> +retry_semctl:
> + sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
> + if (sem_value_hold == -1 && errno == EINTR) {
> + goto retry_semctl;
> + } else
> + if (sem_value_hold == -1) {
> + return (CS_ERR_LIBRARY);
> + }
> + *sem_value = sem_value_hold;
> +#endif
> + return (CS_OK);
> +}
> +
> #endif /* COROIPC_IPC_H_DEFINED */
> Index: exec/coroipcs.c
> ===================================================================
> --- exec/coroipcs.c (revision 3000)
> +++ exec/coroipcs.c (working copy)
> @@ -95,6 +95,9 @@
> #define MSG_SEND_LOCKED 0
> #define MSG_SEND_UNLOCKED 1
>
> +#define POLL_STATE_IN 1
> +#define POLL_STATE_INOUT 2
> +
> static struct coroipcs_init_state_v2 *api = NULL;
>
> DECLARE_LIST_INIT (conn_info_list_head);
> @@ -141,13 +144,10 @@
> pthread_attr_t thread_attr;
> unsigned int service;
> enum conn_state state;
> - int notify_flow_control_enabled;
> - int flow_control_state;
> int refcount;
> hdb_handle_t stats_handle;
> #if _POSIX_THREAD_PROCESS_SHARED < 1
> key_t semkey;
> - int semid;
> #endif
> unsigned int pending_semops;
> pthread_mutex_t mutex;
> @@ -166,6 +166,7 @@
> unsigned int setup_bytes_read;
> struct list_head zcb_mapped_list_head;
> char *sending_allowed_private_data[64];
> + int poll_state;
> };
>
> static int shared_mem_dispatch_bytes_left (const struct conn_info
> *conn_info);
> @@ -221,34 +222,6 @@
> {
> }
>
> -static void sem_post_exit_thread (struct conn_info *conn_info)
> -{
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> - int res;
> -
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -retry_semop:
> - res = sem_post (&conn_info->control_buffer->sem0);
> - if (res == -1 && errno == EINTR) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - }
> -#else
> - sop.sem_num = 0;
> - sop.sem_op = 1;
> - sop.sem_flg = 0;
> -
> -retry_semop:
> - res = semop (conn_info->semid, &sop, 1);
> - if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - }
> -#endif
> -}
> -
> static int
> memory_map (
> const char *path,
> @@ -383,6 +356,34 @@
> return (res);
> }
>
> +static inline void flow_control_state_set (
> + struct conn_info *conn_info,
> + int flow_control_state)
> +{
> + if (conn_info->control_buffer->flow_control_enabled ==
> flow_control_state) {
> + return;
> + }
> + if (flow_control_state == 0) {
> + log_printf (LOGSYS_LEVEL_DEBUG,
> + "Disabling flow control for %d\n",
> + conn_info->client_pid);
> + } else
> + if (flow_control_state == 1) {
> + log_printf (LOGSYS_LEVEL_DEBUG,
> + "Enabling flow control for %d\n",
> + conn_info->client_pid);
> + }
> +
> +
> + conn_info->control_buffer->flow_control_enabled = flow_control_state;
> + api->stats_update_value (conn_info->stats_handle,
> + "flow_control",
> + &flow_control_state,
> + sizeof(flow_control_state));
> + api->stats_increment_value (conn_info->stats_handle,
> + "flow_control_count");
> +}
> +
> static inline int zcb_free (struct zcb_mapped *zcb_mapped)
> {
> unsigned int res;
> @@ -517,7 +518,7 @@
> }
>
> if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
> - sem_post_exit_thread (conn_info);
> + ipc_sem_post (conn_info->control_buffer,
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> return (0);
> }
>
> @@ -546,11 +547,12 @@
> pthread_mutex_unlock (&conn_info->mutex);
>
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> - sem_destroy (&conn_info->control_buffer->sem0);
> - sem_destroy (&conn_info->control_buffer->sem1);
> - sem_destroy (&conn_info->control_buffer->sem2);
> + sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
> + sem_destroy (&conn_info->control_buffer->sem_request);
> + sem_destroy (&conn_info->control_buffer->sem_response);
> + sem_destroy (&conn_info->control_buffer->sem_dispatch);
> #else
> - semctl (conn_info->semid, 0, IPC_RMID);
> + semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
> #endif
> /*
> * Destroy shared memory segment and semaphore
> @@ -653,14 +655,12 @@
> static void *pthread_ipc_consumer (void *conn)
> {
> struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> int res;
> coroipc_request_header_t *header;
> coroipc_response_header_t coroipc_response_header;
> int send_ok;
> unsigned int new_message;
> + int sem_value = 0;
>
> #if defined(HAVE_PTHREAD_SETSCHEDPARAM) &&
> defined(HAVE_SCHED_GET_PRIORITY_MAX)
> if (api->sched_policy != 0) {
> @@ -670,43 +670,28 @@
> #endif
>
> for (;;) {
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> -retry_semwait:
> - res = sem_wait (&conn_info->control_buffer->sem0);
> + ipc_sem_wait (conn_info->control_buffer,
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> if (ipc_thread_active (conn_info) == 0) {
> coroipcs_refcount_dec (conn_info);
> pthread_exit (0);
> }
> - if ((res == -1) && (errno == EINTR)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semwait;
> - }
> -#else
>
> - sop.sem_num = 0;
> - sop.sem_op = -1;
> - sop.sem_flg = 0;
> -retry_semop:
> - res = semop (conn_info->semid, &sop, 1);
> - if (ipc_thread_active (conn_info) == 0) {
> - coroipcs_refcount_dec (conn_info);
> - pthread_exit (0);
> - }
> - if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - } else
> - if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> - coroipcs_refcount_dec (conn_info);
> - pthread_exit (0);
> - }
> -#endif
> + outq_flush (conn_info);
>
> + ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST,
> &sem_value);
> + if (sem_value > 0) {
> +
> + res = ipc_sem_wait (conn_info->control_buffer,
> SEMAPHORE_REQUEST);
> + } else {
> + continue;
> + }
> +
> zerocopy_operations_process (conn_info, &header, &new_message);
> /*
> * There is no new message to process, continue for loop
> */
> if (new_message == 0) {
> +printf ("continuing\n");
> continue;
> }
>
> @@ -738,7 +723,6 @@
> /*
> * Overload, tell library to retry
> */
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> coroipc_response_header.size = sizeof
> (coroipc_response_header_t);
> coroipc_response_header.id = 0;
> coroipc_response_header.error = CS_ERR_TRY_AGAIN;
> @@ -928,7 +912,7 @@
> conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
> pthread_mutex_unlock (&conn_info->mutex);
>
> - sem_post_exit_thread (conn_info);
> + ipc_sem_post (conn_info->control_buffer,
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> }
>
> static int conn_info_create (int fd)
> @@ -945,6 +929,7 @@
> conn_info->client_pid = 0;
> conn_info->service = SOCKET_SERVICE_INIT;
> conn_info->state = CONN_STATE_THREAD_INACTIVE;
> + conn_info->poll_state = POLL_STATE_IN;
> list_init (&conn_info->outq_head);
> list_init (&conn_info->list);
> list_init (&conn_info->zcb_mapped_list_head);
> @@ -1103,11 +1088,12 @@
> ipc_disconnect (conn_info);
>
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> - sem_destroy (&conn_info->control_buffer->sem0);
> - sem_destroy (&conn_info->control_buffer->sem1);
> - sem_destroy (&conn_info->control_buffer->sem2);
> + sem_destroy
> (&conn_info->control_buffer->sem_request_or_flush_or_exit);
> + sem_destroy (&conn_info->control_buffer->sem_request);
> + sem_destroy (&conn_info->control_buffer->sem_response);
> + sem_destroy (&conn_info->control_buffer->sem_dispatch);
> #else
> - semctl (conn_info->semid, 0, IPC_RMID);
> + semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
> #endif
>
> /*
> @@ -1181,33 +1167,11 @@
> int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
> {
> struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> - int res;
>
> memcpy (conn_info->response_buffer, msg, mlen);
>
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> - res = sem_post (&conn_info->control_buffer->sem1);
> - if (res == -1) {
> - return (-1);
> - }
> -#else
> - sop.sem_num = 1;
> - sop.sem_op = 1;
> - sop.sem_flg = 0;
> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>
> -retry_semop:
> - res = semop (conn_info->semid, &sop, 1);
> - if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - } else
> - if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> - return (0);
> - }
> -#endif
> api->stats_increment_value (conn_info->stats_handle, "responses");
> return (0);
> }
> @@ -1215,10 +1179,6 @@
> int coroipcs_response_iov_send (void *conn, const struct iovec *iov,
> unsigned int iov_len)
> {
> struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> - int res;
> int write_idx = 0;
> int i;
>
> @@ -1228,26 +1188,8 @@
> write_idx += iov[i].iov_len;
> }
>
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> - res = sem_post (&conn_info->control_buffer->sem1);
> - if (res == -1) {
> - return (-1);
> - }
> -#else
> - sop.sem_num = 1;
> - sop.sem_op = 1;
> - sop.sem_flg = 0;
> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
>
> -retry_semop:
> - res = semop (conn_info->semid, &sop, 1);
> - if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - } else
> - if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> - return (0);
> - }
> -#endif
> api->stats_increment_value (conn_info->stats_handle, "responses");
> return (0);
> }
> @@ -1283,86 +1225,31 @@
> conn_info->control_buffer->write = (write_idx + len) %
> conn_info->dispatch_size;
> }
>
> -/**
> - * simulate the behaviour in coroipcc.c
> - */
> -static int flow_control_event_send (struct conn_info *conn_info, char event)
> -{
> - int new_fc = 0;
> -
> - if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
> - event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - new_fc = 1;
> - }
> -
> - if (conn_info->flow_control_state != new_fc) {
> - if (new_fc == 1) {
> - log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control
> for %d, event %d\n",
> - conn_info->client_pid, event);
> - } else {
> - log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control
> for %d, event %d\n",
> - conn_info->client_pid, event);
> - }
> - conn_info->flow_control_state = new_fc;
> - api->stats_update_value (conn_info->stats_handle,
> "flow_control",
> - &conn_info->flow_control_state,
> - sizeof(conn_info->flow_control_state));
> - api->stats_increment_value (conn_info->stats_handle,
> "flow_control_count");
> - }
> -
> - return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
> -}
> -
> static void msg_send (void *conn, const struct iovec *iov, unsigned int
> iov_len,
> int locked)
> {
> struct conn_info *conn_info = (struct conn_info *)conn;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> int res;
> int i;
> + char buf;
>
> for (i = 0; i < iov_len; i++) {
> memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
> }
>
> - if (list_empty (&conn_info->outq_head))
> - res = flow_control_event_send (conn_info,
> MESSAGE_RES_OUTQ_EMPTY);
> - else
> - res = flow_control_event_send (conn_info,
> MESSAGE_RES_OUTQ_NOT_EMPTY);
> -
> - if (res == -1 && errno == EAGAIN) {
> - if (locked == 0) {
> - pthread_mutex_lock (&conn_info->mutex);
> - }
> + buf = list_empty (&conn_info->outq_head);
> + res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> + if (res != 1) {
> conn_info->pending_semops += 1;
> - if (locked == 0) {
> - pthread_mutex_unlock (&conn_info->mutex);
> + if (conn_info->poll_state == POLL_STATE_IN) {
> + conn_info->poll_state = POLL_STATE_INOUT;
> + api->poll_dispatch_modify (conn_info->fd,
> + POLLIN|POLLOUT|POLLNVAL);
> }
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLOUT|POLLNVAL);
> - } else
> - if (res == -1) {
> - ipc_disconnect (conn_info);
> }
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> - res = sem_post (&conn_info->control_buffer->sem2);
> -#else
> - sop.sem_num = 2;
> - sop.sem_op = 1;
> - sop.sem_flg = 0;
>
> -retry_semop:
> - res = semop (conn_info->semid, &sop, 1);
> - if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
> - api->stats_increment_value (conn_info->stats_handle,
> "sem_retry_count");
> - goto retry_semop;
> - } else
> - if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
> - return;
> - }
> -#endif
> + ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
> +
> api->stats_increment_value (conn_info->stats_handle, "dispatched");
> }
>
> @@ -1371,11 +1258,10 @@
> struct outq_item *outq_item;
> unsigned int bytes_left;
> struct iovec iov;
> - int res;
>
> pthread_mutex_lock (&conn_info->mutex);
> if (list_empty (&conn_info->outq_head)) {
> - res = flow_control_event_send (conn_info,
> MESSAGE_RES_OUTQ_FLUSH_NR);
> + flow_control_state_set (conn_info, 0);
> pthread_mutex_unlock (&conn_info->mutex);
> return;
> }
> @@ -1441,7 +1327,7 @@
> semun.buf = &ipc_set;
>
> for (i = 0; i < 3; i++) {
> - res = semctl (conn_info->semid, 0, IPC_SET, semun);
> + res = semctl (conn_info->control_buffer->semid, 0, IPC_SET,
> semun);
> if (res == -1) {
> return (-1);
> }
> @@ -1471,6 +1357,7 @@
> bytes_msg += iov[i].iov_len;
> }
> if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
> + flow_control_state_set (conn_info, 1);
> outq_item = api->malloc (sizeof (struct outq_item));
> if (outq_item == NULL) {
> ipc_disconnect (conn);
> @@ -1491,11 +1378,6 @@
> outq_item->mlen = bytes_msg;
> list_init (&outq_item->list);
> pthread_mutex_lock (&conn_info->mutex);
> - if (list_empty (&conn_info->outq_head)) {
> - conn_info->notify_flow_control_enabled = 1;
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLOUT|POLLNVAL);
> - }
> list_add_tail (&outq_item->list, &conn_info->outq_head);
> pthread_mutex_unlock (&conn_info->mutex);
> api->stats_increment_value (conn_info->stats_handle,
> "queue_size");
> @@ -1742,11 +1624,10 @@
>
> conn_info->service = req_setup->service;
> conn_info->refcount = 0;
> - conn_info->notify_flow_control_enabled = 0;
> conn_info->setup_bytes_read = 0;
>
> #if _POSIX_THREAD_PROCESS_SHARED < 1
> - conn_info->semid = semget (conn_info->semkey, 3, 0600);
> + conn_info->control_buffer->semid = semget (conn_info->semkey,
> 3, 0600);
> #endif
> conn_info->pending_semops = 0;
>
> @@ -1794,9 +1675,6 @@
> res = recv (fd, &buf, 1, MSG_NOSIGNAL);
> if (res == 1) {
> switch (buf) {
> - case MESSAGE_REQ_OUTQ_FLUSH:
> - outq_flush (conn_info);
> - break;
> case MESSAGE_REQ_CHANGE_EUID:
> if (priv_change (conn_info) == -1) {
> ipc_disconnect (conn_info);
> @@ -1820,37 +1698,24 @@
> coroipcs_refcount_dec (conn_info);
> }
>
> - coroipcs_refcount_inc (conn_info);
> - pthread_mutex_lock (&conn_info->mutex);
> - if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent &
> POLLOUT)) {
> - if (list_empty (&conn_info->outq_head))
> - buf = MESSAGE_RES_OUTQ_EMPTY;
> - else
> - buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
> + if (revent & POLLOUT) {
> + int psop = conn_info->pending_semops;
> + int i;
>
> - for (; conn_info->pending_semops;) {
> - res = flow_control_event_send (conn_info, buf);
> - if (res == 1) {
> - conn_info->pending_semops--;
> + assert (psop != 0);
> + for (i = 0; i < psop; i++) {
> + res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> + if (res != 1) {
> + return (0);
> } else {
> - break;
> + conn_info->pending_semops -= 1;
> }
> }
> - if (conn_info->notify_flow_control_enabled) {
> - res = flow_control_event_send (conn_info,
> MESSAGE_RES_ENABLE_FLOWCONTROL);
> - if (res == 1) {
> - conn_info->notify_flow_control_enabled = 0;
> - }
> + if (conn_info->poll_state == POLL_STATE_INOUT) {
> + conn_info->poll_state = POLL_STATE_IN;
> + api->poll_dispatch_modify (conn_info->fd,
> POLLIN|POLLNVAL);
> }
> - if (conn_info->notify_flow_control_enabled == 0 &&
> - conn_info->pending_semops == 0) {
> -
> - api->poll_dispatch_modify (conn_info->fd,
> - POLLIN|POLLNVAL);
> - }
> }
> - pthread_mutex_unlock (&conn_info->mutex);
> - coroipcs_refcount_dec (conn_info);
>
> return (0);
> }
> Index: lib/coroipcc.c
> ===================================================================
> --- lib/coroipcc.c (revision 3000)
> +++ lib/coroipcc.c (working copy)
> @@ -72,17 +72,8 @@
>
> #include "util.h"
>
> -/*
> - * Define sem_wait timeout (real timeout will be (n-1;n) )
> - */
> -#define IPC_SEMWAIT_TIMEOUT 2
> -
> struct ipc_instance {
> int fd;
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - int semid;
> -#endif
> - int flow_control_state;
> struct control_buffer *control_buffer;
> char *request_buffer;
> char *response_buffer;
> @@ -117,6 +108,23 @@
> #define MSG_NOSIGNAL 0
> #endif
>
> +static inline int shared_mem_dispatch_bytes_left (struct ipc_instance
> *context)
> +{
> + unsigned int n_read;
> + unsigned int n_write;
> + unsigned int bytes_left;
> +
> + n_read = context->control_buffer->read;
> + n_write = context->control_buffer->write;
> +
> + if (n_read <= n_write) {
> + bytes_left = context->dispatch_size - n_write + n_read;
> + } else {
> + bytes_left = n_read - n_write;
> + }
> + return (bytes_left);
> +}
> +
> static cs_error_t
> socket_send (
> int s,
> @@ -238,10 +246,10 @@
> return (res);
> }
>
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> static int
> priv_change_send (struct ipc_instance *ipc_instance)
> {
> +#if _POSIX_THREAD_PROCESS_SHARED < 1
> char buf_req;
> mar_req_priv_change req_priv_change;
> unsigned int res;
> @@ -268,19 +276,12 @@
> }
>
> ipc_instance->euid = req_priv_change.euid;
> +#else
> + ipc_instance = NULL;
> +#endif
> return (0);
> }
>
> -#if defined(_SEM_SEMUN_UNDEFINED)
> -union semun {
> - int val;
> - struct semid_ds *buf;
> - unsigned short int *array;
> - struct seminfo *__buf;
> -};
> -#endif
> -#endif
> -
> static int
> circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
> {
> @@ -471,10 +472,6 @@
> const struct iovec *iov,
> unsigned int iov_len)
> {
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#endif
> -
> int i;
> int res;
> int req_buffer_idx = 0;
> @@ -490,117 +487,18 @@
> req_buffer_idx += iov[i].iov_len;
> }
>
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> - res = sem_post (&ipc_instance->control_buffer->sem0);
> - if (res == -1) {
> - return (CS_ERR_LIBRARY);
> - }
> -#else
> /*
> - * Signal semaphore #0 indicting a new message from client
> + * Signal semaphore #3 and #0 indicting a new message from client
> * to server request queue
> */
> - sop.sem_num = 0;
> - sop.sem_op = 1;
> - sop.sem_flg = 0;
> -
> -retry_semop:
> - res = semop (ipc_instance->semid, &sop, 1);
> - if (res == -1 && errno == EINTR) {
> - return (CS_ERR_TRY_AGAIN);
> - } else
> - if (res == -1 && errno == EACCES) {
> - priv_change_send (ipc_instance);
> - goto retry_semop;
> - } else
> - if (res == -1) {
> + res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
> + if (res != CS_OK) {
> return (CS_ERR_LIBRARY);
> }
> -#endif
> - return (CS_OK);
> -}
> -
> -inline static cs_error_t
> -ipc_sem_wait (
> - struct ipc_instance *ipc_instance,
> - int sem_num)
> -{
> -#if _POSIX_THREAD_PROCESS_SHARED < 1
> - struct sembuf sop;
> -#else
> - struct timespec timeout;
> - struct pollfd pfd;
> - sem_t *sem = NULL;
> -#endif
> - int res;
> -
> -#if _POSIX_THREAD_PROCESS_SHARED > 0
> - switch (sem_num) {
> - case 0:
> - sem = &ipc_instance->control_buffer->sem0;
> - break;
> - case 1:
> - sem = &ipc_instance->control_buffer->sem1;
> - break;
> - case 2:
> - sem = &ipc_instance->control_buffer->sem2;
> - break;
> - }
> -
> -retry_semwait:
> - timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
> - timeout.tv_nsec = 0;
> -
> - res = sem_timedwait (sem, &timeout);
> - if (res == -1 && errno == ETIMEDOUT) {
> - pfd.fd = ipc_instance->fd;
> - pfd.events = 0;
> -
> - res = poll (&pfd, 1, 0);
> -
> - if (res == -1 && errno == EINTR) {
> - return (CS_ERR_TRY_AGAIN);
> - } else
> - if (res == -1) {
> - return (CS_ERR_LIBRARY);
> - }
> -
> - if (res == 1) {
> - if (pfd.revents == POLLERR || pfd.revents == POLLHUP ||
> pfd.revents == POLLNVAL) {
> - return (CS_ERR_LIBRARY);
> - }
> - }
> -
> - goto retry_semwait;
> - } else
> - if (res == -1 && errno == EINTR) {
> - return (CS_ERR_TRY_AGAIN);
> - } else
> - if (res == -1) {
> + res = ipc_sem_post (ipc_instance->control_buffer,
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> + if (res != CS_OK) {
> return (CS_ERR_LIBRARY);
> }
> -#else
> - /*
> - * Wait for semaphore indicating a new message from server
> - * to client in queue
> - */
> - sop.sem_num = sem_num;
> - sop.sem_op = -1;
> - sop.sem_flg = 0;
> -
> -retry_semop:
> - res = semop (ipc_instance->semid, &sop, 1);
> - if (res == -1 && errno == EINTR) {
> - return (CS_ERR_TRY_AGAIN);
> - } else
> - if (res == -1 && errno == EACCES) {
> - priv_change_send (ipc_instance);
> - goto retry_semop;
> - } else
> - if (res == -1) {
> - return (CS_ERR_LIBRARY);
> - }
> -#endif
> return (CS_OK);
> }
>
> @@ -611,10 +509,17 @@
> size_t res_len)
> {
> coroipc_response_header_t *response_header;
> - cs_error_t err;
> + cs_error_t res;
>
> - if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
> - return (err);
> +retry_ipc_sem_wait:
> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
> + if (res != CS_OK) {
> + if (res == CS_ERR_TRY_AGAIN) {
> + priv_change_send (ipc_instance);
> + goto retry_ipc_sem_wait;
> + } else {
> + return (res);
> + }
> }
>
> response_header = (coroipc_response_header_t
> *)ipc_instance->response_buffer;
> @@ -631,10 +536,17 @@
> struct ipc_instance *ipc_instance,
> void **res_msg)
> {
> - cs_error_t err;
> + cs_error_t res;
>
> - if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
> - return (err);
> +retry_ipc_sem_wait:
> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
> + if (res != CS_OK) {
> + if (res == CS_ERR_TRY_AGAIN) {
> + priv_change_send (ipc_instance);
> + goto retry_ipc_sem_wait;
> + } else {
> + return (res);
> + }
> }
>
> *res_msg = (char *)ipc_instance->response_buffer;
> @@ -754,18 +666,22 @@
> }
>
> #if _POSIX_THREAD_PROCESS_SHARED > 0
> - sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
> - sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
> - sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
> + sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit,
> 1, 0);
> + sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
> + sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
> + sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
> #else
> +{
> + int i;
> +
> /*
> * Allocate a semaphore segment
> */
> while (1) {
> semkey = random();
> ipc_instance->euid = geteuid ();
> - if ((ipc_instance->semid
> - = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> + if ((ipc_instance->control_buffer->semid
> + = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> break;
> }
> /*
> @@ -781,18 +697,15 @@
> }
> }
>
> - semun.val = 0;
> - sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
> - if (sys_res != 0) {
> - res = CS_ERR_LIBRARY;
> - goto error_exit;
> + for (i = 0; i < 4; i++) {
> + semun.val = 0;
> + sys_res = semctl (ipc_instance->control_buffer->semid, i,
> SETVAL, semun);
> + if (sys_res != 0) {
> + res = CS_ERR_LIBRARY;
> + goto error_exit;
> + }
> }
> -
> - sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
> - if (sys_res != 0) {
> - res = CS_ERR_LIBRARY;
> - goto error_exit;
> - }
> +}
> #endif
>
> /*
> @@ -822,7 +735,6 @@
> }
>
> ipc_instance->fd = request_fd;
> - ipc_instance->flow_control_state = 0;
>
> if (res_setup.error == CS_ERR_TRY_AGAIN) {
> res = res_setup.error;
> @@ -842,8 +754,8 @@
>
> error_exit:
> #if _POSIX_THREAD_PROCESS_SHARED < 1
> - if (ipc_instance->semid > 0)
> - semctl (ipc_instance->semid, 0, IPC_RMID);
> + if (ipc_instance->control_buffer->semid > 0)
> + semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
> #endif
> memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
> error_dispatch_buffer:
> @@ -893,7 +805,7 @@
> return (res);
> }
>
> - *flow_control_state = ipc_instance->flow_control_state;
> + *flow_control_state =
> ipc_instance->control_buffer->flow_control_enabled;
>
> hdb_handle_put (&ipc_hdb, handle);
> return (res);
> @@ -928,10 +840,9 @@
> int poll_events;
> char buf;
> struct ipc_instance *ipc_instance;
> - int res;
> - char buf_two = 1;
> char *data_addr;
> cs_error_t error = CS_OK;
> + int res;
>
> error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void
> **)&ipc_instance));
> if (error != CS_OK) {
> @@ -962,47 +873,22 @@
> goto error_put;
> }
>
> - res = recv (ipc_instance->fd, &buf, 1, 0);
> - if (res == -1 && errno == EINTR) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - } else
> - if (res == -1) {
> - error = CS_ERR_LIBRARY;
> - goto error_put;
> - } else
> - if (res == 0) {
> - /* Means that the peer closed cleanly the socket. However, it
> should
> - * happen only on BSD and Darwing systems since poll() returns a
> - * POLLHUP event on other systems.
> + error = socket_recv (ipc_instance->fd, &buf, 1);
> + assert (error == CS_OK);
> +
> + if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
> + /*
> + * Notify coroipcs to flush any pending dispatch messages
> */
> - error = CS_ERR_LIBRARY;
> - goto error_put;
> +
> + res = ipc_sem_post (ipc_instance->control_buffer,
> SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
> + if (res != CS_OK) {
> + error = CS_ERR_LIBRARY;
> + goto error_put;
> + }
> +
> +
> }
> - ipc_instance->flow_control_state = 0;
> - if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf ==
> MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - ipc_instance->flow_control_state = 1;
> - }
> - /*
> - * Notify executive to flush any pending dispatch messages
> - */
> - if (ipc_instance->flow_control_state) {
> - buf_two = MESSAGE_REQ_OUTQ_FLUSH;
> - res = socket_send (ipc_instance->fd, &buf_two, 1);
> - assert (res == CS_OK); /* TODO */
> - }
> - /*
> - * This is just a notification of flow control starting at the addition
> - * of a new pending message, not a message to dispatch
> - */
> - if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - }
> - if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
> - error = CS_ERR_TRY_AGAIN;
> - goto error_put;
> - }
>
> data_addr = ipc_instance->dispatch_buffer;
>
> @@ -1030,8 +916,15 @@
> return (res);
> }
>
> - if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
> - goto error_exit;
> +retry_ipc_sem_wait:
> + res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
> + if (res != CS_OK) {
> + if (res == CS_ERR_TRY_AGAIN) {
> + priv_change_send (ipc_instance);
> + goto retry_ipc_sem_wait;
> + } else {
> + goto error_exit;
> + }
> }
>
> addr = ipc_instance->dispatch_buffer;
> @@ -1078,8 +971,8 @@
> res = reply_receive (ipc_instance, res_msg, res_len);
>
> error_exit:
> - hdb_handle_put (&ipc_hdb, handle);
> pthread_mutex_unlock (&ipc_instance->mutex);
> + hdb_handle_put (&ipc_hdb, handle);
>
> return (res);
> }
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais