Signed-off-by: Angus Salkeld <[email protected]>
---
exec/ipc_glue.c | 146 +++++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 132 insertions(+), 14 deletions(-)
diff --git a/exec/ipc_glue.c b/exec/ipc_glue.c
index f34046d..9d6e8c8 100644
--- a/exec/ipc_glue.c
+++ b/exec/ipc_glue.c
@@ -80,6 +80,12 @@ struct cs_ipcs_mapper {
char name[256];
};
+struct outq_item {
+ void *msg;
+ size_t mlen;
+ struct list_head list;
+};
+
static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT];
static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data,
qb_loop_job_dispatch_fn fn);
@@ -278,6 +284,10 @@ static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
struct cs_ipcs_conn_context {
qb_handle_t stats_handle;
+ struct list_head outq_head;
+ int32_t queuing;
+ uint32_t queued;
+ uint32_t sent;
char data[1];
};
@@ -300,6 +310,12 @@ static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
size += ais_service[service]->private_data_size;
context = calloc(1, size);
+
+ list_init(&context->outq_head);
+ context->queuing = QB_FALSE;
+ context->queued = 0;
+ context->sent = 0;
+
qb_ipcs_context_set(c, context);
ais_service[service]->lib_init_fn(c);
@@ -389,12 +405,25 @@ void *cs_ipcs_private_data_get(void *conn)
static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
{
- struct cs_ipcs_conn_context *cnx;
+ struct cs_ipcs_conn_context *context;
+ struct list_head *list, *list_next;
+ struct outq_item *outq_item;
+
log_printf(LOG_INFO, "%s() ", __func__);
- cnx = qb_ipcs_context_get(c);
- if (cnx) {
- free(cnx);
+ context = qb_ipcs_context_get(c);
+ if (context) {
+ for (list = context->outq_head.next;
+ list != &context->outq_head; list = list_next) {
+
+ list_next = list->next;
+ outq_item = list_entry (list, struct outq_item, list);
+
+ list_del (list);
+ free (outq_item->msg);
+ free (outq_item);
+ }
+ free(context);
}
}
@@ -444,24 +473,113 @@ int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
return rc;
}
-int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
+static void outq_flush (void *data)
{
- int32_t rc = qb_ipcs_event_send(conn, msg, mlen);
- if (rc >= 0) {
- return 0;
+ qb_ipcs_connection_t *conn = data;
+ struct list_head *list, *list_next;
+ struct outq_item *outq_item;
+ int32_t rc;
+ struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn);
+
+ for (list = context->outq_head.next;
+ list != &context->outq_head; list = list_next) {
+
+ list_next = list->next;
+ outq_item = list_entry (list, struct outq_item, list);
+
+ rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen);
+ if (rc != outq_item->mlen) {
+ break;
+ }
+ context->sent++;
+ context->queued--;
+
+ list_del (list);
+ free (outq_item->msg);
+ free (outq_item);
}
- return rc;
+ if (list_empty (&context->outq_head)) {
+ context->queuing = QB_FALSE;
+ log_printf(LOGSYS_LEVEL_INFO, "Q empty, queued:%d sent:%d.",
+ context->queued, context->sent);
+ context->queued = 0;
+ context->sent = 0;
+ return;
+ }
+ qb_loop_job_add(corosync_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
+ if (rc < 0 && rc != -EAGAIN) {
+ log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc);
+ }
+}
+
+static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *iov, uint32_t iov_len)
+{
+ int32_t rc = 0;
+ int32_t i;
+ int32_t bytes_msg = 0;
+ struct outq_item *outq_item;
+ char *write_buf = 0;
+ struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn);
+
+ for (i = 0; i < iov_len; i++) {
+ bytes_msg += iov[i].iov_len;
+ }
+
+ if (!context->queuing) {
+ assert(list_empty (&context->outq_head));
+ rc = qb_ipcs_event_sendv(conn, iov, iov_len);
+ if (rc == bytes_msg) {
+ context->sent++;
+ return;
+ }
+ if (rc == -EAGAIN) {
+ context->queued = 0;
+ context->sent = 0;
+ context->queuing = QB_TRUE;
+ qb_loop_job_add(corosync_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
+ } else {
+ log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg);
+ return;
+ }
+ }
+ outq_item = malloc (sizeof (struct outq_item));
+ if (outq_item == NULL) {
+ qb_ipcs_disconnect(conn);
+ return;
+ }
+ outq_item->msg = malloc (bytes_msg);
+ if (outq_item->msg == NULL) {
+ free (outq_item);
+ qb_ipcs_disconnect(conn);
+ return;
+ }
+
+ write_buf = outq_item->msg;
+ for (i = 0; i < iov_len; i++) {
+ memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
+ write_buf += iov[i].iov_len;
+ }
+ outq_item->mlen = bytes_msg;
+ list_init (&outq_item->list);
+ list_add_tail (&outq_item->list, &context->outq_head);
+ context->queued++;
+}
+
+int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
+{
+ struct iovec iov;
+ iov.iov_base = (void *)msg;
+ iov.iov_len = mlen;
+ msg_send_or_queue (conn, &iov, 1);
+ return 0;
}
int cs_ipcs_dispatch_iov_send (void *conn,
const struct iovec *iov,
unsigned int iov_len)
{
- int32_t rc = qb_ipcs_event_sendv(conn, iov, iov_len);
- if (rc >= 0) {
- return 0;
- }
- return rc;
+ msg_send_or_queue(conn, iov, iov_len);
+ return 0;
}
static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
--
1.7.3.1
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais