The patch didn't attach to the original message, so it is included inline below.
On Fri, 2009-04-17 at 22:24 -0700, Steven Dake wrote:
> In the previous IPC system, we passed a buffer into
> coroipcc_dispatch_recv and the data was memcpy'd into it from the shared
> memory segment. This resulted in an extra memcpy and required the caller
> to supply a buffer large enough to hold the returned data. Jim recently
> added a patch that verifies the size of the buffer, which broke every
> service since they all used undersized buffers.
>
> This patch uses the mmap system call to create a shared memory area
> backed by a file for the dispatch data. The file is removed and is not
> saved. The mmap magic reserves an address range twice the size of the
> dispatch buffer, maps the dispatch file into the first half, then maps
> the same file again into the second half. This allows us to do a direct
> memcpy, even for a message that wraps past the end of the buffer, and
> rely on the OS VM subsystem to map the reads or writes to the
> appropriate locations. The end result is that we no longer have to
> allocate a buffer in our dispatch routine and can use the dispatch
> buffer's data directly in our applications. The memcpy is gone and
> memory utilization is reduced, at a possible cost in portability. Now
> all the corosync and openais services work properly. For systems that
> don't support the mmap feature we depend on, we can work around this by
> allocating a large enough buffer in dispatch_get, doing a memcpy into
> it, and freeing it in dispatch_put. The mmap routines that create the
> mapped circular buffer would then be conditionally compiled.
>
> I'm interested in hearing about experiences with this on Darwin/BSD systems.
>
> Regards
> -steve
>
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais
Index: test/evsbench.c
===================================================================
--- test/evsbench.c (revision 2078)
+++ test/evsbench.c (working copy)
@@ -74,7 +74,6 @@
size_t msg_len)
{
const char *m = msg;
- printf ("Delivering message %s\n", m);
}
static void evs_confchg_fn (
Index: test/testevs.c
===================================================================
--- test/testevs.c (revision 2078)
+++ test/testevs.c (working copy)
@@ -126,8 +126,6 @@
printf ("Init result %d\n", result);
result = evs_join (handle, groups, 3);
printf ("Join result %d\n", result);
- result = evs_leave (handle, &groups[0], 1);
- printf ("Leave result %d\n", result);
delivery_string = "evs_mcast_joined";
/*
@@ -145,7 +143,6 @@
result = evs_mcast_joined (handle, EVS_TYPE_AGREED,
&iov, 1);
if (result == CS_ERR_TRY_AGAIN) {
-//printf ("try again\n");
goto try_again_one;
}
result = evs_dispatch (handle, CS_DISPATCH_ALL);
Index: include/corosync/coroipcc.h
===================================================================
--- include/corosync/coroipcc.h (revision 2078)
+++ include/corosync/coroipcc.h (working copy)
@@ -67,32 +67,35 @@
};
-cs_error_t
+extern cs_error_t
coroipcc_service_connect (
const char *socket_name,
enum service_types service,
void **ipc_context);
-cs_error_t
+extern cs_error_t
coroipcc_service_disconnect (
void *ipc_context);
-int
+extern int
coroipcc_fd_get (
void *ipc_context);
-int
-coroipcc_dispatch_recv (
+extern int
+coroipcc_dispatch_get (
void *ipc_context,
- void *buf,
- size_t buflen,
+ void **buf,
int timeout);
-int
+extern int
+coroipcc_dispatch_put (
+ void *ipc_context);
+
+extern int
coroipcc_dispatch_flow_control_get (
void *ipc_context);
-cs_error_t
+extern cs_error_t
coroipcc_msg_send_reply_receive (
void *ipc_context,
const struct iovec *iov,
@@ -100,35 +103,33 @@
void *res_msg,
size_t res_len);
-cs_error_t
+extern cs_error_t
coroipcc_msg_send_reply_receive_in_buf (
void *ipc_context,
const struct iovec *iov,
unsigned int iov_len,
void **res_msg);
-cs_error_t
+extern cs_error_t
saHandleCreate (
struct saHandleDatabase *handleDatabase,
int instanceSize,
uint64_t *handleOut);
-cs_error_t
+extern cs_error_t
saHandleDestroy (
struct saHandleDatabase *handleDatabase,
uint64_t handle);
-cs_error_t
+extern cs_error_t
saHandleInstanceGet (
struct saHandleDatabase *handleDatabase,
uint64_t handle,
void **instance);
-cs_error_t
+extern cs_error_t
saHandleInstancePut (
struct saHandleDatabase *handleDatabase,
uint64_t handle);
-#define offset_of(type,member) (int)(&(((type *)0)->member))
-
#endif /* COROIPC_H_DEFINED */
Index: include/corosync/ipc_gen.h
===================================================================
--- include/corosync/ipc_gen.h (revision 2078)
+++ include/corosync/ipc_gen.h (working copy)
@@ -66,12 +66,11 @@
#define REQ_SIZE 1000000
#define RES_SIZE 1000000
-#define DISPATCH_SIZE 1000000
+#define DISPATCH_SIZE 8192*128
struct shared_memory {
unsigned char req_buffer[REQ_SIZE];
unsigned char res_buffer[RES_SIZE];
- unsigned char dispatch_buffer[DISPATCH_SIZE];
unsigned int read;
unsigned int write;
};
@@ -89,6 +88,7 @@
int service __attribute__((aligned(8)));
unsigned long long shmkey __attribute__((aligned(8)));
unsigned long long semkey __attribute__((aligned(8)));
+ char dispatch_file[64]__attribute__((aligned(8)));
} mar_req_setup_t __attribute__((aligned(8)));
typedef struct {
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c (revision 2078)
+++ exec/coroipcs.c (working copy)
@@ -124,6 +124,7 @@
unsigned int pending_semops;
pthread_mutex_t mutex;
struct shared_memory *mem;
+ char *dispatch_buffer;
struct list_head outq_head;
void *private_data;
struct list_head list;
@@ -143,8 +144,6 @@
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked);
-static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len);
-
static int ipc_thread_active (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
@@ -241,6 +240,7 @@
api->free (conn_info->private_data);
}
close (conn_info->fd);
+ munmap (conn_info->dispatch_buffer, (DISPATCH_SIZE));
api->free (conn_info);
api->serialize_unlock ();
return (-1);
@@ -651,25 +651,14 @@
return (bytes_left);
}
-static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len)
+static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
{
- char *dest_char = (char *)conn_info->mem->dispatch_buffer;
- char *src_char = msg;
- unsigned int first_write;
- unsigned int second_write;
+ unsigned int write_idx;
- first_write = len;
- second_write = 0;
- if (len + conn_info->mem->write >= DISPATCH_SIZE) {
- first_write = DISPATCH_SIZE - conn_info->mem->write;
- second_write = len - first_write;
- }
- memcpy (&dest_char[conn_info->mem->write], src_char, first_write);
- if (second_write) {
- memcpy (dest_char, &src_char[first_write], second_write);
- }
- conn_info->mem->write = (conn_info->mem->write + len) % DISPATCH_SIZE;
- return (0);
+ write_idx = conn_info->mem->write;
+
+ memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
+ conn_info->mem->write = (write_idx + len) % (DISPATCH_SIZE);
}
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
@@ -934,6 +923,46 @@
return (0);
}
+static int
+coroipcs_memory_map (char *path, void **buf, size_t bytes)
+{
+ int fd;
+ void *addr_orig;
+ void *addr;
+ int res;
+
+ fd = open (path, O_RDWR, 0600);
+
+ unlink (path);
+
+ res = ftruncate (fd, bytes);
+
+ addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
+ MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+
+ if (addr_orig == MAP_FAILED) {
+ return (-1);
+ }
+
+ addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+ MAP_FIXED | MAP_SHARED, fd, 0);
+
+ if (addr != addr_orig) {
+ return (-1);
+ }
+
+ addr = mmap (((char *)addr_orig) + bytes,
+ bytes, PROT_READ | PROT_WRITE,
+ MAP_FIXED | MAP_SHARED, fd, 0);
+
+ res = close (fd);
+ if (res) {
+ return (-1);
+ }
+ *buf = addr_orig;
+ return (0);
+}
+
int coroipcs_handler_dispatch (
int fd,
int revent,
@@ -987,6 +1016,11 @@
conn_info->shmkey = req_setup->shmkey;
conn_info->semkey = req_setup->semkey;
+ res = coroipcs_memory_map (
+ req_setup->dispatch_file,
+ (void *)&conn_info->dispatch_buffer,
+ DISPATCH_SIZE);
+
conn_info->service = req_setup->service;
conn_info->refcount = 0;
conn_info->notify_flow_control_enabled = 0;
Index: lib/cfg.c
===================================================================
--- lib/cfg.c (revision 2078)
+++ lib/cfg.c (working copy)
@@ -56,11 +56,6 @@
#include <corosync/ipc_cfg.h>
#include <corosync/coroipcc.h>
-struct cfg_res_overlay {
- mar_res_header_t header;
- char data[4096];
-};
-
/*
* Data structure for instance data
*/
@@ -170,15 +165,8 @@
int dispatch_avail;
struct cfg_instance *cfg_instance;
struct res_lib_cfg_testshutdown *res_lib_cfg_testshutdown;
-#ifdef COMPILE_OUT
- struct res_lib_corosync_healthcheckcallback *res_lib_corosync_healthcheckcallback;
- struct res_lib_corosync_readinessstatesetcallback *res_lib_corosync_readinessstatesetcallback;
- struct res_lib_corosync_csisetcallback *res_lib_corosync_csisetcallback;
- struct res_lib_corosync_csiremovecallback *res_lib_corosync_csiremovecallback;
- struct res_lib_cfg_statetrackcallback *res_lib_cfg_statetrackcallback;
-#endif
corosync_cfg_callbacks_t callbacks;
- struct cfg_res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
error = saHandleInstanceGet (&cfg_hdb, cfg_handle,
(void *)&cfg_instance);
@@ -194,18 +182,20 @@
}
do {
- dispatch_avail = coroipcc_dispatch_recv (cfg_instance->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
+ pthread_mutex_lock (&cfg_instance->dispatch_mutex);
+ dispatch_avail = coroipcc_dispatch_get (
+ cfg_instance->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
+
/*
* Handle has been finalized in another thread
*/
if (cfg_instance->finalize == 1) {
error = CS_OK;
pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
- goto error_unlock;
+ goto error_put;
}
if (dispatch_avail == 0 && dispatch_flags == CS_DISPATCH_ALL) {
@@ -228,18 +218,20 @@
/*
* Dispatch incoming response
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_CFG_TESTSHUTDOWN:
if (callbacks.corosync_cfg_shutdown_callback) {
- res_lib_cfg_testshutdown = (struct res_lib_cfg_testshutdown *)&dispatch_data;
+ res_lib_cfg_testshutdown = (struct res_lib_cfg_testshutdown *)dispatch_data;
callbacks.corosync_cfg_shutdown_callback(cfg_handle, res_lib_cfg_testshutdown->flags);
}
break;
default:
+ coroipcc_dispatch_put (cfg_instance->ipc_ctx);
error = CS_ERR_LIBRARY;
goto error_nounlock;
break;
}
+ coroipcc_dispatch_put (cfg_instance->ipc_ctx);
/*
* Determine if more messages should be processed
@@ -255,7 +247,7 @@
}
} while (cont);
-error_unlock:
+error_put:
(void)saHandleInstancePut (&cfg_hdb, cfg_handle);
error_nounlock:
return (error);
Index: lib/coroipcc.c
===================================================================
--- lib/coroipcc.c (revision 2078)
+++ lib/coroipcc.c (working copy)
@@ -56,6 +56,7 @@
#include <assert.h>
#include <sys/shm.h>
#include <sys/sem.h>
+#include <sys/mman.h>
#include <corosync/corotypes.h>
#include <corosync/ipc_gen.h>
@@ -80,6 +81,7 @@
int semid;
int flow_control_state;
struct shared_memory *shared_memory;
+ void *dispatch_buffer;
uid_t euid;
};
@@ -275,6 +277,61 @@
};
#endif
+static int
+coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes)
+{
+ int fd;
+ void *addr_orig;
+ void *addr;
+ int res;
+
+ sprintf (path, "/dev/shm/%s", file);
+
+ fd = mkstemp (path);
+ if (fd == -1) {
+ sprintf (path, "/var/run/%s", file);
+ fd = mkstemp (path);
+ if (fd == -1) {
+ return (-1);
+ }
+ }
+
+ res = ftruncate (fd, bytes);
+
+ addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
+ MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+
+ if (addr_orig == MAP_FAILED) {
+ return (-1);
+ }
+
+ addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+ MAP_FIXED | MAP_SHARED, fd, 0);
+
+ if (addr != addr_orig) {
+ return (-1);
+ }
+
+ addr = mmap (((char *)addr_orig) + bytes,
+ bytes, PROT_READ | PROT_WRITE,
+ MAP_FIXED | MAP_SHARED, fd, 0);
+
+ res = close (fd);
+ if (res) {
+ return (-1);
+ }
+ *buf = addr_orig;
+ return (0);
+}
+
+static void
+coroipcc_memory_unmap (void *addr, size_t bytes)
+{
+ int res;
+
+ res = munmap (addr, bytes);
+}
+
cs_error_t
coroipcc_service_connect (
const char *socket_name,
@@ -291,6 +348,7 @@
mar_req_setup_t req_setup;
mar_res_setup_t res_setup;
union semun semun;
+ char dispatch_map_path[128];
res_setup.error = CS_ERR_LIBRARY;
@@ -373,8 +431,13 @@
goto error_exit;
}
+ res = coroipcc_memory_map (dispatch_map_path,
+ "dispatch_bufer-XXXXXX",
+ &ipc_segment->dispatch_buffer, DISPATCH_SIZE);
+ strcpy (req_setup.dispatch_file, dispatch_map_path);
req_setup.shmkey = shmkey;
req_setup.semkey = semkey;
+
req_setup.service = service;
error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
@@ -418,6 +481,7 @@
shutdown (ipc_segment->fd, SHUT_RDWR);
close (ipc_segment->fd);
shmdt (ipc_segment->shared_memory);
+ coroipcc_memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE));
free (ipc_segment);
return (CS_OK);
}
@@ -431,7 +495,6 @@
return (ipc_segment->flow_control_state);
}
-
int
coroipcc_fd_get (void *ipc_ctx)
{
@@ -440,43 +503,16 @@
return (ipc_segment->fd);
}
-static void memcpy_swrap (void *dest, size_t dest_len,
- void *src, int len, unsigned int *n_read)
-{
- char *dest_chr = (char *)dest;
- char *src_chr = (char *)src;
-
- unsigned int first_read;
- unsigned int second_read;
-
- first_read = len;
- second_read = 0;
-
- if (len + *n_read >= DISPATCH_SIZE) {
- first_read = DISPATCH_SIZE - *n_read;
- second_read = (len + *n_read) % DISPATCH_SIZE;
- }
- memcpy (dest_chr, &src_chr[*n_read], first_read);
- if (second_read) {
- memcpy (&dest_chr[first_read], src_chr,
- second_read);
- }
- *n_read = (*n_read + len) % (DISPATCH_SIZE);
-}
-int original_flow = -1;
-
int
-coroipcc_dispatch_recv (void *ipc_ctx, void *data, size_t buflen, int timeout)
+coroipcc_dispatch_get (void *ipc_ctx, void **data, int timeout)
{
struct pollfd ufds;
- struct sembuf sop;
int poll_events;
- mar_res_header_t *header;
char buf;
struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
int res;
- unsigned int my_read;
char buf_two = 1;
+ char *data_addr;
ufds.fd = ipc_segment->fd;
ufds.events = POLLIN;
@@ -530,10 +566,27 @@
return (0);
}
+ data_addr = ipc_segment->dispatch_buffer;
+
+ data_addr = &data_addr[ipc_segment->shared_memory->read];
+
+ *data = (void *)data_addr;
+ return (1);
+}
+
+int
+coroipcc_dispatch_put (void *ipc_ctx)
+{
+ struct sembuf sop;
+ mar_res_header_t *header;
+ struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+ int res;
+ char *addr;
+ unsigned int read_idx;
+
sop.sem_num = 2;
sop.sem_op = -1;
sop.sem_flg = 0;
-
retry_semop:
res = semop (ipc_segment->semid, &sop, 1);
if (res == -1 && errno == EINTR) {
@@ -547,33 +600,13 @@
return (-1);
}
- if (buflen < DISPATCH_SIZE) {
- return -1;
- }
+ addr = ipc_segment->dispatch_buffer;
- if (ipc_segment->shared_memory->read + sizeof (mar_res_header_t) >= DISPATCH_SIZE) {
- my_read = ipc_segment->shared_memory->read;
- memcpy_swrap (data, DISPATCH_SIZE,
- ipc_segment->shared_memory->dispatch_buffer,
- sizeof (mar_res_header_t),
- &ipc_segment->shared_memory->read);
- header = (mar_res_header_t *)data;
- memcpy_swrap (
- (void *)((char *)data + sizeof (mar_res_header_t)),
- DISPATCH_SIZE,
- ipc_segment->shared_memory->dispatch_buffer,
- header->size - sizeof (mar_res_header_t),
- &ipc_segment->shared_memory->read);
- } else {
- header = (mar_res_header_t *)&ipc_segment->shared_memory->dispatch_buffer[ipc_segment->shared_memory->read];
- memcpy_swrap (
- data, DISPATCH_SIZE,
- ipc_segment->shared_memory->dispatch_buffer,
- header->size,
- &ipc_segment->shared_memory->read);
- }
-
- return (1);
+ read_idx = ipc_segment->shared_memory->read;
+ header = (mar_res_header_t *) &addr[read_idx];
+ ipc_segment->shared_memory->read =
+ (read_idx + header->size) % (DISPATCH_SIZE);
+ return (0);
}
static cs_error_t
Index: lib/cpg.c
===================================================================
--- lib/cpg.c (revision 2078)
+++ lib/cpg.c (working copy)
@@ -222,11 +222,6 @@
return (CS_OK);
}
-struct res_overlay {
- mar_res_header_t header __attribute__((aligned(8)));
- char data[512000];
-};
-
cs_error_t cpg_dispatch (
cpg_handle_t handle,
cs_dispatch_flags_t dispatch_types)
@@ -239,7 +234,7 @@
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
cpg_callbacks_t callbacks;
- struct res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
int ignore_dispatch = 0;
struct cpg_address member_list[CPG_MEMBERS_MAX];
struct cpg_address left_list[CPG_MEMBERS_MAX];
@@ -265,17 +260,13 @@
do {
pthread_mutex_lock (&cpg_inst->dispatch_mutex);
- dispatch_avail = coroipcc_dispatch_recv (cpg_inst->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
+ dispatch_avail = coroipcc_dispatch_get (
+ cpg_inst->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
- if (error != CS_OK) {
- goto error_put;
- }
-
if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
break; /* exit do while cont is 1 loop */
@@ -302,9 +293,9 @@
/*
* Dispatch incoming message
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_CPG_DELIVER_CALLBACK:
- res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
+ res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
marshall_from_mar_cpg_name_t (
&group_name,
@@ -319,7 +310,7 @@
break;
case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
- res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)&dispatch_data;
+ res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
marshall_from_mar_cpg_address_t (&member_list[i],
@@ -353,10 +344,12 @@
break;
default:
+ coroipcc_dispatch_put (cpg_inst->ipc_ctx);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
+ coroipcc_dispatch_put (cpg_inst->ipc_ctx);
/*
* Determine if more messages should be processed
Index: lib/votequorum.c
===================================================================
--- lib/votequorum.c (revision 2078)
+++ lib/votequorum.c (working copy)
@@ -732,12 +732,6 @@
return (CS_OK);
}
-
-struct res_overlay {
- mar_res_header_t header __attribute__((aligned(8)));
- char data[512000];
-};
-
cs_error_t votequorum_dispatch (
votequorum_handle_t handle,
cs_dispatch_flags_t dispatch_types)
@@ -748,7 +742,7 @@
int dispatch_avail;
struct votequorum_inst *votequorum_inst;
votequorum_callbacks_t callbacks;
- struct res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
struct res_lib_votequorum_notification *res_lib_votequorum_notification;
struct res_lib_votequorum_expectedvotes_notification *res_lib_votequorum_expectedvotes_notification;
@@ -776,10 +770,10 @@
do {
pthread_mutex_lock (&votequorum_inst->dispatch_mutex);
- dispatch_avail = coroipcc_dispatch_recv (votequorum_inst->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
+ dispatch_avail = coroipcc_dispatch_get (
+ votequorum_inst->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
/*
* Handle has been finalized in another thread
@@ -809,13 +803,13 @@
/*
* Dispatch incoming message
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_VOTEQUORUM_NOTIFICATION:
if (callbacks.votequorum_notify_fn == NULL) {
continue;
}
- res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)&dispatch_data;
+ res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)dispatch_data;
callbacks.votequorum_notify_fn ( handle,
res_lib_votequorum_notification->context,
@@ -829,7 +823,7 @@
if (callbacks.votequorum_expectedvotes_notify_fn == NULL) {
continue;
}
- res_lib_votequorum_expectedvotes_notification = (struct res_lib_votequorum_expectedvotes_notification *)&dispatch_data;
+ res_lib_votequorum_expectedvotes_notification = (struct res_lib_votequorum_expectedvotes_notification *)dispatch_data;
callbacks.votequorum_expectedvotes_notify_fn ( handle,
res_lib_votequorum_expectedvotes_notification->context,
@@ -837,10 +831,12 @@
break;
default:
+ coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
+ coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
/*
* Determine if more messages should be processed
Index: lib/quorum.c
===================================================================
--- lib/quorum.c (revision 2078)
+++ lib/quorum.c (working copy)
@@ -349,11 +349,6 @@
return (error);
}
-struct quorum_res_overlay {
- mar_res_header_t header __attribute__((aligned(8)));
- char data[512000];
-};
-
cs_error_t quorum_dispatch (
quorum_handle_t handle,
cs_dispatch_flags_t dispatch_types)
@@ -364,7 +359,7 @@
int dispatch_avail;
struct quorum_inst *quorum_inst;
quorum_callbacks_t callbacks;
- struct quorum_res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
struct res_lib_quorum_notification *res_lib_quorum_notification;
if (dispatch_types != CS_DISPATCH_ONE &&
@@ -391,10 +386,10 @@
do {
pthread_mutex_lock (&quorum_inst->dispatch_mutex);
- dispatch_avail = coroipcc_dispatch_recv (quorum_inst->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
+ dispatch_avail = coroipcc_dispatch_get (
+ quorum_inst->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
/*
* Handle has been finalized in another thread
@@ -424,13 +419,13 @@
/*
* Dispatch incoming message
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_QUORUM_NOTIFICATION:
if (callbacks.quorum_notify_fn == NULL) {
continue;
}
- res_lib_quorum_notification = (struct res_lib_quorum_notification *)&dispatch_data;
+ res_lib_quorum_notification = (struct res_lib_quorum_notification *)dispatch_data;
callbacks.quorum_notify_fn ( handle,
res_lib_quorum_notification->quorate,
@@ -440,10 +435,12 @@
break;
default:
+ coroipcc_dispatch_put (quorum_inst->ipc_ctx);
error = CS_ERR_LIBRARY;
goto error_put;
break;
}
+ coroipcc_dispatch_put (quorum_inst->ipc_ctx);
/*
* Determine if more messages should be processed
Index: lib/confdb.c
===================================================================
--- lib/confdb.c (revision 2078)
+++ lib/confdb.c (working copy)
@@ -288,11 +288,6 @@
return (CS_OK);
}
-struct confdb_res_overlay {
- mar_res_header_t header __attribute__((aligned(8)));
- char data[512000];
-};
-
cs_error_t confdb_dispatch (
confdb_handle_t handle,
cs_dispatch_flags_t dispatch_types)
@@ -306,7 +301,7 @@
struct res_lib_confdb_key_change_callback *res_key_changed_pt;
struct res_lib_confdb_object_create_callback *res_object_created_pt;
struct res_lib_confdb_object_destroy_callback *res_object_destroyed_pt;
- struct confdb_res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
error = saHandleInstanceGet (&confdb_handle_t_db, handle, (void *)&confdb_inst);
if (error != CS_OK) {
@@ -329,12 +324,11 @@
do {
pthread_mutex_lock (&confdb_inst->dispatch_mutex);
- dispatch_avail = coroipcc_dispatch_recv (confdb_inst->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
+ dispatch_avail = coroipcc_dispatch_get (
+ confdb_inst->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
-
/*
* Handle has been finalized in another thread
*/
@@ -365,9 +359,9 @@
/*
* Dispatch incoming message
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_CONFDB_KEY_CHANGE_CALLBACK:
- res_key_changed_pt = (struct res_lib_confdb_key_change_callback *)&dispatch_data;
+ res_key_changed_pt = (struct res_lib_confdb_key_change_callback *)dispatch_data;
callbacks.confdb_key_change_notify_fn(handle,
res_key_changed_pt->change_type,
@@ -382,7 +376,7 @@
break;
case MESSAGE_RES_CONFDB_OBJECT_CREATE_CALLBACK:
- res_object_created_pt = (struct res_lib_confdb_object_create_callback *)&dispatch_data;
+ res_object_created_pt = (struct res_lib_confdb_object_create_callback *)dispatch_data;
callbacks.confdb_object_create_change_notify_fn(handle,
res_object_created_pt->object_handle,
@@ -392,7 +386,7 @@
break;
case MESSAGE_RES_CONFDB_OBJECT_DESTROY_CALLBACK:
- res_object_destroyed_pt = (struct res_lib_confdb_object_destroy_callback *)&dispatch_data;
+ res_object_destroyed_pt = (struct res_lib_confdb_object_destroy_callback *)dispatch_data;
callbacks.confdb_object_delete_change_notify_fn(handle,
res_object_destroyed_pt->parent_object_handle,
@@ -401,10 +395,12 @@
break;
default:
+ coroipcc_dispatch_put (confdb_inst->ipc_ctx);
error = CS_ERR_LIBRARY;
goto error_noput;
break;
}
+ coroipcc_dispatch_put (confdb_inst->ipc_ctx);
/*
* Determine if more messages should be processed
Index: lib/evs.c
===================================================================
--- lib/evs.c (revision 2078)
+++ lib/evs.c (working copy)
@@ -65,11 +65,6 @@
pthread_mutex_t dispatch_mutex;
};
-struct res_overlay {
- mar_res_header_t header __attribute__((aligned(8)));
- char data[512000];
-};
-
static void evs_instance_destructor (void *instance);
static struct saHandleDatabase evs_handle_t_db = {
@@ -208,7 +203,7 @@
struct res_evs_confchg_callback *res_evs_confchg_callback;
struct res_evs_deliver_callback *res_evs_deliver_callback;
evs_callbacks_t callbacks;
- struct res_overlay dispatch_data;
+ mar_res_header_t *dispatch_data;
int ignore_dispatch = 0;
error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
@@ -225,35 +220,29 @@
}
do {
- dispatch_avail = coroipcc_dispatch_recv (evs_inst->ipc_ctx,
- (void *)&dispatch_data,
- sizeof (dispatch_data),
- timeout);
- if (dispatch_avail == -1) {
- error = CS_ERR_LIBRARY;
- goto error_nounlock;
- }
-
-
pthread_mutex_lock (&evs_inst->dispatch_mutex);
- /*
- * Handle has been finalized in another thread
- */
- if (evs_inst->finalize == 1) {
- error = EVS_OK;
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
- goto error_unlock;
- }
+ dispatch_avail = coroipcc_dispatch_get (
+ evs_inst->ipc_ctx,
+ (void **)&dispatch_data,
+ timeout);
+ pthread_mutex_unlock (&evs_inst->dispatch_mutex);
+
if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
continue; /* next dispatch event */
}
+ if (dispatch_avail == -1) {
+ if (evs_inst->finalize == 1) {
+ error = CS_OK;
+ } else {
+ error = CS_ERR_LIBRARY;
+ }
+ goto error_put;
+ }
/*
* Make copy of callbacks, message data, unlock instance, and call callback
@@ -262,13 +251,12 @@
*/
memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t));
- pthread_mutex_unlock (&evs_inst->dispatch_mutex);
/*
* Dispatch incoming message
*/
- switch (dispatch_data.header.id) {
+ switch (dispatch_data->id) {
case MESSAGE_RES_EVS_DELIVER_CALLBACK:
- res_evs_deliver_callback = (struct res_evs_deliver_callback *)&dispatch_data;
+ res_evs_deliver_callback = (struct res_evs_deliver_callback *)dispatch_data;
callbacks.evs_deliver_fn (
res_evs_deliver_callback->local_nodeid,
&res_evs_deliver_callback->msg,
@@ -276,7 +264,7 @@
break;
case MESSAGE_RES_EVS_CONFCHG_CALLBACK:
- res_evs_confchg_callback = (struct res_evs_confchg_callback *)&dispatch_data;
+ res_evs_confchg_callback = (struct res_evs_confchg_callback *)dispatch_data;
callbacks.evs_confchg_fn (
res_evs_confchg_callback->member_list,
res_evs_confchg_callback->member_list_entries,
@@ -287,10 +275,12 @@
break;
default:
+ coroipcc_dispatch_put (evs_inst->ipc_ctx);
error = CS_ERR_LIBRARY;
- goto error_nounlock;
+ goto error_put;
break;
}
+ coroipcc_dispatch_put (evs_inst->ipc_ctx);
/*
* Determine if more messages should be processed
@@ -313,9 +303,8 @@
}
} while (cont);
-error_unlock:
+error_put:
saHandleInstancePut (&evs_handle_t_db, handle);
-error_nounlock:
return (error);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais