(Work in Progress)

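Use the totembuf functions to allocate the buffers used for sending IBA
(InfiniBand) packets and to maintain the free list. When a buffer is already
enqueued (its memory region, send_buf->mr, is still registered), an attempt to
enqueue it again causes the data to be copied into a newly allocated buffer,
which is sent instead. However, when the buffer is not already enqueued, it
may now be sent without first copying it.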

TODO:
 * Apply the same approach to token buffers, and clean up the existing
   send_buf code (e.g. the list_free/list_all members marked "TODO remove")
 * Unify the implementation of the send and receive buffers
---
 exec/totemiba.c |   67 ++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/exec/totemiba.c b/exec/totemiba.c
index c8839a1..f210359 100644
--- a/exec/totemiba.c
+++ b/exec/totemiba.c
@@ -59,6 +59,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
+#include <stddef.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netdb.h>
@@ -73,6 +74,7 @@
 #include <corosync/totem/coropoll.h>
 #define LOGSYS_UTILS_ONLY 1
 #include <corosync/engine/logsys.h>
+#include "totembuf.h"
 #include "totemiba.h"
 #include "wthread.h"
 
@@ -229,18 +231,22 @@ struct recv_buf {
 };
 
 struct send_buf {
-       struct list_head list_free;
-       struct list_head list_all;
+       struct list_head list_free; // TODO remove
+       struct list_head list_all; // TODO remove
        struct ibv_mr *mr;
        char buffer[MAX_MTU_SIZE];
 };
 
+#define SEND_BUF_FROM_BUFFER(BUFFER) (struct send_buf *)(((char *)(BUFFER)) - 
offsetof(struct send_buf, buffer))
+
 static hdb_handle_t
 void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
 
 static void *
 wrid2void (uint64_t wr_id) { union u u; u.wr_id = wr_id; return u.v; }
 
+static struct totembuf_list *free_list = NULL;
+
 static void totemiba_instance_initialize (struct totemiba_instance *instance)
 {
        memset (instance, 0, sizeof (struct totemiba_instance));
@@ -252,29 +258,32 @@ static void totemiba_instance_initialize (struct 
totemiba_instance *instance)
 }
 
 static inline struct send_buf *mcast_send_buf_get (
-       struct totemiba_instance *instance)
+       struct totemiba_instance *instance,
+       const void *ms)
 {
-       struct send_buf *send_buf;
+       struct send_buf *send_buf = SEND_BUF_FROM_BUFFER(ms);
 
-       if (list_empty (&instance->mcast_send_buf_free) == 0) {
-               send_buf = list_entry (instance->mcast_send_buf_free.next, 
struct send_buf, list_free);
-               list_del (&send_buf->list_free);
-               return (send_buf);
+       if (send_buf->mr) {
+               /* Buffer is already enqueued. Make a copy. */
+               struct send_buf *new_buf = totembuf_alloc (free_list);
+               if (new_buf == NULL) {
+                       return (NULL);
+               }
+               memcpy (new_buf->buffer, send_buf->buffer,
+                       sizeof (new_buf->buffer));
+               send_buf = new_buf;
+       } else {
+               send_buf = totembuf_retain (send_buf);
        }
 
-       send_buf = malloc (sizeof (struct send_buf));
-       if (send_buf == NULL) {
-               return (NULL);
-       }
        send_buf->mr = ibv_reg_mr (instance->mcast_pd,
                send_buf->buffer,
                2048, IBV_ACCESS_LOCAL_WRITE);
        if (send_buf->mr == NULL) {
                log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory 
range\n");
+               totembuf_release (send_buf);
                return (NULL);
        }
-       list_init (&send_buf->list_all);
-       list_add_tail (&send_buf->list_all, &instance->mcast_send_buf_head);
                
        return (send_buf);
 }
@@ -283,8 +292,9 @@ static inline void mcast_send_buf_put (
        struct totemiba_instance *instance,
        struct send_buf *send_buf)
 {
-       list_init (&send_buf->list_free);
-       list_add_tail (&send_buf->list_free, &instance->mcast_send_buf_free);
+       ibv_dereg_mr (send_buf->mr);
+       send_buf->mr = NULL;
+       totembuf_release (send_buf);
 }
 
 static inline struct send_buf *token_send_buf_get (
@@ -1283,6 +1293,10 @@ int totemiba_initialize (
        struct totemiba_instance *instance;
        int res = 0;
 
+       if (!free_list) {
+               free_list = totembuf_list_init (sizeof (struct send_buf));
+       }
+
        instance = malloc (sizeof (struct totemiba_instance));
        if (instance == NULL) {
                return (-1);
@@ -1319,12 +1333,15 @@ int totemiba_initialize (
 
 void *totemiba_buffer_alloc (void)
 {
-       return malloc (MAX_MTU_SIZE);
+       struct send_buf *send_buf = totembuf_alloc (free_list);
+       send_buf->mr = NULL;
+       return (send_buf->buffer);
 }
 
 void totemiba_buffer_release (void *ptr)
 {
-       return free (ptr);
+       struct send_buf *send_buf = SEND_BUF_FROM_BUFFER(ptr);
+       totembuf_release (send_buf);
 }
 
 int totemiba_processor_count_set (
@@ -1397,16 +1414,13 @@ int totemiba_mcast_flush_send (
        int res = 0;
        struct ibv_send_wr send_wr, *failed_send_wr;
        struct ibv_sge sge;
-       void *msg;
        struct send_buf *send_buf;
 
-       send_buf = mcast_send_buf_get (instance);
+       send_buf = mcast_send_buf_get (instance, ms);
        if (send_buf == NULL) {
                return (-1);
        }
 
-       msg = send_buf->buffer;
-       memcpy (msg, ms, msg_len);
        send_wr.next = NULL;
        send_wr.sg_list = &sge;
        send_wr.num_sge = 1;
@@ -1420,7 +1434,7 @@ int totemiba_mcast_flush_send (
 
        sge.length = msg_len;
        sge.lkey = send_buf->mr->lkey;
-       sge.addr = (uintptr_t)msg;
+       sge.addr = (uintptr_t)send_buf->buffer;
 
        res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
&failed_send_wr);
        return (res);
@@ -1435,16 +1449,13 @@ int totemiba_mcast_noflush_send (
        int res = 0;
        struct ibv_send_wr send_wr, *failed_send_wr;
        struct ibv_sge sge;
-       void *msg;
        struct send_buf *send_buf;
 
-       send_buf = mcast_send_buf_get (instance);
+       send_buf = mcast_send_buf_get (instance, ms);
        if (send_buf == NULL) {
                return (-1);
        }
 
-       msg = send_buf->buffer;
-       memcpy (msg, ms, msg_len);
        send_wr.next = NULL;
        send_wr.sg_list = &sge;
        send_wr.num_sge = 1;
@@ -1458,7 +1469,7 @@ int totemiba_mcast_noflush_send (
 
        sge.length = msg_len;
        sge.lkey = send_buf->mr->lkey;
-       sge.addr = (uintptr_t)msg;
+       sge.addr = (uintptr_t)send_buf->buffer;
 
        res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
&failed_send_wr);
        return (res);

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to