This code hits an assertion unless the totembuf_dealloc call in
iba_buf_put is commented out.  The backtrace from the assertion is:
#0  0x0000003676c330c5 in raise () from /lib64/libc.so.6
#1  0x0000003676c34a76 in abort () from /lib64/libc.so.6
#2  0x0000003676c2b905 in __assert_fail () from /lib64/libc.so.6
#3  0x00007ffff7dd550e in totembuf_release (ptr=<value optimized out>)
    at totembuf.c:193
#4  0x00007ffff7dcbc71 in totemsrp_buffer_release (instance=0x7ffff5db9010,
    msg=<value optimized out>, msg_len=<value optimized out>,
    endian_conversion_needed=<value optimized out>) at totemsrp.c:1383
#5  messages_free (instance=0x7ffff5db9010, msg=<value optimized out>,
    msg_len=<value optimized out>,
    endian_conversion_needed=<value optimized out>) at totemsrp.c:2336
#6  message_handler_orf_token (instance=0x7ffff5db9010,
    msg=<value optimized out>, msg_len=<value optimized out>,
    endian_conversion_needed=<value optimized out>) at totemsrp.c:3487
#7  0x00007ffff7dc7db3 in rrp_deliver_fn (context=0x63a290, msg=0x825134,
    msg_len=110) at totemrrp.c:1454
#8  0x00007ffff7ddb863 in iba_deliver_fn (poll_handle=<value optimized out>,
    events=<value optimized out>, suck=<value optimized out>,
    context=0x682550) at totemiba.c:467
#9  recv_token_cq_recv_event_fn (poll_handle=<value optimized out>,
    events=<value optimized out>, suck=<value optimized out>,
    context=0x682550) at totemiba.c:613
#10 0x00007ffff7dc1c02 in poll_run (handle=150346236434579456)
#11 0x0000000000407a2b in main (argc=<value optimized out>,
    argv=<value optimized out>, envp=0x7fffffffe548) at main.c:1874

I wasn't able to work out why this happens, only that dealloc appears
to be called twice under this condition, resulting in a double free.

On 04/16/2011 02:11 PM, Zane Bitter wrote:
> (Work in Progress)
> Note that this patch is completely untested to date.
> ---
>  exec/totemiba.c |  388 
> ++++++++++++++++++++++++++++++-------------------------
>  1 files changed, 211 insertions(+), 177 deletions(-)
> 
> diff --git a/exec/totemiba.c b/exec/totemiba.c
> index 5aa2787..8ef52c9 100644
> --- a/exec/totemiba.c
> +++ b/exec/totemiba.c
> @@ -59,6 +59,7 @@
>  #include <stdio.h>
>  #include <string.h>
>  #include <stdlib.h>
> +#include <stddef.h>
>  #include <sys/types.h>
>  #include <sys/socket.h>
>  #include <netdb.h>
> @@ -73,6 +74,7 @@
>  #include <corosync/totem/coropoll.h>
>  #define LOGSYS_UTILS_ONLY 1
>  #include <corosync/engine/logsys.h>
> +#include "totembuf.h"
>  #include "totemiba.h"
>  #include "wthread.h"
>  
> @@ -82,6 +84,8 @@
>  
>  #define MAX_MTU_SIZE 4096
>  
> +#define TOTEMIBA_BUFFER_MAGIC 0xbeeffeedu
> +
>  struct totemiba_instance {
>       struct sockaddr bind_addr;
>  
> @@ -195,12 +199,6 @@ struct totemiba_instance {
>  
>       int totemiba_subsys_id;
>  
> -     struct list_head mcast_send_buf_free;
> -
> -     struct list_head token_send_buf_free;
> -
> -     struct list_head mcast_send_buf_head;
> -
>       struct list_head token_send_buf_head;
>  
>       struct list_head recv_token_recv_buf_head;
> @@ -218,22 +216,22 @@ do {                                                    
>                 \
>                                   LOGSYS_RECID_LOG),                  \
>                  __FUNCTION__, __FILE__, __LINE__,                    \
>               (const char *)format, ##args);                          \
> -} while (0);
> +} while (0)
>  
> -struct recv_buf {
> -     struct list_head list_all;
> -     struct ibv_recv_wr recv_wr;
> -     struct ibv_sge sge;
> +struct iba_buf {
> +     uint32_t magic;
> +     struct list_head list_active;
>       struct ibv_mr *mr;
> -     char buffer[MAX_MTU_SIZE];
> +     union {
> +             char buffer[MAX_MTU_SIZE];
> +             struct {
> +                     struct ibv_grh header;
> +                     char packet[0];
> +             } p;
> +     } b;
>  };
>  
> -struct send_buf {
> -     struct list_head list_free;
> -     struct list_head list_all;
> -     struct ibv_mr *mr;
> -     char buffer[MAX_MTU_SIZE];
> -};
> +#define IBA_BUF_FROM_PACKET(PACKET) (struct iba_buf *)(((char *)(PACKET)) - 
> offsetof(struct iba_buf, b.p.packet))
>  
>  static hdb_handle_t
>  void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
> @@ -241,228 +239,232 @@ void2wrid (void *v) { union u u; u.v = v; return 
> u.wr_id; }
>  static void *
>  wrid2void (uint64_t wr_id) { union u u; u.wr_id = wr_id; return u.v; }
>  
> +static struct totembuf_list *free_list = NULL;
> +
>  static void totemiba_instance_initialize (struct totemiba_instance *instance)
>  {
>       memset (instance, 0, sizeof (struct totemiba_instance));
> -     list_init (&instance->mcast_send_buf_free);
> -     list_init (&instance->token_send_buf_free);
> -     list_init (&instance->mcast_send_buf_head);
>       list_init (&instance->token_send_buf_head);
>       list_init (&instance->recv_token_recv_buf_head);
>  }
>  
> -static inline struct send_buf *mcast_send_buf_get (
> -     struct totemiba_instance *instance)
> +static struct iba_buf *iba_buf_alloc (void)
>  {
> -     struct send_buf *send_buf;
> -
> -     if (list_empty (&instance->mcast_send_buf_free) == 0) {
> -             send_buf = list_entry (instance->mcast_send_buf_free.next, 
> struct send_buf, list_free);
> -             list_del (&send_buf->list_free);
> -             return (send_buf);
> -     }
> +     struct iba_buf *new_buf = totembuf_alloc (free_list);
>  
> -     send_buf = malloc (sizeof (struct send_buf));
> -     if (send_buf == NULL) {
> +     if (new_buf == NULL) {
>               return (NULL);
>       }
> -     send_buf->mr = ibv_reg_mr (instance->mcast_pd,
> -             send_buf->buffer,
> +
> +     new_buf->magic = TOTEMIBA_BUFFER_MAGIC;
> +     list_init (&new_buf->list_active);
> +     new_buf->mr = NULL;
> +
> +     return (new_buf);
> +}
> +
> +static int iba_buf_register (
> +     struct totemiba_instance *instance,
> +     struct iba_buf *buf,
> +     struct ibv_pd *pd)
> +{
> +     buf->mr = ibv_reg_mr (pd, buf->b.buffer,
>               2048, IBV_ACCESS_LOCAL_WRITE);
> -     if (send_buf->mr == NULL) {
> +     if (buf->mr == NULL) {
>               log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory 
> range\n");
> -             return (NULL);
> +             if (!totembuf_is_retained (buf)) {
> +                     totembuf_dealloc (buf);
> +             }
> +             return (-1);
>       }
> -     list_init (&send_buf->list_all);
> -     list_add_tail (&send_buf->list_all, &instance->mcast_send_buf_head);
> -             
> -     return (send_buf);
> +     return (0);
>  }
>  
> -static inline void mcast_send_buf_put (
> +static inline struct iba_buf *send_buf_new (
>       struct totemiba_instance *instance,
> -     struct send_buf *send_buf)
> +     const void *data,
> +     size_t length)
>  {
> -     list_init (&send_buf->list_free);
> -     list_add_tail (&send_buf->list_free, &instance->mcast_send_buf_free);
> +     struct iba_buf *new_buf = iba_buf_alloc();
> +
> +     if (new_buf == NULL) {
> +             return (NULL);
> +     }
> +
> +     assert (length <= (sizeof (new_buf->b.buffer) - sizeof 
> (new_buf->b.p.header)));
> +     memcpy (new_buf->b.p.packet, data, length);
> +
> +     return (new_buf);
>  }
>  
> -static inline struct send_buf *token_send_buf_get (
> -     struct totemiba_instance *instance)
> +static inline struct iba_buf *mcast_send_buf_get (
> +     struct totemiba_instance *instance,
> +     const void *ms,
> +     size_t length)
>  {
> -     struct send_buf *send_buf;
> +     struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ms);
> +     assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>  
> -     if (list_empty (&instance->token_send_buf_free) == 0) {
> -             send_buf = list_entry (instance->token_send_buf_free.next, 
> struct send_buf, list_free);
> -             list_del (&send_buf->list_free);
> -             return (send_buf);
> +     if (send_buf->mr) {
> +             /* Buffer is already enqueued. Make a copy. */
> +             send_buf = send_buf_new (instance, send_buf->b.p.packet, 
> length);
>       }
>  
> -     send_buf = malloc (sizeof (struct send_buf));
> -     if (send_buf == NULL) {
> +     if (iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>               return (NULL);
>       }
> -     send_buf->mr = ibv_reg_mr (instance->send_token_pd,
> -             send_buf->buffer,
> -             2048, IBV_ACCESS_LOCAL_WRITE);
> -     if (send_buf->mr == NULL) {
> -             log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory 
> range\n");
> -             return (NULL);
> -     }
> -     list_init (&send_buf->list_all);
> -     list_add_tail (&send_buf->list_all, &instance->token_send_buf_head);
> -             
> +
>       return (send_buf);
>  }
>  
> -static inline void token_send_buf_destroy (struct totemiba_instance 
> *instance)
> +static void iba_buf_put (struct iba_buf *iba_buf)
>  {
> -     struct list_head *list;
> -     struct send_buf *send_buf;
> -
> -        for (list = instance->token_send_buf_head.next; list != 
> &instance->token_send_buf_head;) {
> -                send_buf = list_entry (list, struct send_buf, list_all);
> -             list = list->next;
> -             ibv_dereg_mr (send_buf->mr);
> -             free (send_buf);
> +     list_del (&iba_buf->list_active);
> +     list_init (&iba_buf->list_active);
> +     ibv_dereg_mr (iba_buf->mr);
> +     iba_buf->mr = NULL;
> +     if (!totembuf_is_retained (iba_buf)) {
> +             totembuf_dealloc (iba_buf);
>       }
> -
> -     list_init (&instance->token_send_buf_free);
> -     list_init (&instance->token_send_buf_head);
>  }
>  
> -static inline void token_send_buf_put (
> +static inline struct iba_buf *token_send_buf_get (
>       struct totemiba_instance *instance,
> -     struct send_buf *send_buf)
> +     const void *ms,
> +     size_t length)
>  {
> -     list_init (&send_buf->list_free);
> -     list_add_tail (&send_buf->list_free, &instance->token_send_buf_free);
> +     struct iba_buf *send_buf;
> +
> +     send_buf = send_buf_new (instance, ms, length);
> +
> +     if (iba_buf_register (instance, send_buf, instance->send_token_pd)) {
> +             return (NULL);
> +     }
> +
> +     list_add_tail (&send_buf->list_active, &instance->token_send_buf_head);
> +
> +     return (send_buf);
>  }
>  
> -static inline struct recv_buf *recv_token_recv_buf_create (
> +static inline struct iba_buf *recv_token_recv_buf_get (
>       struct totemiba_instance *instance)
>  {
> -     struct recv_buf *recv_buf;
> +     struct iba_buf *recv_buf;
>  
> -     recv_buf = malloc (sizeof (struct recv_buf));
> +     recv_buf = iba_buf_alloc();
>       if (recv_buf == NULL) {
>               return (NULL);
>       }
>  
> -     recv_buf->mr = ibv_reg_mr (instance->recv_token_pd, &recv_buf->buffer,
> -             2048,
> -             IBV_ACCESS_LOCAL_WRITE);
> -
> -     recv_buf->recv_wr.next = NULL;
> -     recv_buf->recv_wr.sg_list = &recv_buf->sge;
> -     recv_buf->recv_wr.num_sge = 1;
> -     recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
> +     if (iba_buf_register (instance, recv_buf, instance->recv_token_pd)) {
> +             return (NULL);
> +     }
>  
> -     recv_buf->sge.length = 2048;
> -     recv_buf->sge.lkey = recv_buf->mr->lkey;
> -     recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
> +     list_add (&recv_buf->list_active, &instance->recv_token_recv_buf_head);
>  
> -     list_init (&recv_buf->list_all);
> -     list_add (&recv_buf->list_all, &instance->recv_token_recv_buf_head);
>       return (recv_buf);
>  }
>  
> -static inline int recv_token_recv_buf_post (struct totemiba_instance 
> *instance, struct recv_buf *recv_buf)
> +static inline int recv_token_recv_buf_post (struct totemiba_instance 
> *instance, struct iba_buf *recv_buf)
>  {
> -     struct ibv_recv_wr *fail_recv;
> +     struct ibv_recv_wr recv_wr, *fail_recv;
> +     struct ibv_sge sge;
>       int res;
>  
> -     res = ibv_post_recv (instance->recv_token_cma_id->qp, 
> &recv_buf->recv_wr, &fail_recv);
> +     recv_wr.next = NULL;
> +     recv_wr.sg_list = &sge;
> +     recv_wr.num_sge = 1;
> +     recv_wr.wr_id = (uintptr_t)recv_buf;
> +
> +     sge.length = 2048;
> +     sge.lkey = recv_buf->mr->lkey;
> +     sge.addr = (uintptr_t)recv_buf->b.buffer;
> +
> +     res = ibv_post_recv (instance->recv_token_cma_id->qp, &recv_wr, 
> &fail_recv);
>  
>       return (res);
>  }
>  
>  static inline void recv_token_recv_buf_post_initial (struct 
> totemiba_instance *instance)
>  {
> -     struct recv_buf *recv_buf;
> +     struct iba_buf *recv_buf;
>       unsigned int i;
>  
>       for (i = 0; i < TOTAL_READ_POSTS; i++) {
> -             recv_buf = recv_token_recv_buf_create (instance);
> +             recv_buf = recv_token_recv_buf_get (instance);
>  
> -             recv_token_recv_buf_post (instance, recv_buf);
> +             if (recv_buf) {
> +                     recv_token_recv_buf_post (instance, recv_buf);
> +             }
>       }
>  }
>  
> -static inline void recv_token_recv_buf_post_destroy (
> -     struct totemiba_instance *instance)
> +static inline void buf_list_destroy (struct list_head *head)
>  {
> -     struct recv_buf *recv_buf;
> -     struct list_head *list;
> +     struct list_head *list = head->next;
>  
> -     for (list = instance->recv_token_recv_buf_head.next;
> -             list != &instance->recv_token_recv_buf_head;) {
> -
> -             recv_buf = list_entry (list, struct recv_buf, list_all);
> +     while (list != head) {
> +             struct iba_buf *iba_buf = list_entry (list, struct iba_buf, 
> list_active);
>               list = list->next;
> -             ibv_dereg_mr (recv_buf->mr);
> -             free (recv_buf);
> +             iba_buf_put (iba_buf);
>       }
> -     list_init (&instance->recv_token_recv_buf_head);
>  }
>  
> -static inline struct recv_buf *mcast_recv_buf_create (struct 
> totemiba_instance *instance)
> +static inline struct iba_buf *mcast_recv_buf_get (
> +     struct totemiba_instance *instance)
>  {
> -     struct recv_buf *recv_buf;
> -     struct ibv_mr *mr;
> +     struct iba_buf *recv_buf;
>  
> -     recv_buf = malloc (sizeof (struct recv_buf));
> +     recv_buf = iba_buf_alloc();
>       if (recv_buf == NULL) {
>               return (NULL);
>       }
>  
> -     mr = ibv_reg_mr (instance->mcast_pd, &recv_buf->buffer,
> -             2048,
> -             IBV_ACCESS_LOCAL_WRITE);
> -
> -     recv_buf->recv_wr.next = NULL;
> -     recv_buf->recv_wr.sg_list = &recv_buf->sge;
> -     recv_buf->recv_wr.num_sge = 1;
> -     recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
> -
> -     recv_buf->sge.length = 2048;
> -     recv_buf->sge.lkey = mr->lkey;
> -     recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
> +     if (iba_buf_register (instance, recv_buf, instance->mcast_pd)) {
> +             return (NULL);
> +     }
>  
>       return (recv_buf);
>  }
>  
> -static inline int mcast_recv_buf_post (struct totemiba_instance *instance, 
> struct recv_buf *recv_buf)
> +static inline int mcast_recv_buf_post (struct totemiba_instance *instance, 
> struct iba_buf *recv_buf)
>  {
> -     struct ibv_recv_wr *fail_recv;
> +     struct ibv_recv_wr recv_wr, *fail_recv;
> +     struct ibv_sge sge;
>       int res;
>  
> -     res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_buf->recv_wr, 
> &fail_recv);
> +     recv_wr.next = NULL;
> +     recv_wr.sg_list = &sge;
> +     recv_wr.num_sge = 1;
> +     recv_wr.wr_id = (uintptr_t)recv_buf;
> +
> +     sge.length = 2048;
> +     sge.lkey = recv_buf->mr->lkey;
> +     sge.addr = (uintptr_t)recv_buf->b.buffer;
> +
> +     res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_wr, &fail_recv);
>  
>       return (res);
>  }
>  
>  static inline void mcast_recv_buf_post_initial (struct totemiba_instance 
> *instance)
>  {
> -     struct recv_buf *recv_buf;
> +     struct iba_buf *recv_buf;
>       unsigned int i;
>  
>       for (i = 0; i < TOTAL_READ_POSTS; i++) {
> -             recv_buf = mcast_recv_buf_create (instance);
> +             recv_buf = mcast_recv_buf_get (instance);
>  
>               mcast_recv_buf_post (instance, recv_buf);
>       }
>  }
>  
> -static inline void iba_deliver_fn (struct totemiba_instance *instance, 
> uint64_t wr_id, uint32_t bytes)
> +static inline void iba_deliver_fn (
> +     struct totemiba_instance *instance,
> +     const struct iba_buf *recv_buf,
> +     uint32_t bytes)
>  {
> -     const char *addr;
> -     const struct recv_buf *recv_buf;
> -
> -     recv_buf = wrid2void(wr_id);
> -     addr = &recv_buf->buffer[sizeof (struct ibv_grh)];
> -
> -     instance->totemiba_deliver_fn (instance->rrp_context, addr, bytes);
> +     instance->totemiba_deliver_fn (instance->rrp_context, 
> recv_buf->b.p.packet, bytes);
>  }
>  
>  static int mcast_cq_send_event_fn (hdb_handle_t poll_handle,  int events,  
> int suck,  void *context)
> @@ -481,7 +483,7 @@ static int mcast_cq_send_event_fn (hdb_handle_t 
> poll_handle,  int events,  int s
>       res = ibv_poll_cq (instance->mcast_send_cq, 32, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     mcast_send_buf_put (instance, wrid2void(wc[i].wr_id));
> +                     iba_buf_put (wrid2void (wc[i].wr_id));
>               }
>       }
>  
> @@ -504,8 +506,17 @@ static int mcast_cq_recv_event_fn (hdb_handle_t 
> poll_handle,  int events,  int s
>       res = ibv_poll_cq (instance->mcast_recv_cq, 64, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
> -                     mcast_recv_buf_post (instance, wrid2void(wc[i].wr_id));
> +                     struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
> +                     iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
> +
> +                     if (totembuf_is_retained (recv_buf)) {
> +                             /* Buffer has been added to the send queue */
> +                             iba_buf_put (recv_buf);
> +
> +                             /* Get a new receive buffer to post to the 
> receive queue */
> +                             recv_buf = mcast_recv_buf_get (instance);
> +                     }
> +                     mcast_recv_buf_post (instance, recv_buf);
>               }
>       }
>  
> @@ -573,8 +584,9 @@ static int recv_token_cq_send_event_fn (hdb_handle_t 
> poll_handle,  int events,
>       res = ibv_poll_cq (instance->recv_token_send_cq, 32, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
> -                     ibv_dereg_mr (wrid2void(wc[i].wr_id));
> +                     struct iba_buf *send_buf = wrid2void (wc[i].wr_id);
> +                     iba_deliver_fn (instance, send_buf, wc[i].byte_len);
> +                     iba_buf_put (send_buf);
>               }
>       }
>  
> @@ -597,8 +609,17 @@ static int recv_token_cq_recv_event_fn (hdb_handle_t 
> poll_handle,  int events,
>       res = ibv_poll_cq (instance->recv_token_recv_cq, 32, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
> -                     recv_token_recv_buf_post (instance, 
> wrid2void(wc[i].wr_id));
> +                     struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
> +                     iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
> +
> +                     if (totembuf_is_retained (recv_buf)) {
> +                             /* Buffer has been added to the send queue */
> +                             iba_buf_put (recv_buf);
> +
> +                             /* Get a new receive buffer to post to the 
> receive queue */
> +                             recv_buf = recv_token_recv_buf_get (instance);
> +                     }
> +                     recv_token_recv_buf_post (instance, recv_buf);
>               }
>       }
>  
> @@ -613,7 +634,7 @@ static int recv_token_accept_destroy (struct 
> totemiba_instance *instance)
>  
>       rdma_destroy_qp (instance->recv_token_cma_id);
>  
> -     recv_token_recv_buf_post_destroy (instance);
> +     buf_list_destroy (&instance->recv_token_recv_buf_head);
>  
>       ibv_destroy_cq (instance->recv_token_send_cq);
>  
> @@ -779,7 +800,7 @@ static int send_token_cq_send_event_fn (hdb_handle_t 
> poll_handle,  int events,
>       res = ibv_poll_cq (instance->send_token_send_cq, 32, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     token_send_buf_put (instance, wrid2void(wc[i].wr_id));
> +                     iba_buf_put (wrid2void (wc[i].wr_id));
>               }
>       }
>  
> @@ -802,7 +823,9 @@ static int send_token_cq_recv_event_fn (hdb_handle_t 
> poll_handle,  int events,
>       res = ibv_poll_cq (instance->send_token_recv_cq, 32, wc);
>       if (res > 0) {
>               for (i = 0; i < res; i++) {
> -                     iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
> +                     struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
> +                     iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
> +                     iba_buf_put (recv_buf);
>               }
>       }
>  
> @@ -1020,7 +1043,7 @@ static int send_token_unbind (struct totemiba_instance 
> *instance)
>       ibv_destroy_cq (instance->send_token_recv_cq);
>       ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
>       ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
> -     token_send_buf_destroy (instance);
> +     buf_list_destroy (&instance->token_send_buf_head);
>       ibv_dealloc_pd (instance->send_token_pd);
>       rdma_destroy_id (instance->send_token_cma_id);
>       rdma_destroy_event_channel (instance->send_token_channel);
> @@ -1283,6 +1306,10 @@ int totemiba_initialize (
>       struct totemiba_instance *instance;
>       int res = 0;
>  
> +     if (!free_list) {
> +             free_list = totembuf_list_init (sizeof (struct iba_buf));
> +     }
> +
>       instance = malloc (sizeof (struct totemiba_instance));
>       if (instance == NULL) {
>               return (-1);
> @@ -1319,21 +1346,36 @@ int totemiba_initialize (
>  
>  void *totemiba_buffer_alloc (void)
>  {
> -     return malloc (MAX_MTU_SIZE);
> +     struct iba_buf *send_buf = iba_buf_alloc();
> +     if (send_buf == NULL) {
> +             return (NULL);
> +     }
> +     return (send_buf->b.p.packet);
>  }
>  
>  void *totemiba_buffer_retain (void *ptr)
>  {
> -     void *new_buf = totemiba_buffer_alloc();
> -     if (new_buf) {
> -             memcpy (new_buf, ptr, FRAME_SIZE_MAX);
> +     struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
> +     assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
> +
> +     send_buf = totembuf_retain (send_buf);
> +     if (send_buf == NULL) {
> +             return (NULL);
>       }
> -     return new_buf;
> +     return (send_buf->b.p.packet);
>  }
>  
>  void totemiba_buffer_release (void *ptr)
>  {
> -     return free (ptr);
> +     struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
> +     assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
> +
> +     if (send_buf->mr) {
> +             /* Still awaiting transmission */
> +             totembuf_release (send_buf);
> +     } else {
> +             totembuf_dealloc (send_buf);
> +     }
>  }
>  
>  int totemiba_processor_count_set (
> @@ -1367,15 +1409,12 @@ int totemiba_token_send (
>       int res = 0;
>       struct ibv_send_wr send_wr, *failed_send_wr;
>       struct ibv_sge sge;
> -     void *msg;
> -     struct send_buf *send_buf;
> +     struct iba_buf *send_buf;
>  
> -     send_buf = token_send_buf_get (instance);
> +     send_buf = token_send_buf_get (instance, ms, msg_len);
>       if (send_buf == NULL) {
>               return (-1);
>       }
> -     msg = send_buf->buffer;
> -     memcpy (msg, ms, msg_len);
>  
>       send_wr.next = NULL;
>       send_wr.sg_list = &sge;
> @@ -1390,7 +1429,7 @@ int totemiba_token_send (
>  
>       sge.length = msg_len;
>       sge.lkey = send_buf->mr->lkey;
> -     sge.addr = (uintptr_t)msg;
> +     sge.addr = (uintptr_t)send_buf->b.p.packet;
>  
>       res = ibv_post_send (instance->send_token_cma_id->qp, &send_wr, 
> &failed_send_wr);
>  
> @@ -1406,16 +1445,14 @@ int totemiba_mcast_flush_send (
>       int res = 0;
>       struct ibv_send_wr send_wr, *failed_send_wr;
>       struct ibv_sge sge;
> -     void *msg;
> -     struct send_buf *send_buf;
> +     struct iba_buf *send_buf;
>  
> -     send_buf = mcast_send_buf_get (instance);
> -     if (send_buf == NULL) {
> +     send_buf = send_buf_new (instance, ms, msg_len);
> +     if (send_buf == NULL ||
> +         iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>               return (-1);
>       }
>  
> -     msg = send_buf->buffer;
> -     memcpy (msg, ms, msg_len);
>       send_wr.next = NULL;
>       send_wr.sg_list = &sge;
>       send_wr.num_sge = 1;
> @@ -1429,7 +1466,7 @@ int totemiba_mcast_flush_send (
>  
>       sge.length = msg_len;
>       sge.lkey = send_buf->mr->lkey;
> -     sge.addr = (uintptr_t)msg;
> +     sge.addr = (uintptr_t)send_buf->b.p.packet;
>  
>       res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
> &failed_send_wr);
>       return (res);
> @@ -1444,16 +1481,13 @@ int totemiba_mcast_noflush_send (
>       int res = 0;
>       struct ibv_send_wr send_wr, *failed_send_wr;
>       struct ibv_sge sge;
> -     void *msg;
> -     struct send_buf *send_buf;
> +     struct iba_buf *send_buf;
>  
> -     send_buf = mcast_send_buf_get (instance);
> +     send_buf = mcast_send_buf_get (instance, ms, msg_len);
>       if (send_buf == NULL) {
>               return (-1);
>       }
>  
> -     msg = send_buf->buffer;
> -     memcpy (msg, ms, msg_len);
>       send_wr.next = NULL;
>       send_wr.sg_list = &sge;
>       send_wr.num_sge = 1;
> @@ -1467,7 +1501,7 @@ int totemiba_mcast_noflush_send (
>  
>       sge.length = msg_len;
>       sge.lkey = send_buf->mr->lkey;
> -     sge.addr = (uintptr_t)msg;
> +     sge.addr = (uintptr_t)send_buf->b.p.packet;
>  
>       res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
> &failed_send_wr);
>       return (res);
> 
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to