See my previous mail on this subject.

Patch included.

Index: test/magbench.c
===================================================================
--- test/magbench.c	(revision 0)
+++ test/magbench.c	(revision 0)
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2008, 2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake ([email protected])
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+#include <corosync/engine/logsys.h>
+#include <corosync/magalloc.h>
+
+DECLARE_MAGAZINE (magbench, 10000);	/* magazine of 10000-byte objects, the size main() also passes to malloc */
+
+#define ITERATIONS 10000000	/* outer-loop rounds timed per benchmark */
+
+static struct timeval tv1, tv2, tv_elapsed;	/* start time, finish time, and their difference */
+
+#ifndef timersub /* <sys/time.h> already supplies timersub on some platforms */
+#define timersub(a, b, result) do {				\
+	(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;		\
+	(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
+	if ((result)->tv_usec < 0) {	/* borrow one second */	\
+		--(result)->tv_sec; (result)->tv_usec += 1000000; \
+	}							\
+} while (0)
+#endif
+
+/* Record the benchmark start time in the file-scope tv1. */
+static void bm_start (void) {
+	gettimeofday (&tv1, NULL);
+}
+/* Print the rate for ITERATIONS operations elapsed since the last bm_start(). */
+static void bm_finish (const char *operation) {
+	double seconds;
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+	seconds = tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0);
+	if (strlen (operation) <= 22) {
+		printf ("%s\t\t\t", operation);
+	} else {
+		printf ("%s\t\t", operation);	/* long label: one tab fewer */
+	}
+	printf ("%9.3f operations/sec\n", ((float)ITERATIONS) / seconds);
+}
+
+/* Time 5 allocations then 5 frees per round: magazine allocator first, then malloc/free. */
+int main (void) {
+	int i, j;
+	void *allocs[20];
+
+	bm_start ();
+	for (j = 0; j < ITERATIONS; j++) {
+		for (i = 0; i < 5; i++) {
+			allocs[i] = mag_alloc (&magbench);
+		}
+		for (i = 0; i < 5; i++) {
+			mag_free (&magbench, allocs[i]);
+		}
+	}
+	bm_finish ("mag_alloc and free of 5 items");
+
+	bm_start ();
+	for (j = 0; j < ITERATIONS; j++) {
+		for (i = 0; i < 5; i++) {
+			allocs[i] = malloc (10000);
+		}
+		for (i = 0; i < 5; i++) {
+			free (allocs[i]);
+		}
+	}
+	bm_finish ("malloc/free of 5 items");
+
+	return (0);
+}
Index: test/Makefile.am
===================================================================
--- test/Makefile.am	(revision 2058)
+++ test/Makefile.am	(working copy)
@@ -34,7 +34,7 @@
 INCLUDES       		= -I$(top_builddir)/include -I$(top_srcdir)/include
 
 noinst_PROGRAMS		= testevs evsbench evsverify testcpg testcpg2 cpgbench testconfdb	\
-			logsysbench logsysrec testquorum testvotequorum1 testvotequorum2
+			logsysbench logsysrec testquorum testvotequorum1 testvotequorum2 magbench
 
 testevs_LDADD		= -levs
 testevs_LDFLAGS		= -L../lib
@@ -58,6 +58,8 @@
 cpgbench_LDFLAGS	= -L../lib
 logsysbench_LDADD	= -llogsys
 logsysbench_LDFLAGS	= -L../exec
+magbench_LDADD		= 
+magbench_LDFLAGS	= 
 logsysrec_LDADD		= -llogsys
 logsysrec_LDFLAGS	= -L../exec
 
Index: include/corosync/magalloc.h
===================================================================
--- include/corosync/magalloc.h	(revision 0)
+++ include/corosync/magalloc.h	(revision 0)
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake ([email protected])
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef MAGALLOC_H_DEFINED
+#define MAGALLOC_H_DEFINED
+
+#include <config.h>
+#include <stdlib.h>
+
+struct magazine_db {			/* free-object cache ("magazine") state */
+	size_t object_size;		/* bytes malloc'd for each object */
+	void **objects;			/* slot array; slots at and after object_position hold reusable objects */
+	unsigned int object_count;	/* number of slots in objects */
+	unsigned int object_position;	/* next slot mag_alloc hands out == objects currently handed out */
+};
+
+/* Define a file-local, initially empty magazine; use as: DECLARE_MAGAZINE (name, object_bytes); */
+#define DECLARE_MAGAZINE(magazine_name,object_sz)	\
+static struct magazine_db (magazine_name) = {		\
+	.object_size = (object_sz),			\
+	.objects = NULL,				\
+	.object_count = 0, .object_position = 0		\
+}
+	
+/*
+ * Hand out one object from db, growing it by one malloc'd slot when none is free; NULL on out-of-memory.
+ */
+static inline void *mag_alloc (struct magazine_db *db) {
+	void *new_objects;
+
+	if (db->object_count == db->object_position) {
+		/* Count the new slot only once it exists, so a failure leaves db consistent */
+		new_objects = realloc (db->objects, sizeof (void *) * (db->object_count + 1));
+		if (new_objects == NULL) {
+			return (NULL);
+		}
+		db->objects = new_objects;
+		db->objects[db->object_position] = malloc (db->object_size);
+		if (db->objects[db->object_position] == NULL) {
+			return (NULL);
+		}
+		db->object_count += 1;
+	}
+	return (db->objects[db->object_position++]);
+}
+
+/* Give addr back to db: step object_position back and park addr in that slot for the next mag_alloc. */
+static inline void mag_free (struct magazine_db *db, void *addr) {
+	db->objects[--db->object_position] = addr;
+}
+#endif /* MAGALLOC_H_DEFINED */
Index: include/Makefile.am
===================================================================
--- include/Makefile.am	(revision 2058)
+++ include/Makefile.am	(working copy)
@@ -32,7 +32,7 @@
 MAINTAINERCLEANFILES    = Makefile.in corosync/config.h.in
 
 CS_H			= hdb.h cs_config.h cpg.h cfg.h evs.h ipc_gen.h mar_gen.h swab.h 	\
-		  	coroipcc.h confdb.h list.h corotypes.h quorum.h votequorum.h
+		  	coroipcc.h confdb.h list.h corotypes.h quorum.h votequorum.h magalloc.h
 
 CS_INTERNAL_H		= ipc_cfg.h ipc_confdb.h ipc_cpg.h ipc_evs.h ipc_pload.h ipc_quorum.h 	\
 			jhash.h mar_cpg.h pload.h queue.h quorum.h rmd.h sq.h ipc_votequorum.h
Index: exec/totempg.c
===================================================================
--- exec/totempg.c	(revision 2058)
+++ exec/totempg.c	(working copy)
@@ -98,6 +98,7 @@
 #include <corosync/list.h>
 #include <corosync/totem/coropoll.h>
 #include <corosync/totem/totempg.h>
+#include <corosync/magalloc.h>
 
 #include "totemmrp.h"
 #include "totemsrp.h"
@@ -184,9 +185,9 @@
 
 static enum throw_away_mode_t throw_away_mode = THROW_AWAY_INACTIVE;
 
-DECLARE_LIST_INIT(assembly_list_inuse);
+DECLARE_LIST_INIT(assembly_list_head);
 
-DECLARE_LIST_INIT(assembly_list_free);
+DECLARE_MAGAZINE(assembly_mag, sizeof (struct assembly));
 
 /*
  * Staging buffer for packed messages.  Messages are staged in this buffer
@@ -253,10 +254,10 @@
 	struct list_head *list;
 
 	/*
-	 * Search inuse list for node id and return assembly buffer if found
+	 * Search existing assembly lists
 	 */
-	for (list = assembly_list_inuse.next;
-		list != &assembly_list_inuse;
+	for (list = assembly_list_head.next;
+		list != &assembly_list_head;
 		list = list->next) { 
 
 		assembly = list_entry (list, struct assembly, list);
@@ -267,28 +268,14 @@
 	}
 
 	/*
-	 * Nothing found in inuse list get one from free list if available
+	 * Allocate new assembly list
 	 */
-	if (list_empty (&assembly_list_free) == 0) {
-		assembly = list_entry (assembly_list_free.next, struct assembly, list);
-		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_inuse);
-		assembly->nodeid = nodeid;
-		return (assembly);
-	}
-
-	/*
-	 * Nothing available in inuse or free list, so allocate a new one
-	 */
-	assembly = malloc (sizeof (struct assembly));
-	memset (assembly, 0, sizeof (struct assembly));
-	/*
-	 * TODO handle memory allocation failure here
-	 */
-	assert (assembly);
+	assembly = mag_alloc (&assembly_mag);
 	assembly->nodeid = nodeid;
+	assembly->index = 0;
+	assembly->last_frag_num = 0;
 	list_init (&assembly->list);
-	list_add (&assembly->list, &assembly_list_inuse);
+	list_add (&assembly->list, &assembly_list_head);
 
 	return (assembly);
 }
@@ -296,7 +283,7 @@
 static void assembly_deref (struct assembly *assembly)
 {
 	list_del (&assembly->list);
-	list_add (&assembly->list, &assembly_list_free);
+	mag_free (&assembly_mag, assembly);
 }
 
 static inline void app_confchg_fn (
Index: exec/totemsrp.c
===================================================================
--- exec/totemsrp.c	(revision 2058)
+++ exec/totemsrp.c	(working copy)
@@ -77,6 +77,7 @@
 #include <corosync/list.h>
 #include <corosync/hdb.h>
 #include <corosync/totem/coropoll.h>
+#include <corosync/magalloc.h>
 #include "totemsrp.h"
 #include "totemrrp.h"
 #include "wthread.h"
@@ -275,14 +276,13 @@
 }__attribute__((packed));
 
 struct message_item {
-	struct mcast *mcast;
-	struct iovec iovec[MAXIOVS];
-	unsigned int iov_len;
+	char *msg;
+	int mlen;
 };
 
 struct sort_queue_item {
-	struct iovec iovec[MAXIOVS];
-	unsigned int iov_len;
+	char *msg;
+	int mlen;
 };
 
 struct orf_token_mcast_thread_state {
@@ -614,6 +614,10 @@
  */
 DECLARE_HDB_DATABASE (totemsrp_instance_database);
 
+DECLARE_MAGAZINE (mag_db,10000);
+
+DECLARE_MAGAZINE (callback_mag_db, sizeof (struct token_callback_instance));
+
 struct message_handlers totemsrp_message_handlers = {
 	6,
 	{
@@ -1556,61 +1560,47 @@
 		/*
 		 * Convert recovery message into regular message
 		 */
-		if (recovery_message_item->iov_len > 1) {
-			mcast = recovery_message_item->iovec[1].iov_base;
-			memcpy (&regular_message_item.iovec[0],
-				&recovery_message_item->iovec[1],
-				sizeof (struct iovec) * recovery_message_item->iov_len);
-		} else {
-			mcast = recovery_message_item->iovec[0].iov_base;
-			if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
-				/*
-				 * Message is a recovery message encapsulated
-				 * in a new ring message
-				 */
-				regular_message_item.iovec[0].iov_base =
-					(char *)recovery_message_item->iovec[0].iov_base + sizeof (struct mcast);
-				regular_message_item.iovec[0].iov_len =
-				recovery_message_item->iovec[0].iov_len - sizeof (struct mcast);
-				regular_message_item.iov_len = 1;
-				mcast = regular_message_item.iovec[0].iov_base;
-			} else {
-				continue; /* TODO this case shouldn't happen */
-				/*
-				 * Message is originated on new ring and not
-				 * encapsulated
-				 */
-				regular_message_item.iovec[0].iov_base =
-					recovery_message_item->iovec[0].iov_base;
-				regular_message_item.iovec[0].iov_len =
-				recovery_message_item->iovec[0].iov_len;
-			}
-		}
+		mcast = recovery_message_item->msg;
+		if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
+			/*
+			 * Message is a recovery message encapsulated
+			 * in a new ring message
+			 */
+			regular_message_item.msg =
+				(struct mcast *)mag_alloc (&mag_db);
+			memcpy (regular_message_item.msg,
+				recovery_message_item->msg +
+					sizeof (struct mcast),
+				recovery_message_item->mlen - sizeof (struct mcast));
 
-		log_printf (instance->totemsrp_log_level_debug,
-			"comparing if ring id is for this processors old ring seqno %d\n",
-			 mcast->seq);
+			regular_message_item.mlen = 
+				recovery_message_item->mlen - sizeof (struct mcast);
 
-		/*
-		 * Only add this message to the regular sort
-		 * queue if it was originated with the same ring
-		 * id as the previous ring
-		 */
-		if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
-			sizeof (struct memb_ring_id)) == 0) {
+			log_printf (instance->totemsrp_log_level_debug,
+				"comparing if ring id is for this processors old ring seqno %d\n",
+				 mcast->seq);
 
-			regular_message_item.iov_len = recovery_message_item->iov_len;
-			res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
-			if (res == 0) {
-				sq_item_add (&instance->regular_sort_queue,
-					&regular_message_item, mcast->seq);
-				if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
-					instance->old_ring_state_high_seq_received = mcast->seq;
+			/*
+			 * Only add this message to the regular sort
+			 * queue if it was originated with the same ring
+			 * id as the previous ring
+			 */
+			if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
+				sizeof (struct memb_ring_id)) == 0) {
+
+				regular_message_item.mlen = recovery_message_item->mlen;
+				res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
+				if (res == 0) {
+					sq_item_add (&instance->regular_sort_queue,
+						&regular_message_item, mcast->seq);
+					if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
+						instance->old_ring_state_high_seq_received = mcast->seq;
+					}
 				}
+			} else {
+				log_printf (instance->totemsrp_log_level_notice,
+					"-not adding msg with seq no %x\n", mcast->seq);
 			}
-		} else {
-			log_printf (instance->totemsrp_log_level_notice,
-				"-not adding msg with seq no %x\n", mcast->seq);
 		}
 	}
 }
@@ -1841,6 +1831,7 @@
 	char seqno_string_hex[10];
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
+	struct mcast *mcast;
 
 	addr = (struct srp_addr *)commit_token->end_of_commit_token;
 	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
@@ -1963,25 +1954,25 @@
 	}
 	strcat (is_originated, seqno_string_hex);
 	sort_queue_item = ptr;
-	assert (sort_queue_item->iov_len > 0);
-	assert (sort_queue_item->iov_len <= MAXIOVS);
 	messages_originated++;
-	memset (&message_item, 0, sizeof (struct message_item));
 // TODO	 LEAK
-	message_item.mcast = malloc (sizeof (struct mcast));
-	assert (message_item.mcast);
-	message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
-	srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
-	message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
-	message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
-	assert (message_item.mcast->header.nodeid);
-	message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
-	memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
+
+	mcast = (struct mcast *)mag_alloc (&mag_db);
+	mcast->header.type = MESSAGE_TYPE_MCAST;
+	srp_addr_copy (&mcast->system_from, &instance->my_id);
+	mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
+	mcast->header.nodeid = instance->my_id.addr[0].nodeid;
+	assert (mcast->header.nodeid);
+	mcast->header.endian_detector = ENDIAN_LOCAL;
+	memcpy (&mcast->ring_id, &instance->my_ring_id,
 		sizeof (struct memb_ring_id));
-	message_item.iov_len = sort_queue_item->iov_len;
-	memcpy (&message_item.iovec, &sort_queue_item->iovec,
-		sizeof (struct iovec) * sort_queue_item->iov_len);
-		queue_item_add (&instance->retrans_message_queue, &message_item);
+
+	message_item.mlen = sort_queue_item->mlen - sizeof (struct mcast);
+	message_item.msg = (char *)mcast;
+	memcpy (message_item.msg + sizeof (struct mcast),
+		sort_queue_item->msg + sizeof (struct mcast),
+		sort_queue_item->mlen - sizeof (struct mcast));
+	queue_item_add (&instance->retrans_message_queue, &message_item);
 	}
 	log_printf (instance->totemsrp_log_level_notice,
 		"Originated %d messages in RECOVERY.\n", messages_originated);
@@ -2036,10 +2027,12 @@
 	int guarantee)
 {
 	int i;
-	int j;
 	struct message_item message_item;
 	struct totemsrp_instance *instance;
 	unsigned int res;
+	int total_size = 0;
+	char *addr;
+	struct mcast *mcast;
 
 	res = hdb_handle_get (&totemsrp_instance_database, handle,
 		(void *)&instance);
@@ -2051,63 +2044,52 @@
 		log_printf (instance->totemsrp_log_level_warning, "queue full\n");
 		return (-1);
 	}
-	for (j = 0, i = 0; i < iov_len; i++) {
-		j+= iovec[i].iov_len;
-	}
 
-	memset (&message_item, 0, sizeof (struct message_item));
-
 	/*
-	 * Allocate pending item
-	 */
-// TODO LEAK
-	message_item.mcast = malloc (sizeof (struct mcast));
-	if (message_item.mcast == 0) {
-		goto error_mcast;
+	 * Determine size of memory to allocate and message length
+	 */ 
+	total_size = sizeof (struct mcast);
+	for (i = 0; i < iov_len; i++) {
+		total_size += iovec[i].iov_len;
 	}
+	message_item.msg = mag_alloc (&mag_db);
+	if (message_item.msg == 0) {
+		goto error_put;
+	}
 
+	message_item.mlen = total_size;
+
+	mcast = (struct mcast *)message_item.msg;
+
+	addr = message_item.msg + sizeof (struct mcast);
+
 	/*
 	 * Set mcast header
 	 */
-	message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
-	message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
-	message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
-	message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
-	assert (message_item.mcast->header.nodeid);
+	mcast->header.type = MESSAGE_TYPE_MCAST;
+	mcast->header.endian_detector = ENDIAN_LOCAL;
+	mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
+	mcast->header.nodeid = instance->my_id.addr[0].nodeid;
+	mcast->guarantee = guarantee;
+	srp_addr_copy (&mcast->system_from, &instance->my_id);
+	assert (mcast->header.nodeid);
 
-	message_item.mcast->guarantee = guarantee;
-	srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
-
+	/*
+	 * Copy message contents into message
+	 */
 	for (i = 0; i < iov_len; i++) {
-// TODO LEAK
-		message_item.iovec[i].iov_base = malloc (iovec[i].iov_len);
-
-		if (message_item.iovec[i].iov_base == 0) {
-			goto error_iovec;
-		}
-
-		memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base,
+		memcpy (addr, iovec[i].iov_base,
 			iovec[i].iov_len);
-
-		message_item.iovec[i].iov_len = iovec[i].iov_len;
+		addr += iovec[i].iov_len;
 	}
-
-	message_item.iov_len = iov_len;
-
+	
 	log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
 	queue_item_add (&instance->new_message_queue, &message_item);
 
 	hdb_handle_put (&totemsrp_instance_database, handle);
 	return (0);
 
-error_iovec:
-	for (j = 0; j < i; j++) {
-		free (message_item.iovec[j].iov_base);
-	}
-	
-	free(message_item.mcast);
-
-error_mcast:
+error_put:
 	hdb_handle_put (&totemsrp_instance_database, handle);
 
 error_exit:
@@ -2152,7 +2134,7 @@
 	struct sort_queue_item *sort_queue_item;
 	int res;
 	void *ptr;
-
+	struct iovec iovec;
 	struct sq *sort_queue;
 
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
@@ -2177,9 +2159,12 @@
 
 	sort_queue_item = ptr;
 
+	iovec.iov_base = sort_queue_item->msg;
+	iovec.iov_len = sort_queue_item->mlen;
+
 	totemrrp_mcast_noflush_send (instance->totemrrp_handle,
-		sort_queue_item->iovec,
-		sort_queue_item->iov_len);
+		&iovec,
+		1);
 
 	return (0);
 }
@@ -2193,7 +2178,7 @@
 	unsigned int token_aru)
 {
 	struct sort_queue_item *regular_message;
-	unsigned int i, j;
+	unsigned int i;
 	int res;
 	int log_release = 0;
 	unsigned int release_to;
@@ -2227,9 +2212,8 @@
 			instance->last_released + i, &ptr);
 		if (res == 0) {
 			regular_message = ptr;
-			for (j = 0; j < regular_message->iov_len; j++) {
-				free (regular_message->iovec[j].iov_base);
-			}
+		
+			mag_free (&mag_db, regular_message->msg);
 		}
 		sq_items_release (&instance->regular_sort_queue,
 			instance->last_released + i);
@@ -2295,6 +2279,7 @@
 	struct sort_queue_item *sort_queue_item_ptr;
 	struct mcast *mcast;
 	unsigned int fcc_mcast_current;
+	struct iovec iovec;
 
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
 		mcast_queue = &instance->retrans_message_queue;
@@ -2310,6 +2295,8 @@
 			break;
 		}
 		message_item = (struct message_item *)queue_item_get (mcast_queue);
+		mcast = (struct mcast *)message_item->msg;
+
 		/* preincrement required by algo */
 		if (instance->old_ring_state_saved &&
 			(instance->memb_state == MEMB_STATE_GATHER ||
@@ -2321,36 +2308,32 @@
 			return (0);
 		}
 
-		message_item->mcast->seq = ++token->seq;
-		message_item->mcast->this_seqno = instance->global_seqno++;
+		mcast->seq = ++token->seq;
+		mcast->this_seqno = instance->global_seqno++;
 
 		/*
 		 * Build IO vector
 		 */
 		memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
-		sort_queue_item.iovec[0].iov_base = message_item->mcast;
-		sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
+		sort_queue_item.msg = message_item->msg;
+		sort_queue_item.mlen = message_item->mlen;
 	
-		mcast = sort_queue_item.iovec[0].iov_base;
+		mcast = (struct mcast *)sort_queue_item.msg;
 	
-		memcpy (&sort_queue_item.iovec[1], message_item->iovec,
-			message_item->iov_len * sizeof (struct iovec));
-
 		memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
 
-		sort_queue_item.iov_len = message_item->iov_len + 1;
-
-		assert (sort_queue_item.iov_len < 16);
-
 		/*
 		 * Add message to retransmit queue
 		 */
 		sort_queue_item_ptr = sq_item_add (sort_queue,
-			&sort_queue_item, message_item->mcast->seq);
+			&sort_queue_item, mcast->seq);
 
+		iovec.iov_base = sort_queue_item_ptr->msg;
+		iovec.iov_len = sort_queue_item_ptr->mlen;
+
 		totemrrp_mcast_noflush_send (instance->totemrrp_handle,
-			sort_queue_item_ptr->iovec,
-			sort_queue_item_ptr->iov_len);
+			&iovec,
+			1);
 		
 		/*
 		 * Delete item from pending queue
@@ -3016,7 +2999,7 @@
 
 	token_hold_cancel_send (instance);
 
-	callback_handle = malloc (sizeof (struct token_callback_instance));
+	callback_handle = mag_alloc (&callback_mag_db);
 	if (callback_handle == 0) {
 		return (-1);
 	}
@@ -3048,7 +3031,7 @@
 	if (*handle_out) {
  		h = (struct token_callback_instance *)*handle_out;
 		list_del (&h->list);
-		free (h);
+		mag_free (&callback_mag_db, h);
 		h = NULL;
 		*handle_out = 0;
 	}
@@ -3096,7 +3079,7 @@
 		if (res == -1 && del == 1) {
 			list_add (list, callback_listhead);
 		} else	if (del) {
-			free (token_callback_instance);
+			mag_free (&callback_mag_db, token_callback_instance);
 		}
 	}
 }
@@ -3502,6 +3485,7 @@
 	unsigned int range = 0;
 	int endian_conversion_required;
 	unsigned int my_high_delivered_stored = 0;
+	struct iovec iovec;
 
 
 	range = end_point - instance->my_high_delivered;
@@ -3548,8 +3532,7 @@
 
 		sort_queue_item_p = ptr;
 
-		mcast_in = sort_queue_item_p->iovec[0].iov_base;
-		assert (mcast_in != (struct mcast *)0xdeadbeef);
+		mcast_in = (struct mcast *)sort_queue_item_p->msg;
 
 		endian_conversion_required = 0;
 		if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
@@ -3580,30 +3563,14 @@
 			"Delivering MCAST message with seq %x to pending delivery queue\n",
 			mcast_header.seq);
 
-		/*
-		 * Message is locally originated multicast
-		 */
-	 	if (sort_queue_item_p->iov_len > 1 &&
-			sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
-			instance->totemsrp_deliver_fn (
-				mcast_header.header.nodeid,
-				&sort_queue_item_p->iovec[1],
-				sort_queue_item_p->iov_len - 1,
-				endian_conversion_required);
-		} else {
-			sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast);
-			sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base + sizeof (struct mcast);
+		iovec.iov_base = sort_queue_item_p->msg + sizeof (struct mcast);
+		iovec.iov_len = sort_queue_item_p->mlen - sizeof (struct mcast);
 
-			instance->totemsrp_deliver_fn (
-				mcast_header.header.nodeid,
-				sort_queue_item_p->iovec,
-				sort_queue_item_p->iov_len,
-				endian_conversion_required);
-
-			sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast);
-			sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base - sizeof (struct mcast);
-		}
-//TODO	instance->stats_delv += 1;
+		instance->totemsrp_deliver_fn (
+			mcast_header.header.nodeid,
+			&iovec,
+			1,
+			endian_conversion_required);
 	}
 }
 
@@ -3705,15 +3672,13 @@
 		 * Allocate new multicast memory block
 		 */
 // TODO LEAK
-		sort_queue_item.iovec[0].iov_base = malloc (msg_len);
-		if (sort_queue_item.iovec[0].iov_base == 0) {
+		sort_queue_item.msg = mag_alloc (&mag_db);
+		if (sort_queue_item.msg == 0) {
 			return (-1); /* error here is corrected by the algorithm */
 		}
-		memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len);
-		sort_queue_item.iovec[0].iov_len = msg_len;
-		assert (sort_queue_item.iovec[0].iov_len > 0);
-		assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
-		sort_queue_item.iov_len = 1;
+		memcpy (sort_queue_item.msg, msg, msg_len);
+		sort_queue_item.mlen = msg_len;
+		assert (sort_queue_item.mlen < FRAME_SIZE_MAX);
 		
 		if (sq_lt_compare (instance->my_high_seq_received,
 			mcast_header.seq)) {
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to