Some network architectures, such as InfiniBand, require asynchronous
resolution of an IP address into an internally usable address in order
to set the token target.

This patch adds a target_set_completed callback to the
totemsrp->totemrrp->totemnet interfaces as well as the totemudp/totemiba
interfaces.  It also calls the callback in totemudp when the token target
has been set.

Regards
-steve
Index: exec/totemnet.c
===================================================================
--- exec/totemnet.c	(revision 2370)
+++ exec/totemnet.c	(working copy)
@@ -57,8 +57,11 @@
 
 		void (*iface_change_fn) (
 			void *context,
-			const struct totem_ip_address *iface_address));
+			const struct totem_ip_address *iface_address),
 
+		void (*target_set_completed) (
+			void *context));
+
 	int (*processor_count_set) (
 		void *transport_context,
 		int processor_count);
@@ -195,7 +198,10 @@
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address))
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context))
 {
 	struct totemnet_instance *instance;
 	unsigned int res;
@@ -208,7 +214,7 @@
 
 	res = instance->transport->initialize (poll_handle,
 		&instance->transport_context, totem_config,
-		interface_no, context, deliver_fn, iface_change_fn);
+		interface_no, context, deliver_fn, iface_change_fn, target_set_completed);
 
 	if (res == -1) {
 		goto error_destroy;
Index: exec/totemnet.h
===================================================================
--- exec/totemnet.h	(revision 2370)
+++ exec/totemnet.h	(working copy)
@@ -64,8 +64,11 @@
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address));
+		const struct totem_ip_address *iface_address),
 
+	void (*target_set_completed) (
+		void *context));
+
 extern int totemnet_processor_count_set (
 	void *net_context,
 	int processor_count);
Index: exec/totemudp.c
===================================================================
--- exec/totemudp.c	(revision 2370)
+++ exec/totemudp.c	(working copy)
@@ -142,6 +142,8 @@
 		void *context,
 		const struct totem_ip_address *iface_address);
 
+	void (*totemudp_target_set_completed) (void *context);
+
 	/*
 	 * Function and data used to log messages
 	 */
