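Ah, thanks! I think the problem is that totemiba_buffer_alloc() should call
totembuf_retain() on a new buffer before returning it. Otherwise the caller's
reference is never counted, so iba_buf_put() deallocates the buffer once the
send completes, and the caller's later release then operates on a buffer that
has already been freed. I'll fix that up.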
cheers,
Zane.
On 2011/04/18, at 16:24, Steven Dake wrote:
> This code hits an assertion failure unless the totembuf_dealloc() call in
> iba_buf_put() is commented out. The backtrace is:
> #0 0x0000003676c330c5 in raise () from /lib64/libc.so.6
> #1 0x0000003676c34a76 in abort () from /lib64/libc.so.6
> #2 0x0000003676c2b905 in __assert_fail () from /lib64/libc.so.6
> #3 0x00007ffff7dd550e in totembuf_release (ptr=<value optimized out>)
> at totembuf.c:193
> #4 0x00007ffff7dcbc71 in totemsrp_buffer_release (instance=0x7ffff5db9010,
> msg=<value optimized out>, msg_len=<value optimized out>,
> endian_conversion_needed=<value optimized out>) at totemsrp.c:1383
> #5 messages_free (instance=0x7ffff5db9010, msg=<value optimized out>,
> msg_len=<value optimized out>,
> endian_conversion_needed=<value optimized out>) at totemsrp.c:2336
> #6 message_handler_orf_token (instance=0x7ffff5db9010,
> msg=<value optimized out>, msg_len=<value optimized out>,
> endian_conversion_needed=<value optimized out>) at totemsrp.c:3487
> #7 0x00007ffff7dc7db3 in rrp_deliver_fn (context=0x63a290, msg=0x825134,
> msg_len=110) at totemrrp.c:1454
> #8 0x00007ffff7ddb863 in iba_deliver_fn (poll_handle=<value optimized
> out>,
> events=<value optimized out>, suck=<value optimized out>,
> context=0x682550) at totemiba.c:467
> #9 recv_token_cq_recv_event_fn (poll_handle=<value optimized out>,
> events=<value optimized out>, suck=<value optimized out>,
> context=0x682550) at totemiba.c:613
> #10 0x00007ffff7dc1c02 in poll_run (handle=150346236434579456)
> #11 0x0000000000407a2b in main (argc=<value optimized out>,
> argv=<value optimized out>, envp=0x7fffffffe548) at main.c:1874
>
> I wasn't able to work out why this happens; all I can tell is that
> dealloc appears to be called twice under this condition, resulting in a
> double free.
>
> On 04/16/2011 02:11 PM, Zane Bitter wrote:
>> (Work in Progress)
>> Note that this patch is completely untested to date.
>> ---
>> exec/totemiba.c | 388
>> ++++++++++++++++++++++++++++++-------------------------
>> 1 files changed, 211 insertions(+), 177 deletions(-)
>>
>> diff --git a/exec/totemiba.c b/exec/totemiba.c
>> index 5aa2787..8ef52c9 100644
>> --- a/exec/totemiba.c
>> +++ b/exec/totemiba.c
>> @@ -59,6 +59,7 @@
>> #include <stdio.h>
>> #include <string.h>
>> #include <stdlib.h>
>> +#include <stddef.h>
>> #include <sys/types.h>
>> #include <sys/socket.h>
>> #include <netdb.h>
>> @@ -73,6 +74,7 @@
>> #include <corosync/totem/coropoll.h>
>> #define LOGSYS_UTILS_ONLY 1
>> #include <corosync/engine/logsys.h>
>> +#include "totembuf.h"
>> #include "totemiba.h"
>> #include "wthread.h"
>>
>> @@ -82,6 +84,8 @@
>>
>> #define MAX_MTU_SIZE 4096
>>
>> +#define TOTEMIBA_BUFFER_MAGIC 0xbeeffeedu
>> +
>> struct totemiba_instance {
>> struct sockaddr bind_addr;
>>
>> @@ -195,12 +199,6 @@ struct totemiba_instance {
>>
>> int totemiba_subsys_id;
>>
>> - struct list_head mcast_send_buf_free;
>> -
>> - struct list_head token_send_buf_free;
>> -
>> - struct list_head mcast_send_buf_head;
>> -
>> struct list_head token_send_buf_head;
>>
>> struct list_head recv_token_recv_buf_head;
>> @@ -218,22 +216,22 @@ do {
>> \
>> LOGSYS_RECID_LOG), \
>> __FUNCTION__, __FILE__, __LINE__, \
>> (const char *)format, ##args); \
>> -} while (0);
>> +} while (0)
>>
>> -struct recv_buf {
>> - struct list_head list_all;
>> - struct ibv_recv_wr recv_wr;
>> - struct ibv_sge sge;
>> +struct iba_buf {
>> + uint32_t magic;
>> + struct list_head list_active;
>> struct ibv_mr *mr;
>> - char buffer[MAX_MTU_SIZE];
>> + union {
>> + char buffer[MAX_MTU_SIZE];
>> + struct {
>> + struct ibv_grh header;
>> + char packet[0];
>> + } p;
>> + } b;
>> };
>>
>> -struct send_buf {
>> - struct list_head list_free;
>> - struct list_head list_all;
>> - struct ibv_mr *mr;
>> - char buffer[MAX_MTU_SIZE];
>> -};
>> +#define IBA_BUF_FROM_PACKET(PACKET) (struct iba_buf *)(((char *)(PACKET)) -
>> offsetof(struct iba_buf, b.p.packet))
>>
>> static hdb_handle_t
>> void2wrid (void *v) { union u u; u.v = v; return u.wr_id; }
>> @@ -241,228 +239,232 @@ void2wrid (void *v) { union u u; u.v = v; return
>> u.wr_id; }
>> static void *
>> wrid2void (uint64_t wr_id) { union u u; u.wr_id = wr_id; return u.v; }
>>
>> +static struct totembuf_list *free_list = NULL;
>> +
>> static void totemiba_instance_initialize (struct totemiba_instance *instance)
>> {
>> memset (instance, 0, sizeof (struct totemiba_instance));
>> - list_init (&instance->mcast_send_buf_free);
>> - list_init (&instance->token_send_buf_free);
>> - list_init (&instance->mcast_send_buf_head);
>> list_init (&instance->token_send_buf_head);
>> list_init (&instance->recv_token_recv_buf_head);
>> }
>>
>> -static inline struct send_buf *mcast_send_buf_get (
>> - struct totemiba_instance *instance)
>> +static struct iba_buf *iba_buf_alloc (void)
>> {
>> - struct send_buf *send_buf;
>> -
>> - if (list_empty (&instance->mcast_send_buf_free) == 0) {
>> - send_buf = list_entry (instance->mcast_send_buf_free.next,
>> struct send_buf, list_free);
>> - list_del (&send_buf->list_free);
>> - return (send_buf);
>> - }
>> + struct iba_buf *new_buf = totembuf_alloc (free_list);
>>
>> - send_buf = malloc (sizeof (struct send_buf));
>> - if (send_buf == NULL) {
>> + if (new_buf == NULL) {
>> return (NULL);
>> }
>> - send_buf->mr = ibv_reg_mr (instance->mcast_pd,
>> - send_buf->buffer,
>> +
>> + new_buf->magic = TOTEMIBA_BUFFER_MAGIC;
>> + list_init (&new_buf->list_active);
>> + new_buf->mr = NULL;
>> +
>> + return (new_buf);
>> +}
>> +
>> +static int iba_buf_register (
>> + struct totemiba_instance *instance,
>> + struct iba_buf *buf,
>> + struct ibv_pd *pd)
>> +{
>> + buf->mr = ibv_reg_mr (pd, buf->b.buffer,
>> 2048, IBV_ACCESS_LOCAL_WRITE);
>> - if (send_buf->mr == NULL) {
>> + if (buf->mr == NULL) {
>> log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory
>> range\n");
>> - return (NULL);
>> + if (!totembuf_is_retained (buf)) {
>> + totembuf_dealloc (buf);
>> + }
>> + return (-1);
>> }
>> - list_init (&send_buf->list_all);
>> - list_add_tail (&send_buf->list_all, &instance->mcast_send_buf_head);
>> -
>> - return (send_buf);
>> + return (0);
>> }
>>
>> -static inline void mcast_send_buf_put (
>> +static inline struct iba_buf *send_buf_new (
>> struct totemiba_instance *instance,
>> - struct send_buf *send_buf)
>> + const void *data,
>> + size_t length)
>> {
>> - list_init (&send_buf->list_free);
>> - list_add_tail (&send_buf->list_free, &instance->mcast_send_buf_free);
>> + struct iba_buf *new_buf = iba_buf_alloc();
>> +
>> + if (new_buf == NULL) {
>> + return (NULL);
>> + }
>> +
>> + assert (length <= (sizeof (new_buf->b.buffer) - sizeof
>> (new_buf->b.p.header)));
>> + memcpy (new_buf->b.p.packet, data, length);
>> +
>> + return (new_buf);
>> }
>>
>> -static inline struct send_buf *token_send_buf_get (
>> - struct totemiba_instance *instance)
>> +static inline struct iba_buf *mcast_send_buf_get (
>> + struct totemiba_instance *instance,
>> + const void *ms,
>> + size_t length)
>> {
>> - struct send_buf *send_buf;
>> + struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ms);
>> + assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>>
>> - if (list_empty (&instance->token_send_buf_free) == 0) {
>> - send_buf = list_entry (instance->token_send_buf_free.next,
>> struct send_buf, list_free);
>> - list_del (&send_buf->list_free);
>> - return (send_buf);
>> + if (send_buf->mr) {
>> + /* Buffer is already enqueued. Make a copy. */
>> + send_buf = send_buf_new (instance, send_buf->b.p.packet,
>> length);
>> }
>>
>> - send_buf = malloc (sizeof (struct send_buf));
>> - if (send_buf == NULL) {
>> + if (iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>> return (NULL);
>> }
>> - send_buf->mr = ibv_reg_mr (instance->send_token_pd,
>> - send_buf->buffer,
>> - 2048, IBV_ACCESS_LOCAL_WRITE);
>> - if (send_buf->mr == NULL) {
>> - log_printf (LOGSYS_LEVEL_ERROR, "couldn't register memory
>> range\n");
>> - return (NULL);
>> - }
>> - list_init (&send_buf->list_all);
>> - list_add_tail (&send_buf->list_all, &instance->token_send_buf_head);
>> -
>> +
>> return (send_buf);
>> }
>>
>> -static inline void token_send_buf_destroy (struct totemiba_instance
>> *instance)
>> +static void iba_buf_put (struct iba_buf *iba_buf)
>> {
>> - struct list_head *list;
>> - struct send_buf *send_buf;
>> -
>> - for (list = instance->token_send_buf_head.next; list !=
>> &instance->token_send_buf_head;) {
>> - send_buf = list_entry (list, struct send_buf, list_all);
>> - list = list->next;
>> - ibv_dereg_mr (send_buf->mr);
>> - free (send_buf);
>> + list_del (&iba_buf->list_active);
>> + list_init (&iba_buf->list_active);
>> + ibv_dereg_mr (iba_buf->mr);
>> + iba_buf->mr = NULL;
>> + if (!totembuf_is_retained (iba_buf)) {
>> + totembuf_dealloc (iba_buf);
>> }
>> -
>> - list_init (&instance->token_send_buf_free);
>> - list_init (&instance->token_send_buf_head);
>> }
>>
>> -static inline void token_send_buf_put (
>> +static inline struct iba_buf *token_send_buf_get (
>> struct totemiba_instance *instance,
>> - struct send_buf *send_buf)
>> + const void *ms,
>> + size_t length)
>> {
>> - list_init (&send_buf->list_free);
>> - list_add_tail (&send_buf->list_free, &instance->token_send_buf_free);
>> + struct iba_buf *send_buf;
>> +
>> + send_buf = send_buf_new (instance, ms, length);
>> +
>> + if (iba_buf_register (instance, send_buf, instance->send_token_pd)) {
>> + return (NULL);
>> + }
>> +
>> + list_add_tail (&send_buf->list_active, &instance->token_send_buf_head);
>> +
>> + return (send_buf);
>> }
>>
>> -static inline struct recv_buf *recv_token_recv_buf_create (
>> +static inline struct iba_buf *recv_token_recv_buf_get (
>> struct totemiba_instance *instance)
>> {
>> - struct recv_buf *recv_buf;
>> + struct iba_buf *recv_buf;
>>
>> - recv_buf = malloc (sizeof (struct recv_buf));
>> + recv_buf = iba_buf_alloc();
>> if (recv_buf == NULL) {
>> return (NULL);
>> }
>>
>> - recv_buf->mr = ibv_reg_mr (instance->recv_token_pd, &recv_buf->buffer,
>> - 2048,
>> - IBV_ACCESS_LOCAL_WRITE);
>> -
>> - recv_buf->recv_wr.next = NULL;
>> - recv_buf->recv_wr.sg_list = &recv_buf->sge;
>> - recv_buf->recv_wr.num_sge = 1;
>> - recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
>> + if (iba_buf_register (instance, recv_buf, instance->recv_token_pd)) {
>> + return (NULL);
>> + }
>>
>> - recv_buf->sge.length = 2048;
>> - recv_buf->sge.lkey = recv_buf->mr->lkey;
>> - recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
>> + list_add (&recv_buf->list_active, &instance->recv_token_recv_buf_head);
>>
>> - list_init (&recv_buf->list_all);
>> - list_add (&recv_buf->list_all, &instance->recv_token_recv_buf_head);
>> return (recv_buf);
>> }
>>
>> -static inline int recv_token_recv_buf_post (struct totemiba_instance
>> *instance, struct recv_buf *recv_buf)
>> +static inline int recv_token_recv_buf_post (struct totemiba_instance
>> *instance, struct iba_buf *recv_buf)
>> {
>> - struct ibv_recv_wr *fail_recv;
>> + struct ibv_recv_wr recv_wr, *fail_recv;
>> + struct ibv_sge sge;
>> int res;
>>
>> - res = ibv_post_recv (instance->recv_token_cma_id->qp,
>> &recv_buf->recv_wr, &fail_recv);
>> + recv_wr.next = NULL;
>> + recv_wr.sg_list = &sge;
>> + recv_wr.num_sge = 1;
>> + recv_wr.wr_id = (uintptr_t)recv_buf;
>> +
>> + sge.length = 2048;
>> + sge.lkey = recv_buf->mr->lkey;
>> + sge.addr = (uintptr_t)recv_buf->b.buffer;
>> +
>> + res = ibv_post_recv (instance->recv_token_cma_id->qp, &recv_wr,
>> &fail_recv);
>>
>> return (res);
>> }
>>
>> static inline void recv_token_recv_buf_post_initial (struct
>> totemiba_instance *instance)
>> {
>> - struct recv_buf *recv_buf;
>> + struct iba_buf *recv_buf;
>> unsigned int i;
>>
>> for (i = 0; i < TOTAL_READ_POSTS; i++) {
>> - recv_buf = recv_token_recv_buf_create (instance);
>> + recv_buf = recv_token_recv_buf_get (instance);
>>
>> - recv_token_recv_buf_post (instance, recv_buf);
>> + if (recv_buf) {
>> + recv_token_recv_buf_post (instance, recv_buf);
>> + }
>> }
>> }
>>
>> -static inline void recv_token_recv_buf_post_destroy (
>> - struct totemiba_instance *instance)
>> +static inline void buf_list_destroy (struct list_head *head)
>> {
>> - struct recv_buf *recv_buf;
>> - struct list_head *list;
>> + struct list_head *list = head->next;
>>
>> - for (list = instance->recv_token_recv_buf_head.next;
>> - list != &instance->recv_token_recv_buf_head;) {
>> -
>> - recv_buf = list_entry (list, struct recv_buf, list_all);
>> + while (list != head) {
>> + struct iba_buf *iba_buf = list_entry (list, struct iba_buf,
>> list_active);
>> list = list->next;
>> - ibv_dereg_mr (recv_buf->mr);
>> - free (recv_buf);
>> + iba_buf_put (iba_buf);
>> }
>> - list_init (&instance->recv_token_recv_buf_head);
>> }
>>
>> -static inline struct recv_buf *mcast_recv_buf_create (struct
>> totemiba_instance *instance)
>> +static inline struct iba_buf *mcast_recv_buf_get (
>> + struct totemiba_instance *instance)
>> {
>> - struct recv_buf *recv_buf;
>> - struct ibv_mr *mr;
>> + struct iba_buf *recv_buf;
>>
>> - recv_buf = malloc (sizeof (struct recv_buf));
>> + recv_buf = iba_buf_alloc();
>> if (recv_buf == NULL) {
>> return (NULL);
>> }
>>
>> - mr = ibv_reg_mr (instance->mcast_pd, &recv_buf->buffer,
>> - 2048,
>> - IBV_ACCESS_LOCAL_WRITE);
>> -
>> - recv_buf->recv_wr.next = NULL;
>> - recv_buf->recv_wr.sg_list = &recv_buf->sge;
>> - recv_buf->recv_wr.num_sge = 1;
>> - recv_buf->recv_wr.wr_id = (uintptr_t)recv_buf;
>> -
>> - recv_buf->sge.length = 2048;
>> - recv_buf->sge.lkey = mr->lkey;
>> - recv_buf->sge.addr = (uintptr_t)recv_buf->buffer;
>> + if (iba_buf_register (instance, recv_buf, instance->mcast_pd)) {
>> + return (NULL);
>> + }
>>
>> return (recv_buf);
>> }
>>
>> -static inline int mcast_recv_buf_post (struct totemiba_instance *instance,
>> struct recv_buf *recv_buf)
>> +static inline int mcast_recv_buf_post (struct totemiba_instance *instance,
>> struct iba_buf *recv_buf)
>> {
>> - struct ibv_recv_wr *fail_recv;
>> + struct ibv_recv_wr recv_wr, *fail_recv;
>> + struct ibv_sge sge;
>> int res;
>>
>> - res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_buf->recv_wr,
>> &fail_recv);
>> + recv_wr.next = NULL;
>> + recv_wr.sg_list = &sge;
>> + recv_wr.num_sge = 1;
>> + recv_wr.wr_id = (uintptr_t)recv_buf;
>> +
>> + sge.length = 2048;
>> + sge.lkey = recv_buf->mr->lkey;
>> + sge.addr = (uintptr_t)recv_buf->b.buffer;
>> +
>> + res = ibv_post_recv (instance->mcast_cma_id->qp, &recv_wr, &fail_recv);
>>
>> return (res);
>> }
>>
>> static inline void mcast_recv_buf_post_initial (struct totemiba_instance
>> *instance)
>> {
>> - struct recv_buf *recv_buf;
>> + struct iba_buf *recv_buf;
>> unsigned int i;
>>
>> for (i = 0; i < TOTAL_READ_POSTS; i++) {
>> - recv_buf = mcast_recv_buf_create (instance);
>> + recv_buf = mcast_recv_buf_get (instance);
>>
>> mcast_recv_buf_post (instance, recv_buf);
>> }
>> }
>>
>> -static inline void iba_deliver_fn (struct totemiba_instance *instance,
>> uint64_t wr_id, uint32_t bytes)
>> +static inline void iba_deliver_fn (
>> + struct totemiba_instance *instance,
>> + const struct iba_buf *recv_buf,
>> + uint32_t bytes)
>> {
>> - const char *addr;
>> - const struct recv_buf *recv_buf;
>> -
>> - recv_buf = wrid2void(wr_id);
>> - addr = &recv_buf->buffer[sizeof (struct ibv_grh)];
>> -
>> - instance->totemiba_deliver_fn (instance->rrp_context, addr, bytes);
>> + instance->totemiba_deliver_fn (instance->rrp_context,
>> recv_buf->b.p.packet, bytes);
>> }
>>
>> static int mcast_cq_send_event_fn (hdb_handle_t poll_handle, int events,
>> int suck, void *context)
>> @@ -481,7 +483,7 @@ static int mcast_cq_send_event_fn (hdb_handle_t
>> poll_handle, int events, int s
>> res = ibv_poll_cq (instance->mcast_send_cq, 32, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - mcast_send_buf_put (instance, wrid2void(wc[i].wr_id));
>> + iba_buf_put (wrid2void (wc[i].wr_id));
>> }
>> }
>>
>> @@ -504,8 +506,17 @@ static int mcast_cq_recv_event_fn (hdb_handle_t
>> poll_handle, int events, int s
>> res = ibv_poll_cq (instance->mcast_recv_cq, 64, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> - mcast_recv_buf_post (instance, wrid2void(wc[i].wr_id));
>> + struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> + iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> +
>> + if (totembuf_is_retained (recv_buf)) {
>> + /* Buffer has been added to the send queue */
>> + iba_buf_put (recv_buf);
>> +
>> + /* Get a new receive buffer to post to the
>> receive queue */
>> + recv_buf = mcast_recv_buf_get (instance);
>> + }
>> + mcast_recv_buf_post (instance, recv_buf);
>> }
>> }
>>
>> @@ -573,8 +584,9 @@ static int recv_token_cq_send_event_fn (hdb_handle_t
>> poll_handle, int events,
>> res = ibv_poll_cq (instance->recv_token_send_cq, 32, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> - ibv_dereg_mr (wrid2void(wc[i].wr_id));
>> + struct iba_buf *send_buf = wrid2void (wc[i].wr_id);
>> + iba_deliver_fn (instance, send_buf, wc[i].byte_len);
>> + iba_buf_put (send_buf);
>> }
>> }
>>
>> @@ -597,8 +609,17 @@ static int recv_token_cq_recv_event_fn (hdb_handle_t
>> poll_handle, int events,
>> res = ibv_poll_cq (instance->recv_token_recv_cq, 32, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> - recv_token_recv_buf_post (instance,
>> wrid2void(wc[i].wr_id));
>> + struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> + iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> +
>> + if (totembuf_is_retained (recv_buf)) {
>> + /* Buffer has been added to the send queue */
>> + iba_buf_put (recv_buf);
>> +
>> + /* Get a new receive buffer to post to the
>> receive queue */
>> + recv_buf = recv_token_recv_buf_get (instance);
>> + }
>> + recv_token_recv_buf_post (instance, recv_buf);
>> }
>> }
>>
>> @@ -613,7 +634,7 @@ static int recv_token_accept_destroy (struct
>> totemiba_instance *instance)
>>
>> rdma_destroy_qp (instance->recv_token_cma_id);
>>
>> - recv_token_recv_buf_post_destroy (instance);
>> + buf_list_destroy (&instance->recv_token_recv_buf_head);
>>
>> ibv_destroy_cq (instance->recv_token_send_cq);
>>
>> @@ -779,7 +800,7 @@ static int send_token_cq_send_event_fn (hdb_handle_t
>> poll_handle, int events,
>> res = ibv_poll_cq (instance->send_token_send_cq, 32, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - token_send_buf_put (instance, wrid2void(wc[i].wr_id));
>> + iba_buf_put (wrid2void (wc[i].wr_id));
>> }
>> }
>>
>> @@ -802,7 +823,9 @@ static int send_token_cq_recv_event_fn (hdb_handle_t
>> poll_handle, int events,
>> res = ibv_poll_cq (instance->send_token_recv_cq, 32, wc);
>> if (res > 0) {
>> for (i = 0; i < res; i++) {
>> - iba_deliver_fn (instance, wc[i].wr_id, wc[i].byte_len);
>> + struct iba_buf *recv_buf = wrid2void (wc[i].wr_id);
>> + iba_deliver_fn (instance, recv_buf, wc[i].byte_len);
>> + iba_buf_put (recv_buf);
>> }
>> }
>>
>> @@ -1020,7 +1043,7 @@ static int send_token_unbind (struct totemiba_instance
>> *instance)
>> ibv_destroy_cq (instance->send_token_recv_cq);
>> ibv_destroy_comp_channel (instance->send_token_send_completion_channel);
>> ibv_destroy_comp_channel (instance->send_token_recv_completion_channel);
>> - token_send_buf_destroy (instance);
>> + buf_list_destroy (&instance->token_send_buf_head);
>> ibv_dealloc_pd (instance->send_token_pd);
>> rdma_destroy_id (instance->send_token_cma_id);
>> rdma_destroy_event_channel (instance->send_token_channel);
>> @@ -1283,6 +1306,10 @@ int totemiba_initialize (
>> struct totemiba_instance *instance;
>> int res = 0;
>>
>> + if (!free_list) {
>> + free_list = totembuf_list_init (sizeof (struct iba_buf));
>> + }
>> +
>> instance = malloc (sizeof (struct totemiba_instance));
>> if (instance == NULL) {
>> return (-1);
>> @@ -1319,21 +1346,36 @@ int totemiba_initialize (
>>
>> void *totemiba_buffer_alloc (void)
>> {
>> - return malloc (MAX_MTU_SIZE);
>> + struct iba_buf *send_buf = iba_buf_alloc();
>> + if (send_buf == NULL) {
>> + return (NULL);
>> + }
>> + return (send_buf->b.p.packet);
>> }
>>
>> void *totemiba_buffer_retain (void *ptr)
>> {
>> - void *new_buf = totemiba_buffer_alloc();
>> - if (new_buf) {
>> - memcpy (new_buf, ptr, FRAME_SIZE_MAX);
>> + struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
>> + assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>> +
>> + send_buf = totembuf_retain (send_buf);
>> + if (send_buf == NULL) {
>> + return (NULL);
>> }
>> - return new_buf;
>> + return (send_buf->b.p.packet);
>> }
>>
>> void totemiba_buffer_release (void *ptr)
>> {
>> - return free (ptr);
>> + struct iba_buf *send_buf = IBA_BUF_FROM_PACKET(ptr);
>> + assert (send_buf->magic == TOTEMIBA_BUFFER_MAGIC);
>> +
>> + if (send_buf->mr) {
>> + /* Still awaiting transmission */
>> + totembuf_release (send_buf);
>> + } else {
>> + totembuf_dealloc (send_buf);
>> + }
>> }
>>
>> int totemiba_processor_count_set (
>> @@ -1367,15 +1409,12 @@ int totemiba_token_send (
>> int res = 0;
>> struct ibv_send_wr send_wr, *failed_send_wr;
>> struct ibv_sge sge;
>> - void *msg;
>> - struct send_buf *send_buf;
>> + struct iba_buf *send_buf;
>>
>> - send_buf = token_send_buf_get (instance);
>> + send_buf = token_send_buf_get (instance, ms, msg_len);
>> if (send_buf == NULL) {
>> return (-1);
>> }
>> - msg = send_buf->buffer;
>> - memcpy (msg, ms, msg_len);
>>
>> send_wr.next = NULL;
>> send_wr.sg_list = &sge;
>> @@ -1390,7 +1429,7 @@ int totemiba_token_send (
>>
>> sge.length = msg_len;
>> sge.lkey = send_buf->mr->lkey;
>> - sge.addr = (uintptr_t)msg;
>> + sge.addr = (uintptr_t)send_buf->b.p.packet;
>>
>> res = ibv_post_send (instance->send_token_cma_id->qp, &send_wr,
>> &failed_send_wr);
>>
>> @@ -1406,16 +1445,14 @@ int totemiba_mcast_flush_send (
>> int res = 0;
>> struct ibv_send_wr send_wr, *failed_send_wr;
>> struct ibv_sge sge;
>> - void *msg;
>> - struct send_buf *send_buf;
>> + struct iba_buf *send_buf;
>>
>> - send_buf = mcast_send_buf_get (instance);
>> - if (send_buf == NULL) {
>> + send_buf = send_buf_new (instance, ms, msg_len);
>> + if (send_buf == NULL ||
>> + iba_buf_register (instance, send_buf, instance->mcast_pd)) {
>> return (-1);
>> }
>>
>> - msg = send_buf->buffer;
>> - memcpy (msg, ms, msg_len);
>> send_wr.next = NULL;
>> send_wr.sg_list = &sge;
>> send_wr.num_sge = 1;
>> @@ -1429,7 +1466,7 @@ int totemiba_mcast_flush_send (
>>
>> sge.length = msg_len;
>> sge.lkey = send_buf->mr->lkey;
>> - sge.addr = (uintptr_t)msg;
>> + sge.addr = (uintptr_t)send_buf->b.p.packet;
>>
>> res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr,
>> &failed_send_wr);
>> return (res);
>> @@ -1444,16 +1481,13 @@ int totemiba_mcast_noflush_send (
>> int res = 0;
>> struct ibv_send_wr send_wr, *failed_send_wr;
>> struct ibv_sge sge;
>> - void *msg;
>> - struct send_buf *send_buf;
>> + struct iba_buf *send_buf;
>>
>> - send_buf = mcast_send_buf_get (instance);
>> + send_buf = mcast_send_buf_get (instance, ms, msg_len);
>> if (send_buf == NULL) {
>> return (-1);
>> }
>>
>> - msg = send_buf->buffer;
>> - memcpy (msg, ms, msg_len);
>> send_wr.next = NULL;
>> send_wr.sg_list = &sge;
>> send_wr.num_sge = 1;
>> @@ -1467,7 +1501,7 @@ int totemiba_mcast_noflush_send (
>>
>> sge.length = msg_len;
>> sge.lkey = send_buf->mr->lkey;
>> - sge.addr = (uintptr_t)msg;
>> + sge.addr = (uintptr_t)send_buf->b.p.packet;
>>
>> res = ibv_post_send (instance->mcast_cma_id->qp, &send_wr,
>> &failed_send_wr);
>> return (res);
>>
>> _______________________________________________
>> Openais mailing list
>> [email protected]
>> https://lists.linux-foundation.org/mailman/listinfo/openais
>
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais