See the previous mail on this subject; the patch is included below.
Index: test/magbench.c =================================================================== --- test/magbench.c (revision 0) +++ test/magbench.c (revision 0) @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2008, 2009 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Steven Dake ([email protected]) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#include <config.h> + +#include <stdio.h> +#include <stdint.h> +#include <string.h> +#include <sys/time.h> +#include <time.h> +#include <corosync/engine/logsys.h> +#include <corosync/magalloc.h> + +DECLARE_MAGAZINE (magbench, 10000); + +#define ITERATIONS 10000000 + +static struct timeval tv1, tv2, tv_elapsed; + +#define timersub(a, b, result) \ +do { \ + (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ + (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ + if ((result)->tv_usec < 0) { \ + --(result)->tv_sec; \ + (result)->tv_usec += 1000000; \ + } \ +} while (0) + +static void bm_start (void) +{ + gettimeofday (&tv1, NULL); +} +static void bm_finish (const char *operation) +{ + gettimeofday (&tv2, NULL); + timersub (&tv2, &tv1, &tv_elapsed); + + if (strlen (operation) > 22) { + printf ("%s\t\t", operation); + } else { + printf ("%s\t\t\t", operation); + } + printf ("%9.3f operations/sec\n", + ((float)ITERATIONS) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0))); +} + +int main (void) +{ + int i, j; + void **allocs[20]; + + bm_start(); + for (j = 0; j < ITERATIONS; j++) { + for (i = 0; i < 5; i++) { + allocs[i] = mag_alloc (&magbench); + } + for (i = 0; i < 5; i++) { + mag_free (&magbench, allocs[i]); + } + } + bm_finish ("mag_alloc and free of 5 items"); + + bm_start(); + for (j = 0; j < ITERATIONS; j++) { + for (i = 0; i < 5; i++) { + allocs[i] = malloc (10000); + } + for (i = 0; i < 5; i++) { + free (allocs[i]); + } + } + bm_finish ("malloc/free of 5 items"); +exit (1); + +}; Index: test/Makefile.am =================================================================== --- test/Makefile.am (revision 2058) +++ test/Makefile.am (working copy) @@ -34,7 +34,7 @@ INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include noinst_PROGRAMS = testevs evsbench evsverify testcpg testcpg2 cpgbench testconfdb \ - logsysbench logsysrec testquorum testvotequorum1 testvotequorum2 + logsysbench logsysrec testquorum testvotequorum1 testvotequorum2 magbench 
testevs_LDADD = -levs testevs_LDFLAGS = -L../lib @@ -58,6 +58,8 @@ cpgbench_LDFLAGS = -L../lib logsysbench_LDADD = -llogsys logsysbench_LDFLAGS = -L../exec +magbench_LDADD = +magbench_LDFLAGS = logsysrec_LDADD = -llogsys logsysrec_LDFLAGS = -L../exec Index: include/corosync/magalloc.h =================================================================== --- include/corosync/magalloc.h (revision 0) +++ include/corosync/magalloc.h (revision 0) @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2009 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Steven Dake ([email protected]) + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef MAGALLOC_H_DEFINED +#define MAGALLOC_H_DEFINED + +#include <config.h> +#include <stdlib.h> + +struct magazine_db { + int object_size; + void **objects; + unsigned int object_count; + unsigned int object_position; +}; + +#define DECLARE_MAGAZINE(magazine_name,object_sz) \ +static struct magazine_db (magazine_name) = { \ + .object_size = object_sz, \ + .objects = NULL, \ + .object_count = 0, \ + .object_position = 0, \ +}; + +static inline void *mag_alloc (struct magazine_db *db) { + void *ptr; + void *new_objects; + + if (db->object_count == db->object_position) { + db->object_count += 1; + new_objects = realloc (db->objects, sizeof (void *) * db->object_count); + if (new_objects == NULL) { + return (NULL); + } + db->objects = new_objects; + db->objects[db->object_position] = malloc (db->object_size); + if (db->objects[db->object_position] == NULL) { + db->object_count -= 1; + return (NULL); + } + } + ptr = db->objects[db->object_position]; + db->object_position++; + return (ptr); +} + +static inline void mag_free (struct magazine_db *db, void *addr) { + db->object_position--; + db->objects[db->object_position] = addr; +} +#endif /* MAGALLOC_H_DEFINED */ Index: include/Makefile.am =================================================================== --- include/Makefile.am (revision 2058) +++ include/Makefile.am (working copy) @@ -32,7 +32,7 @@ MAINTAINERCLEANFILES = Makefile.in corosync/config.h.in CS_H = hdb.h 
cs_config.h cpg.h cfg.h evs.h ipc_gen.h mar_gen.h swab.h \ - coroipcc.h confdb.h list.h corotypes.h quorum.h votequorum.h + coroipcc.h confdb.h list.h corotypes.h quorum.h votequorum.h magalloc.h CS_INTERNAL_H = ipc_cfg.h ipc_confdb.h ipc_cpg.h ipc_evs.h ipc_pload.h ipc_quorum.h \ jhash.h mar_cpg.h pload.h queue.h quorum.h rmd.h sq.h ipc_votequorum.h Index: exec/totempg.c =================================================================== --- exec/totempg.c (revision 2058) +++ exec/totempg.c (working copy) @@ -98,6 +98,7 @@ #include <corosync/list.h> #include <corosync/totem/coropoll.h> #include <corosync/totem/totempg.h> +#include <corosync/magalloc.h> #include "totemmrp.h" #include "totemsrp.h" @@ -184,9 +185,9 @@ static enum throw_away_mode_t throw_away_mode = THROW_AWAY_INACTIVE; -DECLARE_LIST_INIT(assembly_list_inuse); +DECLARE_LIST_INIT(assembly_list_head); -DECLARE_LIST_INIT(assembly_list_free); +DECLARE_MAGAZINE(assembly_mag, sizeof (struct assembly)); /* * Staging buffer for packed messages. 
Messages are staged in this buffer @@ -253,10 +254,10 @@ struct list_head *list; /* - * Search inuse list for node id and return assembly buffer if found + * Search existing assembly lists */ - for (list = assembly_list_inuse.next; - list != &assembly_list_inuse; + for (list = assembly_list_head.next; + list != &assembly_list_head; list = list->next) { assembly = list_entry (list, struct assembly, list); @@ -267,28 +268,14 @@ } /* - * Nothing found in inuse list get one from free list if available + * Allocate new assembly list */ - if (list_empty (&assembly_list_free) == 0) { - assembly = list_entry (assembly_list_free.next, struct assembly, list); - list_del (&assembly->list); - list_add (&assembly->list, &assembly_list_inuse); - assembly->nodeid = nodeid; - return (assembly); - } - - /* - * Nothing available in inuse or free list, so allocate a new one - */ - assembly = malloc (sizeof (struct assembly)); - memset (assembly, 0, sizeof (struct assembly)); - /* - * TODO handle memory allocation failure here - */ - assert (assembly); + assembly = mag_alloc (&assembly_mag); assembly->nodeid = nodeid; + assembly->index = 0; + assembly->last_frag_num = 0; list_init (&assembly->list); - list_add (&assembly->list, &assembly_list_inuse); + list_add (&assembly->list, &assembly_list_head); return (assembly); } @@ -296,7 +283,7 @@ static void assembly_deref (struct assembly *assembly) { list_del (&assembly->list); - list_add (&assembly->list, &assembly_list_free); + mag_free (&assembly_mag, assembly); } static inline void app_confchg_fn ( Index: exec/totemsrp.c =================================================================== --- exec/totemsrp.c (revision 2058) +++ exec/totemsrp.c (working copy) @@ -77,6 +77,7 @@ #include <corosync/list.h> #include <corosync/hdb.h> #include <corosync/totem/coropoll.h> +#include <corosync/magalloc.h> #include "totemsrp.h" #include "totemrrp.h" #include "wthread.h" @@ -275,14 +276,13 @@ }__attribute__((packed)); struct message_item { - 
struct mcast *mcast; - struct iovec iovec[MAXIOVS]; - unsigned int iov_len; + char *msg; + int mlen; }; struct sort_queue_item { - struct iovec iovec[MAXIOVS]; - unsigned int iov_len; + char *msg; + int mlen; }; struct orf_token_mcast_thread_state { @@ -614,6 +614,10 @@ */ DECLARE_HDB_DATABASE (totemsrp_instance_database); +DECLARE_MAGAZINE (mag_db,10000); + +DECLARE_MAGAZINE (callback_mag_db, sizeof (struct token_callback_instance)); + struct message_handlers totemsrp_message_handlers = { 6, { @@ -1556,61 +1560,47 @@ /* * Convert recovery message into regular message */ - if (recovery_message_item->iov_len > 1) { - mcast = recovery_message_item->iovec[1].iov_base; - memcpy (&regular_message_item.iovec[0], - &recovery_message_item->iovec[1], - sizeof (struct iovec) * recovery_message_item->iov_len); - } else { - mcast = recovery_message_item->iovec[0].iov_base; - if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) { - /* - * Message is a recovery message encapsulated - * in a new ring message - */ - regular_message_item.iovec[0].iov_base = - (char *)recovery_message_item->iovec[0].iov_base + sizeof (struct mcast); - regular_message_item.iovec[0].iov_len = - recovery_message_item->iovec[0].iov_len - sizeof (struct mcast); - regular_message_item.iov_len = 1; - mcast = regular_message_item.iovec[0].iov_base; - } else { - continue; /* TODO this case shouldn't happen */ - /* - * Message is originated on new ring and not - * encapsulated - */ - regular_message_item.iovec[0].iov_base = - recovery_message_item->iovec[0].iov_base; - regular_message_item.iovec[0].iov_len = - recovery_message_item->iovec[0].iov_len; - } - } + mcast = recovery_message_item->msg; + if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) { + /* + * Message is a recovery message encapsulated + * in a new ring message + */ + regular_message_item.msg = + (struct mcast *)mag_alloc (&mag_db); + memcpy (regular_message_item.msg, + recovery_message_item->msg + + sizeof (struct mcast), + 
recovery_message_item->mlen - sizeof (struct mcast)); - log_printf (instance->totemsrp_log_level_debug, - "comparing if ring id is for this processors old ring seqno %d\n", - mcast->seq); + regular_message_item.mlen = + recovery_message_item->mlen - sizeof (struct mcast); - /* - * Only add this message to the regular sort - * queue if it was originated with the same ring - * id as the previous ring - */ - if (memcmp (&instance->my_old_ring_id, &mcast->ring_id, - sizeof (struct memb_ring_id)) == 0) { + log_printf (instance->totemsrp_log_level_debug, + "comparing if ring id is for this processors old ring seqno %d\n", + mcast->seq); - regular_message_item.iov_len = recovery_message_item->iov_len; - res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq); - if (res == 0) { - sq_item_add (&instance->regular_sort_queue, - &regular_message_item, mcast->seq); - if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) { - instance->old_ring_state_high_seq_received = mcast->seq; + /* + * Only add this message to the regular sort + * queue if it was originated with the same ring + * id as the previous ring + */ + if (memcmp (&instance->my_old_ring_id, &mcast->ring_id, + sizeof (struct memb_ring_id)) == 0) { + + regular_message_item.mlen = recovery_message_item->mlen; + res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq); + if (res == 0) { + sq_item_add (&instance->regular_sort_queue, + &regular_message_item, mcast->seq); + if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) { + instance->old_ring_state_high_seq_received = mcast->seq; + } } + } else { + log_printf (instance->totemsrp_log_level_notice, + "-not adding msg with seq no %x\n", mcast->seq); } - } else { - log_printf (instance->totemsrp_log_level_notice, - "-not adding msg with seq no %x\n", mcast->seq); } } } @@ -1841,6 +1831,7 @@ char seqno_string_hex[10]; struct srp_addr *addr; struct memb_commit_token_memb_entry *memb_list; + struct mcast *mcast; addr = 
(struct srp_addr *)commit_token->end_of_commit_token; memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries); @@ -1963,25 +1954,25 @@ } strcat (is_originated, seqno_string_hex); sort_queue_item = ptr; - assert (sort_queue_item->iov_len > 0); - assert (sort_queue_item->iov_len <= MAXIOVS); messages_originated++; - memset (&message_item, 0, sizeof (struct message_item)); // TODO LEAK - message_item.mcast = malloc (sizeof (struct mcast)); - assert (message_item.mcast); - message_item.mcast->header.type = MESSAGE_TYPE_MCAST; - srp_addr_copy (&message_item.mcast->system_from, &instance->my_id); - message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED; - message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid; - assert (message_item.mcast->header.nodeid); - message_item.mcast->header.endian_detector = ENDIAN_LOCAL; - memcpy (&message_item.mcast->ring_id, &instance->my_ring_id, + + mcast = (struct mcast *)mag_alloc (&mag_db); + mcast->header.type = MESSAGE_TYPE_MCAST; + srp_addr_copy (&mcast->system_from, &instance->my_id); + mcast->header.encapsulated = MESSAGE_ENCAPSULATED; + mcast->header.nodeid = instance->my_id.addr[0].nodeid; + assert (mcast->header.nodeid); + mcast->header.endian_detector = ENDIAN_LOCAL; + memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); - message_item.iov_len = sort_queue_item->iov_len; - memcpy (&message_item.iovec, &sort_queue_item->iovec, - sizeof (struct iovec) * sort_queue_item->iov_len); - queue_item_add (&instance->retrans_message_queue, &message_item); + + message_item.mlen = sort_queue_item->mlen - sizeof (struct mcast); + message_item.msg = (char *)mcast; + memcpy (message_item.msg + sizeof (struct mcast), + sort_queue_item->msg + sizeof (struct mcast), + sort_queue_item->mlen - sizeof (struct mcast)); + queue_item_add (&instance->retrans_message_queue, &message_item); } log_printf (instance->totemsrp_log_level_notice, "Originated %d messages in 
RECOVERY.\n", messages_originated); @@ -2036,10 +2027,12 @@ int guarantee) { int i; - int j; struct message_item message_item; struct totemsrp_instance *instance; unsigned int res; + int total_size = 0; + char *addr; + struct mcast *mcast; res = hdb_handle_get (&totemsrp_instance_database, handle, (void *)&instance); @@ -2051,63 +2044,52 @@ log_printf (instance->totemsrp_log_level_warning, "queue full\n"); return (-1); } - for (j = 0, i = 0; i < iov_len; i++) { - j+= iovec[i].iov_len; - } - memset (&message_item, 0, sizeof (struct message_item)); - /* - * Allocate pending item - */ -// TODO LEAK - message_item.mcast = malloc (sizeof (struct mcast)); - if (message_item.mcast == 0) { - goto error_mcast; + * Determine size of memory to allocate and message length + */ + total_size = sizeof (struct mcast); + for (i = 0; i < iov_len; i++) { + total_size += iovec[i].iov_len; } + message_item.msg = mag_alloc (&mag_db); + if (message_item.msg == 0) { + goto error_put; + } + message_item.mlen = total_size; + + mcast = (struct mcast *)message_item.msg; + + addr = message_item.msg + sizeof (struct mcast); + /* * Set mcast header */ - message_item.mcast->header.type = MESSAGE_TYPE_MCAST; - message_item.mcast->header.endian_detector = ENDIAN_LOCAL; - message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED; - message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid; - assert (message_item.mcast->header.nodeid); + mcast->header.type = MESSAGE_TYPE_MCAST; + mcast->header.endian_detector = ENDIAN_LOCAL; + mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED; + mcast->header.nodeid = instance->my_id.addr[0].nodeid; + mcast->guarantee = guarantee; + srp_addr_copy (&mcast->system_from, &instance->my_id); + assert (mcast->header.nodeid); - message_item.mcast->guarantee = guarantee; - srp_addr_copy (&message_item.mcast->system_from, &instance->my_id); - + /* + * Copy message contents into message + */ for (i = 0; i < iov_len; i++) { -// TODO LEAK - 
message_item.iovec[i].iov_base = malloc (iovec[i].iov_len); - - if (message_item.iovec[i].iov_base == 0) { - goto error_iovec; - } - - memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base, + memcpy (addr, iovec[i].iov_base, iovec[i].iov_len); - - message_item.iovec[i].iov_len = iovec[i].iov_len; + addr += iovec[i].iov_len; } - - message_item.iov_len = iov_len; - + log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n"); queue_item_add (&instance->new_message_queue, &message_item); hdb_handle_put (&totemsrp_instance_database, handle); return (0); -error_iovec: - for (j = 0; j < i; j++) { - free (message_item.iovec[j].iov_base); - } - - free(message_item.mcast); - -error_mcast: +error_put: hdb_handle_put (&totemsrp_instance_database, handle); error_exit: @@ -2152,7 +2134,7 @@ struct sort_queue_item *sort_queue_item; int res; void *ptr; - + struct iovec iovec; struct sq *sort_queue; if (instance->memb_state == MEMB_STATE_RECOVERY) { @@ -2177,9 +2159,12 @@ sort_queue_item = ptr; + iovec.iov_base = sort_queue_item->msg; + iovec.iov_len = sort_queue_item->mlen; + totemrrp_mcast_noflush_send (instance->totemrrp_handle, - sort_queue_item->iovec, - sort_queue_item->iov_len); + &iovec, + 1); return (0); } @@ -2193,7 +2178,7 @@ unsigned int token_aru) { struct sort_queue_item *regular_message; - unsigned int i, j; + unsigned int i; int res; int log_release = 0; unsigned int release_to; @@ -2227,9 +2212,8 @@ instance->last_released + i, &ptr); if (res == 0) { regular_message = ptr; - for (j = 0; j < regular_message->iov_len; j++) { - free (regular_message->iovec[j].iov_base); - } + + mag_free (&mag_db, regular_message->msg); } sq_items_release (&instance->regular_sort_queue, instance->last_released + i); @@ -2295,6 +2279,7 @@ struct sort_queue_item *sort_queue_item_ptr; struct mcast *mcast; unsigned int fcc_mcast_current; + struct iovec iovec; if (instance->memb_state == MEMB_STATE_RECOVERY) { mcast_queue = 
&instance->retrans_message_queue; @@ -2310,6 +2295,8 @@ break; } message_item = (struct message_item *)queue_item_get (mcast_queue); + mcast = (struct mcast *)message_item->msg; + /* preincrement required by algo */ if (instance->old_ring_state_saved && (instance->memb_state == MEMB_STATE_GATHER || @@ -2321,36 +2308,32 @@ return (0); } - message_item->mcast->seq = ++token->seq; - message_item->mcast->this_seqno = instance->global_seqno++; + mcast->seq = ++token->seq; + mcast->this_seqno = instance->global_seqno++; /* * Build IO vector */ memset (&sort_queue_item, 0, sizeof (struct sort_queue_item)); - sort_queue_item.iovec[0].iov_base = message_item->mcast; - sort_queue_item.iovec[0].iov_len = sizeof (struct mcast); + sort_queue_item.msg = message_item->msg; + sort_queue_item.mlen = message_item->mlen; - mcast = sort_queue_item.iovec[0].iov_base; + mcast = (struct mcast *)sort_queue_item.msg; - memcpy (&sort_queue_item.iovec[1], message_item->iovec, - message_item->iov_len * sizeof (struct iovec)); - memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id)); - sort_queue_item.iov_len = message_item->iov_len + 1; - - assert (sort_queue_item.iov_len < 16); - /* * Add message to retransmit queue */ sort_queue_item_ptr = sq_item_add (sort_queue, - &sort_queue_item, message_item->mcast->seq); + &sort_queue_item, mcast->seq); + iovec.iov_base = sort_queue_item_ptr->msg; + iovec.iov_len = sort_queue_item_ptr->mlen; + totemrrp_mcast_noflush_send (instance->totemrrp_handle, - sort_queue_item_ptr->iovec, - sort_queue_item_ptr->iov_len); + &iovec, + 1); /* * Delete item from pending queue @@ -3016,7 +2999,7 @@ token_hold_cancel_send (instance); - callback_handle = malloc (sizeof (struct token_callback_instance)); + callback_handle = mag_alloc (&callback_mag_db); if (callback_handle == 0) { return (-1); } @@ -3048,7 +3031,7 @@ if (*handle_out) { h = (struct token_callback_instance *)*handle_out; list_del (&h->list); - free (h); + mag_free (&callback_mag_db, 
h); h = NULL; *handle_out = 0; } @@ -3096,7 +3079,7 @@ if (res == -1 && del == 1) { list_add (list, callback_listhead); } else if (del) { - free (token_callback_instance); + mag_free (&callback_mag_db, token_callback_instance); } } } @@ -3502,6 +3485,7 @@ unsigned int range = 0; int endian_conversion_required; unsigned int my_high_delivered_stored = 0; + struct iovec iovec; range = end_point - instance->my_high_delivered; @@ -3548,8 +3532,7 @@ sort_queue_item_p = ptr; - mcast_in = sort_queue_item_p->iovec[0].iov_base; - assert (mcast_in != (struct mcast *)0xdeadbeef); + mcast_in = (struct mcast *)sort_queue_item_p->msg; endian_conversion_required = 0; if (mcast_in->header.endian_detector != ENDIAN_LOCAL) { @@ -3580,30 +3563,14 @@ "Delivering MCAST message with seq %x to pending delivery queue\n", mcast_header.seq); - /* - * Message is locally originated multicast - */ - if (sort_queue_item_p->iov_len > 1 && - sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) { - instance->totemsrp_deliver_fn ( - mcast_header.header.nodeid, - &sort_queue_item_p->iovec[1], - sort_queue_item_p->iov_len - 1, - endian_conversion_required); - } else { - sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast); - sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base + sizeof (struct mcast); + iovec.iov_base = sort_queue_item_p->msg + sizeof (struct mcast); + iovec.iov_len = sort_queue_item_p->mlen - sizeof (struct mcast); - instance->totemsrp_deliver_fn ( - mcast_header.header.nodeid, - sort_queue_item_p->iovec, - sort_queue_item_p->iov_len, - endian_conversion_required); - - sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast); - sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base - sizeof (struct mcast); - } -//TODO instance->stats_delv += 1; + instance->totemsrp_deliver_fn ( + mcast_header.header.nodeid, + &iovec, + 1, + endian_conversion_required); } } @@ -3705,15 +3672,13 @@ * Allocate new multicast 
memory block */ // TODO LEAK - sort_queue_item.iovec[0].iov_base = malloc (msg_len); - if (sort_queue_item.iovec[0].iov_base == 0) { + sort_queue_item.msg = mag_alloc (&mag_db); + if (sort_queue_item.msg == 0) { return (-1); /* error here is corrected by the algorithm */ } - memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len); - sort_queue_item.iovec[0].iov_len = msg_len; - assert (sort_queue_item.iovec[0].iov_len > 0); - assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX); - sort_queue_item.iov_len = 1; + memcpy (sort_queue_item.msg, msg, msg_len); + sort_queue_item.mlen = msg_len; + assert (sort_queue_item.mlen < FRAME_SIZE_MAX); if (sq_lt_compare (instance->my_high_seq_received, mcast_header.seq)) {
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