@@ -1707,7 +1709,10 @@
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address))
+		const struct totem_ip_address *iface_address),
+
+	void (*target_set_completed) (
+		void *context))
 {
 	struct totemudp_instance *instance;
 
@@ -1769,6 +1774,8 @@
 
 	instance->totemudp_iface_change_fn = iface_change_fn;
 
+	instance->totemudp_target_set_completed = target_set_completed;
+
 	totemip_localhost (instance->mcast_address.family, &localhost);
 
 	/*
@@ -1940,6 +1947,8 @@
 	memcpy (&instance->token_target, token_target,
 		sizeof (struct totem_ip_address));
 
+	instance->totemudp_target_set_completed (instance->context);
+
 	return (res);
 }
 
Index: exec/totemudp.h
===================================================================
--- exec/totemudp.h	(revision 2370)
+++ exec/totemudp.h	(working copy)
@@ -57,8 +57,11 @@
 
 	void (*iface_change_fn) (
 		void *context,
-		const struct totem_ip_address *iface_address));
+		const struct totem_ip_address *iface_address),
 
+	void (*target_set_completed) (
+		void *context));
+
 extern int totemudp_processor_count_set (
 	void *udp_context,
 	int processor_count);
Index: exec/totemrrp.c
===================================================================
--- exec/totemrrp.c	(revision 2370)
+++ exec/totemrrp.c	(working copy)
@@ -194,6 +194,9 @@
 		unsigned int *seqid,
 		unsigned int *token_is);
 
+	void (*totemrrp_target_set_completed) (
+		void *context);
+
 	unsigned int (*totemrrp_msgs_missing) (void);
 
 	/*
@@ -1432,6 +1435,13 @@
 	return (0);
 }
 
+static void rrp_target_set_completed (void *context)
+{
+	struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
+
+	deliver_fn_context->instance->totemrrp_target_set_completed (deliver_fn_context->context);
+}
+
 /*
  * Totem Redundant Ring interface
  * depends on poll abstraction, POSIX, IPV4
@@ -1461,7 +1471,9 @@
 		unsigned int *seqid,
 		unsigned int *token_is),
 
-	unsigned int (*msgs_missing) (void))
+	unsigned int (*msgs_missing) (void),
+
+	void (*target_set_completed) (void *context))
 {
 	struct totemrrp_instance *instance;
 	unsigned int res;
@@ -1504,6 +1516,8 @@
 
 	instance->totemrrp_token_seqid_get = token_seqid_get;
 
+	instance->totemrrp_target_set_completed = target_set_completed;
+
 	instance->totemrrp_msgs_missing = msgs_missing;
 
 	instance->interface_count = totem_config->interface_count;
@@ -1522,6 +1536,7 @@
 		deliver_fn_context->instance = instance;
 		deliver_fn_context->context = context;
 		deliver_fn_context->iface_no = i;
+printf ("deliver fn context %p\n", deliver_fn_context);
 
 		totemnet_initialize (
 			poll_handle,
@@ -1530,7 +1545,8 @@
 			i,
 			(void *)deliver_fn_context,
 			rrp_deliver_fn,
-			rrp_iface_change_fn);
+			rrp_iface_change_fn,
+			rrp_target_set_completed);
 
 		totemnet_net_mtu_adjust (instance->net_handles[i], totem_config);
 	}
Index: exec/totemsrp.c
===================================================================
--- exec/totemsrp.c	(revision 2370)
+++ exec/totemsrp.c	(working copy)
@@ -505,6 +505,10 @@
 	unsigned int my_cbl;
 
 	struct timeval pause_timestamp;
+
+	struct memb_commit_token *commit_token;
+
+	char commit_token_storage[9000];
 };
 
 struct message_handlers {
@@ -586,10 +590,12 @@
 
 static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
 	const struct memb_ring_id *ring_id);
-static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
-static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
-static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
-static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
+static void target_set_completed (void *context);
+static void memb_state_commit_token_update (struct totemsrp_instance *instance);
+static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
+static int memb_state_commit_token_send (struct totemsrp_instance *instance);
+static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
+static void memb_state_commit_token_create (struct totemsrp_instance *instance);
 static int token_hold_cancel_send (struct totemsrp_instance *instance);
 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
@@ -661,6 +667,8 @@
 	instance->my_high_seq_received = SEQNO_START_MSG;
 
 	instance->my_high_delivered = SEQNO_START_MSG;
+
+	instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
 }
 
 static void main_token_seqid_get (
@@ -872,7 +880,8 @@
 		main_deliver_fn,
 		main_iface_change_fn,
 		main_token_seqid_get,
-		main_msgs_missing);
+		main_msgs_missing,
+		target_set_completed);
 
 	/*
 	 * Must have net_mtu adjusted by totemrrp_initialize first
@@ -1792,24 +1801,26 @@
 
 static void timer_function_token_retransmit_timeout (void *data);
 
+static void target_set_completed (
+	void *context)
+{
+	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
+
+	memb_state_commit_token_send (instance);
+
+}
+
 static void memb_state_commit_enter (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	ring_save (instance);
 
 	old_ring_state_save (instance);
 
-	memb_state_commit_token_update (instance, commit_token);
+	memb_state_commit_token_update (instance);
 
-	memb_state_commit_token_target_set (instance, commit_token);
+	memb_state_commit_token_target_set (instance);
 
-	memb_ring_id_set_and_store (instance, &commit_token->ring_id);
-
-	memb_state_commit_token_send (instance, commit_token);
-
-	instance->token_ring_id_seq = instance->my_ring_id.seq;
-
 	poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
 
 	instance->memb_timer_state_gather_join_timeout = 0;
@@ -1818,21 +1829,27 @@
 
 	instance->memb_timer_state_gather_consensus_timeout = 0;
 
-	reset_token_timeout (instance); // REVIEWED
-	reset_token_retransmit_timeout (instance); // REVIEWED
+	memb_ring_id_set_and_store (instance, &instance->commit_token->ring_id);
 
+	instance->token_ring_id_seq = instance->my_ring_id.seq;
+
 	log_printf (instance->totemsrp_log_level_debug,
 		"entering COMMIT state.\n");
 
 	instance->memb_state = MEMB_STATE_COMMIT;
+	reset_token_retransmit_timeout (instance); // REVIEWED
+	reset_token_timeout (instance); // REVIEWED
 
+
 	/*
 	 * reset all flow control variables since we are starting a new ring
 	 */
 	instance->my_trc = 0;
 	instance->my_pbl = 0;
 	instance->my_cbl = 0;
-	return;
+	/*
+	 * commit token sent after callback that token target has been set
+	 */
 }
 
 static void memb_state_recovery_enter (
@@ -1863,7 +1880,7 @@
 
 	low_ring_aru = instance->old_ring_state_high_seq_received;
 
-	memb_state_commit_token_send (instance, commit_token);
+	memb_state_commit_token_send_recovery (instance, commit_token);
 
 	instance->my_token_seq = SEQNO_START_TOKEN - 1;
 
@@ -2604,27 +2621,26 @@
 }
 
 static void memb_state_commit_token_update (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
 	unsigned int high_aru;
 	unsigned int i;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
-	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
+	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
 
 	memcpy (instance->my_new_memb_list, addr,
-		sizeof (struct srp_addr) * commit_token->addr_entries);
+		sizeof (struct srp_addr) * instance->commit_token->addr_entries);
 
-	instance->my_new_memb_entries = commit_token->addr_entries;
+	instance->my_new_memb_entries = instance->commit_token->addr_entries;
 
-	memcpy (&memb_list[commit_token->memb_index].ring_id,
+	memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
 		&instance->my_old_ring_id, sizeof (struct memb_ring_id));
 	assert (!totemip_zero_check(&instance->my_old_ring_id.rep));
 
-	memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru;
+	memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
 	/*
 	 *  TODO high delivered is really instance->my_aru, but with safe this
 	 * could change?
@@ -2632,17 +2648,17 @@
 	instance->my_received_flg =
 		(instance->my_aru == instance->my_high_seq_received);
 
-	memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
+	memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
 
-	memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
+	memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
 	/*
 	 * find high aru up to current memb_index for all matching ring ids
 	 * if any ring id matching memb_index has aru less then high aru set
 	 * received flag for that entry to false
 	 */
-	high_aru = memb_list[commit_token->memb_index].aru;
-	for (i = 0; i <= commit_token->memb_index; i++) {
-		if (memcmp (&memb_list[commit_token->memb_index].ring_id,
+	high_aru = memb_list[instance->commit_token->memb_index].aru;
+	for (i = 0; i <= instance->commit_token->memb_index; i++) {
+		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
 			&memb_list[i].ring_id,
 			sizeof (struct memb_ring_id)) == 0) {
 
@@ -2652,45 +2668,44 @@
 		}
 	}
 
-	for (i = 0; i <= commit_token->memb_index; i++) {
-		if (memcmp (&memb_list[commit_token->memb_index].ring_id,
+	for (i = 0; i <= instance->commit_token->memb_index; i++) {
+		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
 			&memb_list[i].ring_id,
 			sizeof (struct memb_ring_id)) == 0) {
 
 			if (sq_lt_compare (memb_list[i].aru, high_aru)) {
 				memb_list[i].received_flg = 0;
-				if (i == commit_token->memb_index) {
+				if (i == instance->commit_token->memb_index) {
 					instance->my_received_flg = 0;
 				}
 			}
 		}
 	}
 
-	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
-	commit_token->memb_index += 1;
-	assert (commit_token->memb_index <= commit_token->addr_entries);
-	assert (commit_token->header.nodeid);
+	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
+	instance->commit_token->memb_index += 1;
+	assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
+	assert (instance->commit_token->header.nodeid);
 }
 
 static void memb_state_commit_token_target_set (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr *addr;
 	unsigned int i;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
 
 	for (i = 0; i < instance->totem_config->interface_count; i++) {
 		totemrrp_token_target_set (
 			instance->totemrrp_context,
-			&addr[commit_token->memb_index %
-				commit_token->addr_entries].addr[i],
+			&addr[instance->commit_token->memb_index %
+				instance->commit_token->addr_entries].addr[i],
 			i);
 	}
 }
 
-static int memb_state_commit_token_send (
+static int memb_state_commit_token_send_recovery (
 	struct totemsrp_instance *instance,
 	struct memb_commit_token *commit_token)
 {
@@ -2722,7 +2737,38 @@
 	return (0);
 }
 
+static int memb_state_commit_token_send (
+	struct totemsrp_instance *instance)
+{
+	struct srp_addr *addr;
+	struct memb_commit_token_memb_entry *memb_list;
+	unsigned int commit_token_size;
 
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
+	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
+
+	instance->commit_token->token_seq++;
+	commit_token_size = sizeof (struct memb_commit_token) +
+		((sizeof (struct srp_addr) +
+			sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
+	/*
+	 * Make a copy for retransmission if necessary
+	 */
+	memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
+	instance->orf_token_retransmit_size = commit_token_size;
+
+	totemrrp_token_send (instance->totemrrp_context,
+		instance->commit_token,
+		commit_token_size);
+
+	/*
+	 * Request retransmission of the commit token in case it is lost
+	 */
+	reset_token_retransmit_timeout (instance);
+	return (0);
+}
+
+
 static int memb_lowest_in_config (struct totemsrp_instance *instance)
 {
 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
@@ -2756,8 +2802,7 @@
 }
 
 static void memb_state_commit_token_create (
-	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct totemsrp_instance *instance)
 {
 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
 	struct srp_addr *addr;
@@ -2771,16 +2816,16 @@
 		instance->my_proc_list, instance->my_proc_list_entries,
 		instance->my_failed_list, instance->my_failed_list_entries);
 
-	memset (commit_token, 0, sizeof (struct memb_commit_token));
-	commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
-	commit_token->header.endian_detector = ENDIAN_LOCAL;
-	commit_token->header.encapsulated = 0;
-	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
-	assert (commit_token->header.nodeid);
+	memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
+	instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
+	instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
+	instance->commit_token->header.encapsulated = 0;
+	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
+	assert (instance->commit_token->header.nodeid);
 
-	totemip_copy(&commit_token->ring_id.rep, &instance->my_id.addr[0]);
+	totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
 
-	commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
+	instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
 
 	/*
 	 * This qsort is necessary to ensure the commit token traverses
@@ -2789,11 +2834,11 @@
 	qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
 		srp_addr_compare);
 
-	commit_token->memb_index = 0;
-	commit_token->addr_entries = token_memb_entries;
+	instance->commit_token->memb_index = 0;
+	instance->commit_token->addr_entries = token_memb_entries;
 
-	addr = (struct srp_addr *)commit_token->end_of_commit_token;
-	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
+	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
+	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
 
 	memcpy (addr, token_memb,
 		token_memb_entries * sizeof (struct srp_addr));
@@ -3349,6 +3394,12 @@
 
 		fcc_rtr_limit (instance, token, &transmits_allowed);
 		mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
+/*
+if (mcasted_regular) {
+printf ("mcasted regular %d\n", mcasted_regular);
+printf ("token seq %d\n", token->seq);
+}
+*/
 		fcc_token_update (instance, token, mcasted_retransmit +
 			mcasted_regular);
 
@@ -3779,15 +3830,18 @@
 	struct totemsrp_instance *instance,
 	const struct memb_join *memb_join)
 {
-	unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
-	struct memb_commit_token *my_commit_token =
-		(struct memb_commit_token *)commit_token_storage;
 	struct srp_addr *proc_list;
 	struct srp_addr *failed_list;
 
 	proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
 	failed_list = proc_list + memb_join->proc_list_entries;
 
+/*
+	memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
+	memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
+	memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
+	memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
+*/
 	if (memb_set_equal (proc_list,
 		memb_join->proc_list_entries,
 		instance->my_proc_list,
@@ -3803,9 +3857,9 @@
 		if (memb_consensus_agreed (instance) &&
 			memb_lowest_in_config (instance)) {
 
-			memb_state_commit_token_create (instance, my_commit_token);
+			memb_state_commit_token_create (instance);
 
-			memb_state_commit_enter (instance, my_commit_token);
+			memb_state_commit_enter (instance);
 		} else {
 			return (0);
 		}
@@ -4089,8 +4143,8 @@
 				sub_entries) &&
 
 				memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
-
-				memb_state_commit_enter (instance, memb_commit_token);
+				memcpy (instance->commit_token, memb_commit_token, msg_len);
+				memb_state_commit_enter (instance);
 			}
 			break;
 
@@ -4159,6 +4213,7 @@
 
 	if ((int)message_header->type >= totemsrp_message_handlers.count) {
 		log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong...  ignoring %d.\n", (int)message_header->type);
+printf ("wrong message type\n");
 		return;
 	}
 
Index: exec/totemrrp.h
===================================================================
--- exec/totemrrp.h	(revision 2370)
+++ exec/totemrrp.h	(working copy)
@@ -72,9 +72,13 @@
 		unsigned int *seqid,
 		unsigned int *token_is),
 
-	unsigned int (*msgs_missing) (void));
+	unsigned int (*msgs_missing) (void),
 
+	void (*target_set_completed) (
+		void *context)
+	);
 
+
 extern int totemrrp_processor_count_set (
 	void *rrp_context,
 	unsigned int processor_count);
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to