Ah, thanks! I think the problem is that totemiba_buffer_alloc() should call
totembuf_retain() on the new buffer before returning it. I'll fix that up.

cheers,
Zane.

On 2011/04/18, at 16:24, Steven Dake wrote:

> This code asserts unless the totembuf_dealloc call in iba_buf_put is
> commented out.  The backtrace at the assertion is:
> #0  0x0000003676c330c5 in raise () from /lib64/libc.so.6
> #1  0x0000003676c34a76 in abort () from /lib64/libc.so.6
> #2  0x0000003676c2b905 in __assert_fail () from /lib64/libc.so.6
> #3  0x00007ffff7dd550e in totembuf_release (ptr=<value optimized out>)
>    at totembuf.c:193
> #4  0x00007ffff7dcbc71 in totemsrp_buffer_release (instance=0x7ffff5db9010,
>    msg=<value optimized out>, msg_len=<value optimized out>,
>    endian_conversion_needed=<value optimized out>) at totemsrp.c:1383
> #5  messages_free (instance=0x7ffff5db9010, msg=<value optimized out>,
>    msg_len=<value optimized out>,
>    endian_conversion_needed=<value optimized out>) at totemsrp.c:2336
> #6  message_handler_orf_token (instance=0x7ffff5db9010,
>    msg=<value optimized out>, msg_len=<value optimized out>,
>    endian_conversion_needed=<value optimized out>) at totemsrp.c:3487
> #7  0x00007ffff7dc7db3 in rrp_deliver_fn (context=0x63a290, msg=0x825134,
>    msg_len=110) at totemrrp.c:1454
> #8  0x00007ffff7ddb863 in iba_deliver_fn (poll_handle=<value optimized out>,
>    events=<value optimized out>, suck=<value optimized out>,
>    context=0x682550) at totemiba.c:467
> #9  recv_token_cq_recv_event_fn (poll_handle=<value optimized out>,
>    events=<value optimized out>, suck=<value optimized out>,
>    context=0x682550) at totemiba.c:613
> #10 0x00007ffff7dc1c02 in poll_run (handle=150346236434579456)
> #11 0x0000000000407a2b in main (argc=<value optimized out>,
>    argv=<value optimized out>, envp=0x7fffffffe548) at main.c:1874
> 
> I wasn't able to understand why this happens, only that it appears
> dealloc will be called twice under this condition, resulting in a double
> free.
> 
> On 04/16/2011 02:11 PM, Zane Bitter wrote:
>> (Work in Progress)
>> Note that this patch is completely untested to date.
>> ---
>> exec/totemiba.c |  388 ++++++++++++++++++++++++++++++-------------------------
>> 1 files changed, 211 insertions(+), 177 deletions(-)
>> 
>> diff --git a/exec/totemiba.c b/exec/totemiba.c
>> index 5aa2787..8ef52c9 100644
>> --- a/exec/totemiba.c
>> +++ b/exec/totemiba.c
>> @@ -59,6 +59,7 @@
>> #include <stdio.h>
>> #include <string.h>
>> #include <stdlib.h>
>> +#include <stddef.h>
>> #include <sys/types.h>
>> #include <sys/socket.h>
>> #include <netdb.h>
>> @@ -73,6 +74,7 @@
>> #include <corosync/totem/coropoll.h>
>> #define LOGSYS_UTILS_ONLY 1
>> #include <corosync/engine/logsys.h>
>> +#include "totembuf.h"
>> #include "totemiba.h"
>> #include "wthread.h"
>> 
>> @@ -82,6 +84,8 @@
>> 
>> #define MAX_MTU_SIZE 4096
>> 
>> +#define TOTEMIBA_BUFFER_MAGIC 0xbeeffeedu
>> +
>> struct totemiba_instance {
>>      struct sockaddr bind_addr;
>> 
>> @@ -195,12 +199,6 @@ struct totemiba_instance {
>> 
>>      int totemiba_subsys_id;
>> 
>> -    struct list_head mcast_send_buf_free;
>> -
>> -    struct list_head token_send_buf_free;
>> -
>> -    struct list_head mcast_send_buf_head;
>> -
>>      struct list_head token_send_buf_head;
>> 
>>      struct list_head recv_token_recv_buf_head;
>> @@ -218,22 +216,22 @@ do {                                                   
>>                 \
>>                                  LOGSYS_RECID_LOG),                  \
>>                 __FUNCTION__, __FILE__, __LINE__,                    \
>>              (const char *)format, ##args);                          \
>> -} while (0);
>> +} while (0)
>> 
>> -struct recv_buf {
>> -    struct list_head list_all;
>> -    struct ibv_recv_wr recv_wr;
>> -    struct ibv_sge sge;
>> +struct iba_buf {
>> +    uint32_t magic;
>> +    struct list_head list_active;
>>      struct ibv_mr *mr;
>> -    char buffer[MAX_MTU_SIZE];
>> +    union {
>> +            char buffer[MAX_MTU_SIZE];
>> +            struct {
>> +                    struct ibv_grh header;
>> +                    char packet[0];
>> +            } p;
>> +    } b;
>> };
>> 
>> -struct send_buf {
>> -    struct list_head list_free;
>> -    struct list_head list_all;
>> -    struct ibv_mr *mr;
>> -    char buffer[MAX_MTU_SIZE];
>> -};
>> +#define IBA_BUF_FROM_PACKET(PACKET) (struct iba_buf *)(((char *)(PACKET)) - 
>> offsetof(struct iba_buf, b.p.packet))
>> 
>> static hdb_handle_t
>> void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
>> @@ -241,228 +239,232 @@ void2wrid (void *v) { union u u; u.v = v; return 
>> u.wr_id; }
>> static void *
>> wrid2void (uint64_t wr_id) { union u u; u.wr_id = wr_id; return u.v; }
>> 
>> +static struct totembuf_list *free_list = NULL;
>> +
>> static void totemiba_instance_initialize (struct totemiba_instance *instance)
>> {
>>      memset (instance, 0, sizeof (struct totemiba_instance));
>> -    list_init (&instance->mcast_send_buf_free);
>> -    list_init (&instance->token_send_buf_free);
>> -    list_init (&instance->mcast_send_buf_head);
>>      list_init (&instance->token_send_buf_head);
>>      list_init (&instance->recv_token_recv_buf_head);
>> }
>> 
>> -static inline struct send_buf *mcast_send_buf_get (
>> -    struct totemiba_instance *instance)
>> +static struct iba_buf *iba_buf_alloc (void)
>> {
>> -    struct send_buf *send_buf;
>> -
>> -    if (list_empty (&instance->mcast_send_buf_free) == 0) {
>> -            send_buf = list_entry (instance->mcast_send_buf_free.next, 
>> struct send_buf, list_free);
>> -            list_del (&send_buf->list_free);
>> -            return (send_buf);
>> -    }
>> +    struct iba_buf *new_buf = totembuf_alloc (free_list);
>> 
>> -    send_buf = malloc (sizeof (struct send_buf));
>> -    if (send_buf == NULL) {
>> +    if (new_buf == NULL) {
>>              return (NULL);
>>      }
>> -    send_buf->mr = ibv_reg_mr (instance->mcast_pd,
>> -            send_buf->buffer,
>> +
>> +    new_buf->magic = TOTEMIBA_BUFFER_MAGIC;
>> +    list_init (&new_buf->list_active);
>> +    new_buf->mr = NULL;
>> +
>> +    return (new_buf);
>> +}
>> +
>> +static int iba_buf_register (
>> +    struct totemiba_instance *instance,
>> +    struct iba_buf *buf,
>> +    struct ibv_pd *pd)
>> +{
>> +    buf->mr = ibv_reg_mr (pd, buf->b.buffer,
>>              2048, IBV_ACCESS_LOCAL_WRITE);
>> -    if (send_buf->mr == NULL) {
>> +    if (buf->mr == NULL) {
>>              log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory 
>> range\n");
>> -            return (NULL);
>> +            if (!totembuf_is_retained (buf)) {
>> +                    totembuf_dealloc (buf);
>> +            }
>> +            return (-1);
>>      }
>> -    list_init (&send_buf->list_all);
>> -    list_add_tail (&send_buf->list_all, &instance->mcast_send_buf_head);
>> -            
>> -    return (send_buf);
>> +    return (0);
>> }
>> 
>> -static inline void mcast_send_buf_put (
>> +static inline struct iba_buf *send_buf_new (
>>      struct totemiba_instance *instance,
>> -    struct send_buf *send_buf)
>> +    const void *data,
>> +    size_t length)
>> {
>> -    list_init (&send_buf->list_free);
>> -    list_add_tail (&send_buf->list_free, &instance->mcast_send_buf_free);
>> +    struct iba_buf *new_buf = iba_buf_alloc();
>> +
>> +    if (new_buf == NULL) {
>> +            return (NULL);
>> +    }
>> +
>> +    assert (length <= (sizeof (new_buf->b.buffer) - sizeof 
>> (new_buf->b.p.header)));
>> +    memcpy (new_buf->b.p.packet, data, length);
>> +
>> +    return (new_buf);
>> }
>> 
>> -static inline struct send_buf *token_send_buf_get (
>> -    struct totemiba_instance *instance)
>> +static inline struct iba_buf *mcast_send_buf_get (
>> +    struct totemiba_instance *instance,
>> +    const void *ms,
>> +    size_t length)
>> {
>> -    struct send_buf *send_buf;
>> +    struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ms);
>> +    assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>> 
>> -    if (list_empty (&instance->token_send_buf_free) == 0) {
>> -            send_buf = list_entry (instance->token_send_buf_free.next, 
>> struct send_buf, list_free);
>> -            list_del (&send_buf->list_free);
>> -            return (send_buf);
>> +    if (send_buf->mr) {
>> +            /* Buffer is already enqueued. Make a copy. */
>> +            send_buf = send_buf_new (instance, send_buf->b.p.packet, 
>> length);
>>      }
>> 
>> -    send_buf = malloc (sizeof (struct send_buf));
>> -    if (send_buf == NULL) {
>> +    if (iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>>              return (NULL);
>>      }
>> -    send_buf->mr = ibv_reg_mr (instance->send_token_pd,
>> -            send_buf->buffer,
>> -            2048, IBV_ACCESS_LOCAL_WRITE);
>> -    if (send_buf->mr == NULL) {
>> -            log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory 
>> range\n");
>> -            return (NULL);
>> -    }
>> -    list_init (&send_buf->list_all);
>> -    list_add_tail (&send_buf->list_all, &instance->token_send_buf_head);
>> -            
>> +
>>      return (send_buf);
>> }
>> 
>> -static inline void token_send_buf_destroy (struct totemiba_instance 
>> *instance)
>> +static void iba_buf_put (struct iba_buf *iba_buf)
>> {
>> -    struct list_head *list;
>> -    struct send_buf *send_buf;
>> -
>> -        for (list = instance->token_send_buf_head.next; list != 
>> &instance->token_send_buf_head;) {
>> -                send_buf = list_entry (list, struct send_buf, list_all);
>> -            list = list->next;
>> -            ibv_dereg_mr (send_buf->mr);
>> -            free (send_buf);
>> +    list_del (&iba_buf->list_active);
>> +    list_init (&iba_buf->list_active);
>> +    ibv_dereg_mr (iba_buf->mr);
>> +    iba_buf->mr = NULL;
>> +    if (!totembuf_is_retained (iba_buf)) {
>> +            totembuf_dealloc (iba_buf);
>>      }
>> -
>> -    list_init (&instance->token_send_buf_free);
>> -    list_init (&instance->token_send_buf_head);
>> }
>> 
>> -static inline void token_send_buf_put (
>> +static inline struct iba_buf *token_send_buf_get (
>>      struct totemiba_instance *instance,
>> -    struct send_buf *send_buf)
>> +    const void *ms,
>> +    size_t length)
>> {
>> -    list_init (&send_buf->list_free);
>> -    list_add_tail (&send_buf->list_free, &instance->token_send_buf_free);
>> +    struct iba_buf *send_buf;
>> +
>> +    send_buf = send_buf_new (instance, ms, length);
>> +
>> +    if (iba_buf_register (instance, send_buf, instance->send_token_pd)) {
>> +            return (NULL);
>> +    }
>> +
>> +    list_add_tail (&send_buf->list_active, &instance->token_send_buf_head);
>> +
>> +    return (send_buf);
>> }
>> 
>> -static inline struct recv_buf *recv_token_recv_buf_create (
>> +static inline struct iba_buf *recv_token_recv_buf_get (
>>      struct totemiba_instance *instance)
>> {
>> -    struct recv_buf *recv_buf;
>> +    struct iba_buf *recv_buf;
>> 
>> -    recv_buf = malloc (sizeof (struct recv_buf));
>> +    recv_buf = iba_buf_alloc();
>>      if (recv_buf == NULL) {
>>              return (NULL);
>>      }
>> 
>> -    recv_buf->mr = ibv_reg_mr (instance->recv_token_pd, &recv_buf->buffer,
>> -            2048,
>> -            IBV_ACCESS_LOCAL_WRITE);
>> -
>> -    recv_buf->recv_wr.next = NULL;
>> -    recv_buf->recv_wr.sg_list = &recv_buf->sge;
>> -    recv_buf->recv_wr.num_sge = 1;
>> -    recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
>> +    if (iba_buf_register (instance, recv_buf, instance->recv_token_pd)) {
>> +            return (NULL);
>> +    }
>> 
>> -    recv_buf->sge.length = 2048;
>> -    recv_buf->sge.lkey = recv_buf->mr->lkey;
>> -    recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
>> +    list_add (&recv_buf->list_active, &instance->recv_token_recv_buf_head);
>> 
>> -    list_init (&recv_buf->list_all);
>> -    list_add (&recv_buf->list_all, &instance->recv_token_recv_buf_head);
>>      return (recv_buf);
>> }
>> 
>> -static inline int recv_token_recv_buf_post (struct totemiba_instance 
>> *instance, struct recv_buf *recv_buf)
>> +static inline int recv_token_recv_buf_post (struct totemiba_instance 
>> *instance, struct iba_buf *recv_buf)
>> {
>> -    struct ibv_recv_wr *fail_recv;
>> +    struct ibv_recv_wr recv_wr, *fail_recv;
>> +    struct ibv_sge sge;
>>      int res;
>> 
>> -    res = ibv_post_recv (instance->recv_token_cma_id->qp, 
>> &recv_buf->recv_wr, &fail_recv);
>> +    recv_wr.next = NULL;
>> +    recv_wr.sg_list = &sge;
>> +    recv_wr.num_sge = 1;
>> +    recv_wr.wr_id = (uintptr_t)recv_buf;
>> +
>> +    sge.length = 2048;
>> +    sge.lkey = recv_buf->mr->lkey;
>> +    sge.addr = (uintptr_t)recv_buf->b.buffer;
>> +
>> +    res = ibv_post_recv (instance->recv_token_cma_id->qp, &recv_wr, 
>> &fail_recv);
>> 
>>      return (res);
>> }
>> 
>> static inline void recv_token_recv_buf_post_initial (struct 
>> totemiba_instance *instance)
>> {
>> -    struct recv_buf *recv_buf;
>> +    struct iba_buf *recv_buf;
>>      unsigned int i;
>> 
>>      for (i = 0; i < TOTAL_READ_POSTS; i++) {
>> -            recv_buf = recv_token_recv_buf_create (instance);
>> +            recv_buf = recv_token_recv_buf_get (instance);
>> 
>> -            recv_token_recv_buf_post (instance, recv_buf);
>> +            if (recv_buf) {
>> +                    recv_token_recv_buf_post (instance, recv_buf);
>> +            }
>>      }
>> }
>> 
>> -static inline void recv_token_recv_buf_post_destroy (
>> -    struct totemiba_instance *instance)
>> +static inline void buf_list_destroy (struct list_head *head)
>> {
>> -    struct recv_buf *recv_buf;
>> -    struct list_head *list;
>> +    struct list_head *list = head->next;
>> 
>> -    for (list = instance->recv_token_recv_buf_head.next;
>> -            list != &instance->recv_token_recv_buf_head;) {
>> -
>> -            recv_buf = list_entry (list, struct recv_buf, list_all);
>> +    while (list != head) {
>> +            struct iba_buf *iba_buf = list_entry (list, struct iba_buf, 
>> list_active);
>>              list = list->next;
>> -            ibv_dereg_mr (recv_buf->mr);
>> -            free (recv_buf);
>> +            iba_buf_put (iba_buf);
>>      }
>> -    list_init (&instance->recv_token_recv_buf_head);
>> }
>> 
>> -static inline struct recv_buf *mcast_recv_buf_create (struct 
>> totemiba_instance *instance)
>> +static inline struct iba_buf *mcast_recv_buf_get (
>> +    struct totemiba_instance *instance)
>> {
>> -    struct recv_buf *recv_buf;
>> -    struct ibv_mr *mr;
>> +    struct iba_buf *recv_buf;
>> 
>> -    recv_buf = malloc (sizeof (struct recv_buf));
>> +    recv_buf = iba_buf_alloc();
>>      if (recv_buf == NULL) {
>>              return (NULL);
>>      }
>> 
>> -    mr = ibv_reg_mr (instance->mcast_pd, &recv_buf->buffer,
>> -            2048,
>> -            IBV_ACCESS_LOCAL_WRITE);
>> -
>> -    recv_buf->recv_wr.next = NULL;
>> -    recv_buf->recv_wr.sg_list = &recv_buf->sge;
>> -    recv_buf->recv_wr.num_sge = 1;
>> -    recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
>> -
>> -    recv_buf->sge.length = 2048;
>> -    recv_buf->sge.lkey = mr->lkey;
>> -    recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
>> +    if (iba_buf_register (instance, recv_buf, instance->mcast_pd)) {
>> +            return (NULL);
>> +    }
>> 
>>      return (recv_buf);
>> }
>> 
>> -static inline int mcast_recv_buf_post (struct totemiba_instance *instance, 
>> struct recv_buf *recv_buf)
>> +static inline int mcast_recv_buf_post (struct totemiba_instance *instance, 
>> struct iba_buf *recv_buf)
>> {
>> -    struct ibv_recv_wr *fail_recv;
>> +    struct ibv_recv_wr recv_wr, *fail_recv;
>> +    struct ibv_sge sge;
>>      int res;
>> 
>> -    res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_buf->recv_wr, 
>> &fail_recv);
>> +    recv_wr.next = NULL;
>> +    recv_wr.sg_list = &sge;
>> +    recv_wr.num_sge = 1;
>> +    recv_wr.wr_id = (uintptr_t)recv_buf;
>> +
>> +    sge.length = 2048;
>> +    sge.lkey = recv_buf->mr->lkey;
>> +    sge.addr = (uintptr_t)recv_buf->b.buffer;
>> +
>> +    res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_wr, &fail_recv);
>> 
>>      return (res);
>> }
>> 
>> static inline void mcast_recv_buf_post_initial (struct totemiba_instance 
>> *instance)
>> {
>> -    struct recv_buf *recv_buf;
>> +    struct iba_buf *recv_buf;
>>      unsigned int i;
>> 
>>      for (i = 0; i < TOTAL_READ_POSTS; i++) {
>> -            recv_buf = mcast_recv_buf_create (instance);
>> +            recv_buf = mcast_recv_buf_get (instance);
>> 
>>              mcast_recv_buf_post (instance, recv_buf);
>>      }
>> }
>> 
>> -static inline void iba_deliver_fn (struct totemiba_instance *instance, 
>> uint64_t wr_id, uint32_t bytes)
>> +static inline void iba_deliver_fn (
>> +    struct totemiba_instance *instance,
>> +    const struct iba_buf *recv_buf,
>> +    uint32_t bytes)
>> {
>> -    const char *addr;
>> -    const struct recv_buf *recv_buf;
>> -
>> -    recv_buf = wrid2void(wr_id);
>> -    addr = &recv_buf->buffer[sizeof (struct ibv_grh)];
>> -
>> -    instance->totemiba_deliver_fn (instance->rrp_context, addr, bytes);
>> +    instance->totemiba_deliver_fn (instance->rrp_context, 
>> recv_buf->b.p.packet, bytes);
>> }
>> 
>> static int mcast_cq_send_event_fn (hdb_handle_t poll_handle,  int events,  
>> int suck,  void *context)
>> @@ -481,7 +483,7 @@ static int mcast_cq_send_event_fn (hdb_handle_t 
>> poll_handle,  int events,  int s
>>      res = ibv_poll_cq (instance->mcast_send_cq, 32, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    mcast_send_buf_put (instance, wrid2void(wc[i].wr_id));
>> +                    iba_buf_put (wrid2void (wc[i].wr_id));
>>              }
>>      }
>> 
>> @@ -504,8 +506,17 @@ static int mcast_cq_recv_event_fn (hdb_handle_t 
>> poll_handle,  int events,  int s
>>      res = ibv_poll_cq (instance->mcast_recv_cq, 64, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> -                    mcast_recv_buf_post (instance, wrid2void(wc[i].wr_id));
>> +                    struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> +                    iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> +
>> +                    if (totembuf_is_retained (recv_buf)) {
>> +                            /* Buffer has been added to the send queue */
>> +                            iba_buf_put (recv_buf);
>> +
>> +                            /* Get a new receive buffer to post to the 
>> receive queue */
>> +                            recv_buf = mcast_recv_buf_get (instance);
>> +                    }
>> +                    mcast_recv_buf_post (instance, recv_buf);
>>              }
>>      }
>> 
>> @@ -573,8 +584,9 @@ static int recv_token_cq_send_event_fn (hdb_handle_t 
>> poll_handle,  int events,
>>      res = ibv_poll_cq (instance->recv_token_send_cq, 32, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> -                    ibv_dereg_mr (wrid2void(wc[i].wr_id));
>> +                    struct iba_buf *send_buf = wrid2void (wc[i].wr_id);
>> +                    iba_deliver_fn (instance, send_buf, wc[i].byte_len);
>> +                    iba_buf_put (send_buf);
>>              }
>>      }
>> 
>> @@ -597,8 +609,17 @@ static int recv_token_cq_recv_event_fn (hdb_handle_t 
>> poll_handle,  int events,
>>      res = ibv_poll_cq (instance->recv_token_recv_cq, 32, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> -                    recv_token_recv_buf_post (instance, 
>> wrid2void(wc[i].wr_id));
>> +                    struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> +                    iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> +
>> +                    if (totembuf_is_retained (recv_buf)) {
>> +                            /* Buffer has been added to the send queue */
>> +                            iba_buf_put (recv_buf);
>> +
>> +                            /* Get a new receive buffer to post to the 
>> receive queue */
>> +                            recv_buf = recv_token_recv_buf_get (instance);
>> +                    }
>> +                    recv_token_recv_buf_post (instance, recv_buf);
>>              }
>>      }
>> 
>> @@ -613,7 +634,7 @@ static int recv_token_accept_destroy (struct 
>> totemiba_instance *instance)
>> 
>>      rdma_destroy_qp (instance->recv_token_cma_id);
>> 
>> -    recv_token_recv_buf_post_destroy (instance);
>> +    buf_list_destroy (&instance->recv_token_recv_buf_head);
>> 
>>      ibv_destroy_cq (instance->recv_token_send_cq);
>> 
>> @@ -779,7 +800,7 @@ static int send_token_cq_send_event_fn (hdb_handle_t 
>> poll_handle,  int events,
>>      res = ibv_poll_cq (instance->send_token_send_cq, 32, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    token_send_buf_put (instance, wrid2void(wc[i].wr_id));
>> +                    iba_buf_put (wrid2void (wc[i].wr_id));
>>              }
>>      }
>> 
>> @@ -802,7 +823,9 @@ static int send_token_cq_recv_event_fn (hdb_handle_t 
>> poll_handle,  int events,
>>      res = ibv_poll_cq (instance->send_token_recv_cq, 32, wc);
>>      if (res > 0) {
>>              for (i = 0; i < res; i++) {
>> -                    iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> +                    struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> +                    iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> +                    iba_buf_put (recv_buf);
>>              }
>>      }
>> 
>> @@ -1020,7 +1043,7 @@ static int send_token_unbind (struct totemiba_instance 
>> *instance)
>>      ibv_destroy_cq (instance->send_token_recv_cq);
>>      ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
>>      ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
>> -    token_send_buf_destroy (instance);
>> +    buf_list_destroy (&instance->token_send_buf_head);
>>      ibv_dealloc_pd (instance->send_token_pd);
>>      rdma_destroy_id (instance->send_token_cma_id);
>>      rdma_destroy_event_channel (instance->send_token_channel);
>> @@ -1283,6 +1306,10 @@ int totemiba_initialize (
>>      struct totemiba_instance *instance;
>>      int res = 0;
>> 
>> +    if (!free_list) {
>> +            free_list = totembuf_list_init (sizeof (struct iba_buf));
>> +    }
>> +
>>      instance = malloc (sizeof (struct totemiba_instance));
>>      if (instance == NULL) {
>>              return (-1);
>> @@ -1319,21 +1346,36 @@ int totemiba_initialize (
>> 
>> void *totemiba_buffer_alloc (void)
>> {
>> -    return malloc (MAX_MTU_SIZE);
>> +    struct iba_buf *send_buf = iba_buf_alloc();
>> +    if (send_buf == NULL) {
>> +            return (NULL);
>> +    }
>> +    return (send_buf->b.p.packet);
>> }
>> 
>> void *totemiba_buffer_retain (void *ptr)
>> {
>> -    void *new_buf = totemiba_buffer_alloc();
>> -    if (new_buf) {
>> -            memcpy (new_buf, ptr, FRAME_SIZE_MAX);
>> +    struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
>> +    assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>> +
>> +    send_buf = totembuf_retain (send_buf);
>> +    if (send_buf == NULL) {
>> +            return (NULL);
>>      }
>> -    return new_buf;
>> +    return (send_buf->b.p.packet);
>> }
>> 
>> void totemiba_buffer_release (void *ptr)
>> {
>> -    return free (ptr);
>> +    struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
>> +    assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>> +
>> +    if (send_buf->mr) {
>> +            /* Still awaiting transmission */
>> +            totembuf_release (send_buf);
>> +    } else {
>> +            totembuf_dealloc (send_buf);
>> +    }
>> }
>> 
>> int totemiba_processor_count_set (
>> @@ -1367,15 +1409,12 @@ int totemiba_token_send (
>>      int res = 0;
>>      struct ibv_send_wr send_wr, *failed_send_wr;
>>      struct ibv_sge sge;
>> -    void *msg;
>> -    struct send_buf *send_buf;
>> +    struct iba_buf *send_buf;
>> 
>> -    send_buf = token_send_buf_get (instance);
>> +    send_buf = token_send_buf_get (instance, ms, msg_len);
>>      if (send_buf == NULL) {
>>              return (-1);
>>      }
>> -    msg = send_buf->buffer;
>> -    memcpy (msg, ms, msg_len);
>> 
>>      send_wr.next = NULL;
>>      send_wr.sg_list = &sge;
>> @@ -1390,7 +1429,7 @@ int totemiba_token_send (
>> 
>>      sge.length = msg_len;
>>      sge.lkey = send_buf->mr->lkey;
>> -    sge.addr = (uintptr_t)msg;
>> +    sge.addr = (uintptr_t)send_buf->b.p.packet;
>> 
>>      res = ibv_post_send (instance->send_token_cma_id->qp, &send_wr, 
>> &failed_send_wr);
>> 
>> @@ -1406,16 +1445,14 @@ int totemiba_mcast_flush_send (
>>      int res = 0;
>>      struct ibv_send_wr send_wr, *failed_send_wr;
>>      struct ibv_sge sge;
>> -    void *msg;
>> -    struct send_buf *send_buf;
>> +    struct iba_buf *send_buf;
>> 
>> -    send_buf = mcast_send_buf_get (instance);
>> -    if (send_buf == NULL) {
>> +    send_buf = send_buf_new (instance, ms, msg_len);
>> +    if (send_buf == NULL ||
>> +        iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>>              return (-1);
>>      }
>> 
>> -    msg = send_buf->buffer;
>> -    memcpy (msg, ms, msg_len);
>>      send_wr.next = NULL;
>>      send_wr.sg_list = &sge;
>>      send_wr.num_sge = 1;
>> @@ -1429,7 +1466,7 @@ int totemiba_mcast_flush_send (
>> 
>>      sge.length = msg_len;
>>      sge.lkey = send_buf->mr->lkey;
>> -    sge.addr = (uintptr_t)msg;
>> +    sge.addr = (uintptr_t)send_buf->b.p.packet;
>> 
>>      res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
>> &failed_send_wr);
>>      return (res);
>> @@ -1444,16 +1481,13 @@ int totemiba_mcast_noflush_send (
>>      int res = 0;
>>      struct ibv_send_wr send_wr, *failed_send_wr;
>>      struct ibv_sge sge;
>> -    void *msg;
>> -    struct send_buf *send_buf;
>> +    struct iba_buf *send_buf;
>> 
>> -    send_buf = mcast_send_buf_get (instance);
>> +    send_buf = mcast_send_buf_get (instance, ms, msg_len);
>>      if (send_buf == NULL) {
>>              return (-1);
>>      }
>> 
>> -    msg = send_buf->buffer;
>> -    memcpy (msg, ms, msg_len);
>>      send_wr.next = NULL;
>>      send_wr.sg_list = &sge;
>>      send_wr.num_sge = 1;
>> @@ -1467,7 +1501,7 @@ int totemiba_mcast_noflush_send (
>> 
>>      sge.length = msg_len;
>>      sge.lkey = send_buf->mr->lkey;
>> -    sge.addr = (uintptr_t)msg;
>> +    sge.addr = (uintptr_t)send_buf->b.p.packet;
>> 
>>      res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr, 
>> &failed_send_wr);
>>      return (res);
>> 
>> _______________________________________________
>> Openais mailing list
>> [email protected]
>> https://lists.linux-foundation.org/mailman/listinfo/openais
> 

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to