Integrating comments from Angus and generally cleaning up the code a bit
for maintenance purposes.
Regards
-steve
Index: include/corosync/coroipc_ipc.h
===================================================================
--- include/corosync/coroipc_ipc.h (revision 3000)
+++ include/corosync/coroipc_ipc.h (working copy)
@@ -35,6 +35,9 @@
#define COROIPC_IPC_H_DEFINED
#include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include "corotypes.h"
#include "config.h"
/*
@@ -52,28 +55,40 @@
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
+#else
+#include <sys/sem.h>
#endif
+/*
+ * Define sem_wait timeout (real timeout will be (n-1;n) )
+ */
+#define IPC_SEMWAIT_TIMEOUT 2
+
enum req_init_types {
MESSAGE_REQ_RESPONSE_INIT = 0,
MESSAGE_REQ_DISPATCH_INIT = 1
};
#define MESSAGE_REQ_CHANGE_EUID 1
-#define MESSAGE_REQ_OUTQ_FLUSH 2
-#define MESSAGE_RES_OUTQ_EMPTY 0
-#define MESSAGE_RES_OUTQ_NOT_EMPTY 1
-#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
-#define MESSAGE_RES_OUTQ_FLUSH_NR 3
+enum ipc_semaphore_identifiers {
+ SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT = 0,
+ SEMAPHORE_REQUEST = 1,
+ SEMAPHORE_RESPONSE = 2,
+ SEMAPHORE_DISPATCH = 3
+};
struct control_buffer {
unsigned int read;
unsigned int write;
+ int flow_control_enabled;
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_t sem0;
- sem_t sem1;
- sem_t sem2;
+ sem_t sem_request_or_flush_or_exit;
+ sem_t sem_response;
+ sem_t sem_dispatch;
+ sem_t sem_request;
+#else
+ int semid;
#endif
};
@@ -145,4 +160,172 @@
#define ZC_FREE_HEADER 0xFFFFFFFE
#define ZC_EXECUTE_HEADER 0xFFFFFFFD
+static inline cs_error_t
+ipc_sem_wait (
+ struct control_buffer *control_buffer,
+ enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+ struct sembuf sop;
+#else
+ struct timespec timeout;
+ sem_t *sem = NULL;
+#endif
+ int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ switch (sem_id) {
+ case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+ sem = &control_buffer->sem_request_or_flush_or_exit;
+ break;
+ case SEMAPHORE_RESPONSE:
+ sem = &control_buffer->sem_request;
+ break;
+ case SEMAPHORE_DISPATCH:
+ sem = &control_buffer->sem_response;
+ break;
+ case SEMAPHORE_REQUEST:
+ sem = &control_buffer->sem_dispatch;
+ break;
+ }
+
+ timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+ timeout.tv_nsec = 0;
+
+retry_sem_timedwait:
+ res = sem_timedwait (sem, &timeout);
+ if (res == -1 && errno == ETIMEDOUT) {
+ return (CS_ERR_LIBRARY);
+ } else
+ if (res == -1 && errno == EINTR) {
+ goto retry_sem_timedwait;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#else
+ /*
+ * Wait for semaphore indicating a new message from server
+ * to client in queue
+ */
+ sop.sem_num = sem_id;
+ sop.sem_op = -1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (control_buffer->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ return (CS_ERR_TRY_AGAIN);
+ goto retry_semop;
+ } else
+ if (res == -1 && errno == EACCES) {
+ return (CS_ERR_TRY_AGAIN);
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#endif
+ return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_post (
+ struct control_buffer *control_buffer,
+ enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+ struct sembuf sop;
+#else
+ sem_t *sem = NULL;
+#endif
+ int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ switch (sem_id) {
+ case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+ sem = &control_buffer->sem_request_or_flush_or_exit;
+ break;
+ case SEMAPHORE_RESPONSE:
+ sem = &control_buffer->sem_request;
+ break;
+ case SEMAPHORE_DISPATCH:
+ sem = &control_buffer->sem_response;
+ break;
+ case SEMAPHORE_REQUEST:
+ sem = &control_buffer->sem_dispatch;
+ break;
+ }
+
+ res = sem_post (sem);
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#else
+ sop.sem_num = sem_id;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semop:
+ res = semop (control_buffer->semid, &sop, 1);
+ if (res == -1 && errno == EINTR) {
+ goto retry_semop;
+ } else
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#endif
+ return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_getvalue (
+ struct control_buffer *control_buffer,
+ enum ipc_semaphore_identifiers sem_id,
+ int *sem_value)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+ struct sembuf sop;
+ int sem_value_hold;
+#else
+ sem_t *sem = NULL;
+#endif
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+ switch (sem_id) {
+ case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+ sem = &control_buffer->sem_request_or_flush_or_exit;
+ break;
+ case SEMAPHORE_RESPONSE:
+ sem = &control_buffer->sem_request;
+ break;
+ case SEMAPHORE_DISPATCH:
+ sem = &control_buffer->sem_response;
+ break;
+ case SEMAPHORE_REQUEST:
+ sem = &control_buffer->sem_dispatch;
+ break;
+ }
+
+ res = sem_getvalue (sem, sem_value);
+ if (res == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+#else
+ sop.sem_num = sem_id;
+ sop.sem_op = 1;
+ sop.sem_flg = 0;
+
+retry_semctl:
+ sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
+ if (sem_value_hold == -1 && errno == EINTR) {
+ goto retry_semctl;
+ } else
+ if (sem_value_hold == -1) {
+ return (CS_ERR_LIBRARY);
+ }
+ *sem_value = sem_value_hold;
+#endif
+ return (CS_OK);
+}
+
#endif /* COROIPC_IPC_H_DEFINED */
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c (revision 3000)
+++ exec/coroipcs.c (working copy)
@@ -95,6 +95,9 @@
#define MSG_SEND_LOCKED 0
#define MSG_SEND_UNLOCKED 1
+#define POLL_STATE_IN 1
+#define POLL_STATE_INOUT 2
+
static struct coroipcs_init_state_v2 *api = NULL;
DECLARE_LIST_INIT (conn_info_list_head);
@@ -141,13 +144,10 @@
pthread_attr_t thread_attr;
unsigned int service;
enum conn_state state;
- int notify_flow_control_enabled;
- int flow_control_state;
int refcount;
hdb_handle_t stats_handle;
#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey;
- int semid;
#endif
unsigned int pending_semops;
pthread_mutex_t mutex;
@@ -166,6 +166,7 @@
unsigned int setup_bytes_read;
struct list_head zcb_mapped_list_head;
char *sending_allowed_private_data[64];
+ int poll_state;
};
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -221,34 +222,6 @@
{
}
-static void sem_post_exit_thread (struct conn_info *conn_info)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
- res = sem_post (&conn_info->control_buffer->sem0);
- if (res == -1 && errno == EINTR) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- }
-#else
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- }
-#endif
-}
-
static int
memory_map (
const char *path,
@@ -383,6 +356,34 @@
return (res);
}
+static inline void flow_control_state_set (
+ struct conn_info *conn_info,
+ int flow_control_state)
+{
+ if (conn_info->control_buffer->flow_control_enabled ==
flow_control_state) {
+ return;
+ }
+ if (flow_control_state == 0) {
+ log_printf (LOGSYS_LEVEL_DEBUG,
+ "Disabling flow control for %d\n",
+ conn_info->client_pid);
+ } else
+ if (flow_control_state == 1) {
+ log_printf (LOGSYS_LEVEL_DEBUG,
+ "Enabling flow control for %d\n",
+ conn_info->client_pid);
+ }
+
+
+ conn_info->control_buffer->flow_control_enabled = flow_control_state;
+ api->stats_update_value (conn_info->stats_handle,
+ "flow_control",
+ &flow_control_state,
+ sizeof(flow_control_state));
+ api->stats_increment_value (conn_info->stats_handle,
+ "flow_control_count");
+}
+
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
unsigned int res;
@@ -517,7 +518,7 @@
}
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
- sem_post_exit_thread (conn_info);
+ ipc_sem_post (conn_info->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
return (0);
}
@@ -546,11 +547,12 @@
pthread_mutex_unlock (&conn_info->mutex);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy (&conn_info->control_buffer->sem0);
- sem_destroy (&conn_info->control_buffer->sem1);
- sem_destroy (&conn_info->control_buffer->sem2);
+ sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+ sem_destroy (&conn_info->control_buffer->sem_request);
+ sem_destroy (&conn_info->control_buffer->sem_response);
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
- semctl (conn_info->semid, 0, IPC_RMID);
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Destroy shared memory segment and semaphore
@@ -653,14 +655,12 @@
static void *pthread_ipc_consumer (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
int res;
coroipc_request_header_t *header;
coroipc_response_header_t coroipc_response_header;
int send_ok;
unsigned int new_message;
+ int sem_value = 0;
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
if (api->sched_policy != 0) {
@@ -670,43 +670,28 @@
#endif
for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
- res = sem_wait (&conn_info->control_buffer->sem0);
+ ipc_sem_wait (conn_info->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
if (ipc_thread_active (conn_info) == 0) {
coroipcs_refcount_dec (conn_info);
pthread_exit (0);
}
- if ((res == -1) && (errno == EINTR)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semwait;
- }
-#else
- sop.sem_num = 0;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if (ipc_thread_active (conn_info) == 0) {
- coroipcs_refcount_dec (conn_info);
- pthread_exit (0);
- }
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- coroipcs_refcount_dec (conn_info);
- pthread_exit (0);
- }
-#endif
+ outq_flush (conn_info);
+ ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST,
&sem_value);
+ if (sem_value > 0) {
+
+ res = ipc_sem_wait (conn_info->control_buffer,
SEMAPHORE_REQUEST);
+ } else {
+ continue;
+ }
+
zerocopy_operations_process (conn_info, &header, &new_message);
/*
* There is no new message to process, continue for loop
*/
if (new_message == 0) {
+printf ("continuing\n");
continue;
}
@@ -738,7 +723,6 @@
/*
* Overload, tell library to retry
*/
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
coroipc_response_header.size = sizeof
(coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -928,7 +912,7 @@
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
pthread_mutex_unlock (&conn_info->mutex);
- sem_post_exit_thread (conn_info);
+ ipc_sem_post (conn_info->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
}
static int conn_info_create (int fd)
@@ -945,6 +929,7 @@
conn_info->client_pid = 0;
conn_info->service = SOCKET_SERVICE_INIT;
conn_info->state = CONN_STATE_THREAD_INACTIVE;
+ conn_info->poll_state = POLL_STATE_IN;
list_init (&conn_info->outq_head);
list_init (&conn_info->list);
list_init (&conn_info->zcb_mapped_list_head);
@@ -1103,11 +1088,12 @@
ipc_disconnect (conn_info);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy (&conn_info->control_buffer->sem0);
- sem_destroy (&conn_info->control_buffer->sem1);
- sem_destroy (&conn_info->control_buffer->sem2);
+ sem_destroy
(&conn_info->control_buffer->sem_request_or_flush_or_exit);
+ sem_destroy (&conn_info->control_buffer->sem_request);
+ sem_destroy (&conn_info->control_buffer->sem_response);
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
- semctl (conn_info->semid, 0, IPC_RMID);
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
@@ -1181,33 +1167,11 @@
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
memcpy (conn_info->response_buffer, msg, mlen);
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem1);
- if (res == -1) {
- return (-1);
- }
-#else
- sop.sem_num = 1;
- sop.sem_op = 1;
- sop.sem_flg = 0;
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return (0);
- }
-#endif
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
@@ -1215,10 +1179,6 @@
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned
int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
int write_idx = 0;
int i;
@@ -1228,26 +1188,8 @@
write_idx += iov[i].iov_len;
}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem1);
- if (res == -1) {
- return (-1);
- }
-#else
- sop.sem_num = 1;
- sop.sem_op = 1;
- sop.sem_flg = 0;
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return (0);
- }
-#endif
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
@@ -1283,86 +1225,31 @@
conn_info->control_buffer->write = (write_idx + len) %
conn_info->dispatch_size;
}
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
- int new_fc = 0;
-
- if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
- event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- new_fc = 1;
- }
-
- if (conn_info->flow_control_state != new_fc) {
- if (new_fc == 1) {
- log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control
for %d, event %d\n",
- conn_info->client_pid, event);
- } else {
- log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control
for %d, event %d\n",
- conn_info->client_pid, event);
- }
- conn_info->flow_control_state = new_fc;
- api->stats_update_value (conn_info->stats_handle,
"flow_control",
- &conn_info->flow_control_state,
- sizeof(conn_info->flow_control_state));
- api->stats_increment_value (conn_info->stats_handle,
"flow_control_count");
- }
-
- return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
static void msg_send (void *conn, const struct iovec *iov, unsigned int
iov_len,
int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
int res;
int i;
+ char buf;
for (i = 0; i < iov_len; i++) {
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
}
- if (list_empty (&conn_info->outq_head))
- res = flow_control_event_send (conn_info,
MESSAGE_RES_OUTQ_EMPTY);
- else
- res = flow_control_event_send (conn_info,
MESSAGE_RES_OUTQ_NOT_EMPTY);
-
- if (res == -1 && errno == EAGAIN) {
- if (locked == 0) {
- pthread_mutex_lock (&conn_info->mutex);
- }
+ buf = list_empty (&conn_info->outq_head);
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
conn_info->pending_semops += 1;
- if (locked == 0) {
- pthread_mutex_unlock (&conn_info->mutex);
+ if (conn_info->poll_state == POLL_STATE_IN) {
+ conn_info->poll_state = POLL_STATE_INOUT;
+ api->poll_dispatch_modify (conn_info->fd,
+ POLLIN|POLLOUT|POLLNVAL);
}
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- } else
- if (res == -1) {
- ipc_disconnect (conn_info);
}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem2);
-#else
- sop.sem_num = 2;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return;
- }
-#endif
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
+
api->stats_increment_value (conn_info->stats_handle, "dispatched");
}
@@ -1371,11 +1258,10 @@
struct outq_item *outq_item;
unsigned int bytes_left;
struct iovec iov;
- int res;
pthread_mutex_lock (&conn_info->mutex);
if (list_empty (&conn_info->outq_head)) {
- res = flow_control_event_send (conn_info,
MESSAGE_RES_OUTQ_FLUSH_NR);
+ flow_control_state_set (conn_info, 0);
pthread_mutex_unlock (&conn_info->mutex);
return;
}
@@ -1441,7 +1327,7 @@
semun.buf = &ipc_set;
for (i = 0; i < 3; i++) {
- res = semctl (conn_info->semid, 0, IPC_SET, semun);
+ res = semctl (conn_info->control_buffer->semid, 0, IPC_SET,
semun);
if (res == -1) {
return (-1);
}
@@ -1471,6 +1357,7 @@
bytes_msg += iov[i].iov_len;
}
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+ flow_control_state_set (conn_info, 1);
outq_item = api->malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
ipc_disconnect (conn);
@@ -1491,11 +1378,6 @@
outq_item->mlen = bytes_msg;
list_init (&outq_item->list);
pthread_mutex_lock (&conn_info->mutex);
- if (list_empty (&conn_info->outq_head)) {
- conn_info->notify_flow_control_enabled = 1;
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- }
list_add_tail (&outq_item->list, &conn_info->outq_head);
pthread_mutex_unlock (&conn_info->mutex);
api->stats_increment_value (conn_info->stats_handle,
"queue_size");
@@ -1742,11 +1624,10 @@
conn_info->service = req_setup->service;
conn_info->refcount = 0;
- conn_info->notify_flow_control_enabled = 0;
conn_info->setup_bytes_read = 0;
#if _POSIX_THREAD_PROCESS_SHARED < 1
- conn_info->semid = semget (conn_info->semkey, 3, 0600);
+ conn_info->control_buffer->semid = semget (conn_info->semkey,
3, 0600);
#endif
conn_info->pending_semops = 0;
@@ -1794,9 +1675,6 @@
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
if (res == 1) {
switch (buf) {
- case MESSAGE_REQ_OUTQ_FLUSH:
- outq_flush (conn_info);
- break;
case MESSAGE_REQ_CHANGE_EUID:
if (priv_change (conn_info) == -1) {
ipc_disconnect (conn_info);
@@ -1820,37 +1698,24 @@
coroipcs_refcount_dec (conn_info);
}
- coroipcs_refcount_inc (conn_info);
- pthread_mutex_lock (&conn_info->mutex);
- if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent &
POLLOUT)) {
- if (list_empty (&conn_info->outq_head))
- buf = MESSAGE_RES_OUTQ_EMPTY;
- else
- buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+ if (revent & POLLOUT) {
+ int psop = conn_info->pending_semops;
+ int i;
- for (; conn_info->pending_semops;) {
- res = flow_control_event_send (conn_info, buf);
- if (res == 1) {
- conn_info->pending_semops--;
+ assert (psop != 0);
+ for (i = 0; i < psop; i++) {
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
+ return (0);
} else {
- break;
+ conn_info->pending_semops -= 1;
}
}
- if (conn_info->notify_flow_control_enabled) {
- res = flow_control_event_send (conn_info,
MESSAGE_RES_ENABLE_FLOWCONTROL);
- if (res == 1) {
- conn_info->notify_flow_control_enabled = 0;
- }
+ if (conn_info->poll_state == POLL_STATE_INOUT) {
+ conn_info->poll_state = POLL_STATE_IN;
+ api->poll_dispatch_modify (conn_info->fd,
POLLIN|POLLNVAL);
}
- if (conn_info->notify_flow_control_enabled == 0 &&
- conn_info->pending_semops == 0) {
-
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLNVAL);
- }
}
- pthread_mutex_unlock (&conn_info->mutex);
- coroipcs_refcount_dec (conn_info);
return (0);
}
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c (revision 3000)
+++ lib/coroipcc.c (working copy)
@@ -72,17 +72,8 @@
#include "util.h"
-/*
- * Define sem_wait timeout (real timeout will be (n-1;n) )
- */
-#define IPC_SEMWAIT_TIMEOUT 2
-
struct ipc_instance {
int fd;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- int semid;
-#endif
- int flow_control_state;
struct control_buffer *control_buffer;
char *request_buffer;
char *response_buffer;
@@ -117,6 +108,23 @@
#define MSG_NOSIGNAL 0
#endif
+static inline int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+ unsigned int n_read;
+ unsigned int n_write;
+ unsigned int bytes_left;
+
+ n_read = context->control_buffer->read;
+ n_write = context->control_buffer->write;
+
+ if (n_read <= n_write) {
+ bytes_left = context->dispatch_size - n_write + n_read;
+ } else {
+ bytes_left = n_read - n_write;
+ }
+ return (bytes_left);
+}
+
static cs_error_t
socket_send (
int s,
@@ -238,10 +246,10 @@
return (res);
}
-#if _POSIX_THREAD_PROCESS_SHARED < 1
static int
priv_change_send (struct ipc_instance *ipc_instance)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
char buf_req;
mar_req_priv_change req_priv_change;
unsigned int res;
@@ -268,19 +276,12 @@
}
ipc_instance->euid = req_priv_change.euid;
+#else
+ ipc_instance = NULL;
+#endif
return (0);
}
-#if defined(_SEM_SEMUN_UNDEFINED)
-union semun {
- int val;
- struct semid_ds *buf;
- unsigned short int *array;
- struct seminfo *__buf;
-};
-#endif
-#endif
-
static int
circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
{
@@ -471,10 +472,6 @@
const struct iovec *iov,
unsigned int iov_len)
{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
-
int i;
int res;
int req_buffer_idx = 0;
@@ -490,117 +487,18 @@
req_buffer_idx += iov[i].iov_len;
}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&ipc_instance->control_buffer->sem0);
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-#else
/*
- * Signal semaphore #0 indicting a new message from client
+ * Signal semaphore #3 and #0 indicting a new message from client
* to server request queue
*/
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (ipc_instance->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_instance);
- goto retry_semop;
- } else
- if (res == -1) {
+ res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
+ if (res != CS_OK) {
return (CS_ERR_LIBRARY);
}
-#endif
- return (CS_OK);
-}
-
-inline static cs_error_t
-ipc_sem_wait (
- struct ipc_instance *ipc_instance,
- int sem_num)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#else
- struct timespec timeout;
- struct pollfd pfd;
- sem_t *sem = NULL;
-#endif
- int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- switch (sem_num) {
- case 0:
- sem = &ipc_instance->control_buffer->sem0;
- break;
- case 1:
- sem = &ipc_instance->control_buffer->sem1;
- break;
- case 2:
- sem = &ipc_instance->control_buffer->sem2;
- break;
- }
-
-retry_semwait:
- timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
- timeout.tv_nsec = 0;
-
- res = sem_timedwait (sem, &timeout);
- if (res == -1 && errno == ETIMEDOUT) {
- pfd.fd = ipc_instance->fd;
- pfd.events = 0;
-
- res = poll (&pfd, 1, 0);
-
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-
- if (res == 1) {
- if (pfd.revents == POLLERR || pfd.revents == POLLHUP ||
pfd.revents == POLLNVAL) {
- return (CS_ERR_LIBRARY);
- }
- }
-
- goto retry_semwait;
- } else
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1) {
+ res = ipc_sem_post (ipc_instance->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+ if (res != CS_OK) {
return (CS_ERR_LIBRARY);
}
-#else
- /*
- * Wait for semaphore indicating a new message from server
- * to client in queue
- */
- sop.sem_num = sem_num;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (ipc_instance->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_instance);
- goto retry_semop;
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-#endif
return (CS_OK);
}
@@ -611,10 +509,17 @@
size_t res_len)
{
coroipc_response_header_t *response_header;
- cs_error_t err;
+ cs_error_t res;
- if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
- return (err);
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ return (res);
+ }
}
response_header = (coroipc_response_header_t
*)ipc_instance->response_buffer;
@@ -631,10 +536,17 @@
struct ipc_instance *ipc_instance,
void **res_msg)
{
- cs_error_t err;
+ cs_error_t res;
- if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
- return (err);
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ return (res);
+ }
}
*res_msg = (char *)ipc_instance->response_buffer;
@@ -754,18 +666,22 @@
}
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
- sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
- sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit,
1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
#else
+{
+ int i;
+
/*
* Allocate a semaphore segment
*/
while (1) {
semkey = random();
ipc_instance->euid = geteuid ();
- if ((ipc_instance->semid
- = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+ if ((ipc_instance->control_buffer->semid
+ = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
break;
}
/*
@@ -781,18 +697,15 @@
}
}
- semun.val = 0;
- sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
+ for (i = 0; i < 4; i++) {
+ semun.val = 0;
+ sys_res = semctl (ipc_instance->control_buffer->semid, i,
SETVAL, semun);
+ if (sys_res != 0) {
+ res = CS_ERR_LIBRARY;
+ goto error_exit;
+ }
}
-
- sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
- }
+}
#endif
/*
@@ -822,7 +735,6 @@
}
ipc_instance->fd = request_fd;
- ipc_instance->flow_control_state = 0;
if (res_setup.error == CS_ERR_TRY_AGAIN) {
res = res_setup.error;
@@ -842,8 +754,8 @@
error_exit:
#if _POSIX_THREAD_PROCESS_SHARED < 1
- if (ipc_instance->semid > 0)
- semctl (ipc_instance->semid, 0, IPC_RMID);
+ if (ipc_instance->control_buffer->semid > 0)
+ semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
#endif
memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
error_dispatch_buffer:
@@ -893,7 +805,7 @@
return (res);
}
- *flow_control_state = ipc_instance->flow_control_state;
+ *flow_control_state =
ipc_instance->control_buffer->flow_control_enabled;
hdb_handle_put (&ipc_hdb, handle);
return (res);
@@ -928,10 +840,9 @@
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
- int res;
- char buf_two = 1;
char *data_addr;
cs_error_t error = CS_OK;
+ int res;
error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void
**)&ipc_instance));
if (error != CS_OK) {
@@ -962,47 +873,22 @@
goto error_put;
}
- res = recv (ipc_instance->fd, &buf, 1, 0);
- if (res == -1 && errno == EINTR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- } else
- if (res == -1) {
- error = CS_ERR_LIBRARY;
- goto error_put;
- } else
- if (res == 0) {
- /* Means that the peer closed cleanly the socket. However, it
should
- * happen only on BSD and Darwing systems since poll() returns a
- * POLLHUP event on other systems.
+ error = socket_recv (ipc_instance->fd, &buf, 1);
+ assert (error == CS_OK);
+
+ if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+ /*
+ * Notify coroipcs to flush any pending dispatch messages
*/
- error = CS_ERR_LIBRARY;
- goto error_put;
+
+ res = ipc_sem_post (ipc_instance->control_buffer,
SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+ if (res != CS_OK) {
+ error = CS_ERR_LIBRARY;
+ goto error_put;
+ }
+
+
}
- ipc_instance->flow_control_state = 0;
- if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf ==
MESSAGE_RES_ENABLE_FLOWCONTROL) {
- ipc_instance->flow_control_state = 1;
- }
- /*
- * Notify executive to flush any pending dispatch messages
- */
- if (ipc_instance->flow_control_state) {
- buf_two = MESSAGE_REQ_OUTQ_FLUSH;
- res = socket_send (ipc_instance->fd, &buf_two, 1);
- assert (res == CS_OK); /* TODO */
- }
- /*
- * This is just a notification of flow control starting at the addition
- * of a new pending message, not a message to dispatch
- */
- if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- }
- if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- }
data_addr = ipc_instance->dispatch_buffer;
@@ -1030,8 +916,15 @@
return (res);
}
- if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
- goto error_exit;
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ goto error_exit;
+ }
}
addr = ipc_instance->dispatch_buffer;
@@ -1078,8 +971,8 @@
res = reply_receive (ipc_instance, res_msg, res_len);
error_exit:
- hdb_handle_put (&ipc_hdb, handle);
pthread_mutex_unlock (&ipc_instance->mutex);
+ hdb_handle_put (&ipc_hdb, handle);
return (res);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais