It can be quite a nuisance that you can't query the internal state of many openais plugins. Many times I have wanted to look into CPG to see just which groups are allocated and which nodes/processes are using them.

Well, this patch adds a call to libcpg to do just that. Because the output can be large, the list of groups is not returned in one block; instead it is delivered through a callback that is invoked once per group. This minimises the amount of memory that needs to be allocated by both the exec and the library.

The patch is against trunk.

Chrissie
Index: test/testcpg.c
===================================================================
--- test/testcpg.c	(revision 1568)
+++ test/testcpg.c	(working copy)
@@ -141,9 +141,36 @@
 	}
 }
 
+/* cpg_groups_get_fn callback: print one group's name and its node/pid members. */
+void GroupsGetCallback(cpg_handle_t handle,
+		       uint32_t groupnum,
+		       uint32_t groupmax,
+		       struct cpg_name *group_name,
+		       struct cpg_address *member_list, int member_list_entries)
+{
+	int i;
+	struct in_addr saddr;
+
+	printf("Groups List Callback %u/%u: ", groupnum, groupmax);
+	print_cpgname(group_name);
+	printf("\n");
+	for (i=0; i<member_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = member_list[i].nodeid;
+			printf("node/pid: %s/%d\n",
+			       inet_ntoa (saddr), member_list[i].pid);
+		}
+		else {
+			printf("node/pid: %d/%d\n",
+			       member_list[i].nodeid, member_list[i].pid);
+		}
+	}
+}
+
 cpg_callbacks_t callbacks = {
 	.cpg_deliver_fn =            DeliverCallback,
 	.cpg_confchg_fn =            ConfchgCallback,
+	.cpg_groups_get_fn =         GroupsGetCallback
 };
 
 void sigintr_handler (int signum) {
@@ -159,6 +186,7 @@
 	const char *options = "i";
 	int opt;
 	unsigned int nodeid;
+	unsigned int num_groups;
 
 	while ( (opt = getopt(argc, argv, options)) != -1 ) {
 		switch (opt) {
@@ -195,6 +223,13 @@
 		exit (1);
 	}
 
+	result = cpg_groups_get(handle, &num_groups);	/* keep the status for the check below */
+	if (result != SA_AIS_OK) {
+		printf ("Could not get list of groups, error %d\n", result);
+		exit (1);
+	}
+	printf("%u groups known to this node\n", num_groups);
+
 	FD_ZERO (&read_fds);
 	cpg_fd_get(handle, &select_fd);
 	printf ("Type EXIT to finish\n");
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h	(revision 1568)
+++ include/ipc_cpg.h	(working copy)
@@ -46,7 +46,8 @@
 	MESSAGE_REQ_CPG_MEMBERSHIP = 3,
 	MESSAGE_REQ_CPG_TRACKSTART = 4,
 	MESSAGE_REQ_CPG_TRACKSTOP = 5,
-	MESSAGE_REQ_CPG_LOCAL_GET = 6
+	MESSAGE_REQ_CPG_LOCAL_GET = 6,
+	MESSAGE_REQ_CPG_GROUPS_GET = 7
 };
 
 enum res_cpg_types {
@@ -59,7 +60,9 @@
 	MESSAGE_RES_CPG_TRACKSTART = 6,
 	MESSAGE_RES_CPG_TRACKSTOP = 7,
 	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
-	MESSAGE_RES_CPG_LOCAL_GET = 9
+	MESSAGE_RES_CPG_LOCAL_GET = 9,
+	MESSAGE_RES_CPG_GROUPS_GET = 10,
+	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
 };
 
 enum lib_cpg_confchg_reason {
@@ -158,4 +161,22 @@
 	mar_res_header_t header __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_groups_get {
+	mar_req_header_t header __attribute__((aligned(8)));	/* no payload beyond the header */
+};
+/* Direct reply to a groups-get request: carries the number of groups. */
+struct res_lib_cpg_groups_get {
+	mar_res_header_t header __attribute__((aligned(8)));
+	mar_uint32_t num_groups __attribute__((aligned(8)));
+};
+/* One MESSAGE_RES_CPG_GROUPS_CALLBACK message, describing a single group. */
+struct res_lib_cpg_groups_get_callback {
+	mar_res_header_t header __attribute__((aligned(8)));
+	mar_uint32_t group_num  __attribute__((aligned(8)));	/* counts up from 1 */
+	mar_uint32_t total_groups  __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t num_members __attribute__((aligned(8)));	/* entries in member_list */
+	mar_cpg_address_t member_list[];
+};
+
 #endif /* IPC_CPG_H_DEFINED */
Index: include/cpg.h
===================================================================
--- include/cpg.h	(revision 1568)
+++ include/cpg.h	(working copy)
@@ -114,9 +114,17 @@
 	struct cpg_address *left_list, int left_list_entries,
 	struct cpg_address *joined_list, int joined_list_entries);
 
+typedef void (*cpg_groups_get_fn_t) (	/* called once per group reported */
+	cpg_handle_t handle,
+	uint32_t group_num,	/* position of this group in the listing */
+	uint32_t group_total,	/* total number of groups */
+	struct cpg_name *group_name,
+	struct cpg_address *member_list, int member_list_entries);
+
 typedef struct {
 	cpg_deliver_fn_t cpg_deliver_fn;
 	cpg_confchg_fn_t cpg_confchg_fn;
+	cpg_groups_get_fn_t cpg_groups_get_fn;
 } cpg_callbacks_t;
 
 /** @} */
@@ -189,7 +197,6 @@
 	struct iovec *iovec,
 	int iov_len);
 
-
 /*
  * Get membership information from cpg
  */
@@ -203,6 +210,10 @@
 	cpg_handle_t handle,
 	unsigned int *local_nodeid);
 
+cpg_error_t cpg_groups_get (	/* group details are delivered via cpg_groups_get_fn */
+	cpg_handle_t handle,
+	unsigned int *num_groups);	/* out: number of groups */
+
 cpg_error_t cpg_flow_control_state_get (
 	cpg_handle_t handle,
 	cpg_flow_control_state_t *flow_control_enabled);
Index: exec/cpg.c
===================================================================
--- exec/cpg.c	(revision 1568)
+++ exec/cpg.c	(working copy)
@@ -1,10 +1,10 @@
 /*
- * Copyright (c) 2006 Red Hat, Inc.
+ * Copyright (c) 2006, 2008 Red Hat, Inc.
  * Copyright (c) 2006 Sun Microsystems, Inc.
  *
  * All rights reserved.
  *
- * Author: Patrick Caulfield ([EMAIL PROTECTED])
+ * Author: Christine Caulfield ([EMAIL PROTECTED])
  *
  * This software licensed under BSD license, the text of which follows:
  *
@@ -180,6 +180,8 @@
 
 static void message_handler_req_lib_cpg_local_get (void *conn, void *message);
 
+static void message_handler_req_lib_cpg_groups_get (void *conn, void *message);
+
 static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason);
 
 static int cpg_exec_send_joinlist(void);
@@ -234,6 +236,12 @@
 		.response_size				= sizeof (struct res_lib_cpg_local_get),
 		.response_id				= MESSAGE_RES_CPG_LOCAL_GET,
 		.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
+	},
+	{ /* 7 */
+		.lib_handler_fn				= message_handler_req_lib_cpg_groups_get,
+		.response_size				= sizeof (struct res_lib_cpg_groups_get),
+		.response_id				= MESSAGE_RES_CPG_GROUPS_GET,
+		.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
 	}
 };
 
@@ -494,6 +502,20 @@
 	return (0);
 }
 
+/* Return the number of entries found by walking every list in group_lists. */
+static int count_groups(void)
+{
+	uint32_t bucket;
+	int total = 0;
+
+	for (bucket = 0; bucket < GROUP_HASH_SIZE; bucket++) {
+		struct list_head *pos;
+		for (pos = group_lists[bucket].next; pos != &group_lists[bucket]; pos = pos->next)
+			total++;
+	}
+	return total;
+}
+
 static struct group_info *get_group(mar_cpg_name_t *name)
 {
 	struct list_head *iter;
@@ -523,6 +545,70 @@
 	return gi;
 }
 
+/* Send conn one MESSAGE_RES_CPG_GROUPS_CALLBACK message per group in group_lists,
+ * listing the members whose pid is non-zero; num_groups is sent as the total. */
+static void send_group_list_callbacks(int num_groups, void *conn)
+{
+	struct list_head *iter, *piter;
+	struct group_info *gi;
+	uint32_t hash;
+	int max_proc_count = -1;	/* members buf can hold; -1 until buf is allocated */
+	int size;
+	int group_counter = 0;
+	char *buf = NULL, *newbuf;
+	struct res_lib_cpg_groups_get_callback *res;
+	mar_cpg_address_t *retgi;
+
+	for (hash=0; hash < GROUP_HASH_SIZE; hash++) {
+		for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) {
+			gi = list_entry(iter, struct group_info, list);
+			int proc_count = 0;
+
+			/* First, we need to know how many processes are in the list */
+			for (piter = gi->members.next; piter != &gi->members; piter = piter->next) {
+				struct process_info *pi = list_entry(piter, struct process_info, list);
+				if (pi->pid)
+					proc_count++;
+			}
+
+			/* Grow the buffer if needed; on failure the old buffer must still be freed */
+			if (proc_count > max_proc_count) {
+				newbuf = realloc(buf, sizeof(*res) + (proc_count + 10) * sizeof(mar_cpg_address_t));
+				if (!newbuf) {
+					log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_list struct");
+					free(buf);
+					return;
+				}
+				buf = newbuf;
+				max_proc_count = proc_count + 10;
+			}
+
+			/* Send only the bytes this group needs, not the whole buffer */
+			size = sizeof(*res) + proc_count * sizeof(mar_cpg_address_t);
+			res = (struct res_lib_cpg_groups_get_callback *)buf;
+			retgi = res->member_list;
+			res->header.size = size;
+			res->header.id = MESSAGE_RES_CPG_GROUPS_CALLBACK;
+			res->header.error = SA_AIS_OK;
+			memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
+			res->num_members = proc_count;
+			res->group_num = ++group_counter;
+			res->total_groups = num_groups;
+			for (piter = gi->members.next; piter != &gi->members; piter = piter->next) {
+				struct process_info *pi = list_entry(piter, struct process_info, list);
+				if (pi->pid) {
+					retgi->nodeid = pi->nodeid;
+					retgi->pid = pi->pid;
+					retgi->reason = 0;
+					retgi++;
+				}
+			}
+			openais_conn_send_response(conn, buf, size);
+		}
+	}
+	free(buf);
+}
+
 static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason)
 {
 	struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
@@ -696,7 +782,7 @@
 	unsigned int i;
 
 	req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
- 
+
 	for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
 		req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
 	}
@@ -920,7 +1006,7 @@
 	/* Send to all interested members */
 	for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
 		struct process_info *pi = list_entry(iter, struct process_info, list);
-		if (pi->trackerconn) {
+		if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
 			openais_conn_send_response(
 				pi->trackerconn,
 				buf,
@@ -1225,3 +1311,21 @@
 	openais_conn_send_response(conn, &res_lib_cpg_local_get,
 		sizeof(res_lib_cpg_local_get));
 }
+
+/* Reply to a MESSAGE_REQ_CPG_GROUPS_GET request with the current group count,
+ * then send one callback message per group to the partner connection of conn. */
+static void message_handler_req_lib_cpg_groups_get (void *conn, void *message)
+{
+	struct res_lib_cpg_groups_get reply;
+
+	reply.header.size = sizeof(reply);
+	reply.header.id = MESSAGE_RES_CPG_GROUPS_GET;
+	reply.header.error = SA_AIS_OK;
+	reply.num_groups = count_groups();
+
+	openais_conn_send_response(conn, &reply, sizeof(reply));
+
+	/* Now do the callbacks for each group */
+	send_group_list_callbacks(reply.num_groups, openais_conn_partner_get (conn));
+}
+
Index: lib/cpg.c
===================================================================
--- lib/cpg.c	(revision 1568)
+++ lib/cpg.c	(working copy)
@@ -72,7 +72,7 @@
 };
 
 /*
- * Clean up function for a cpg instance (cpg_nitialize) handle
+ * Clean up function for a cpg instance (cpg_initialize) handle
  */
 static void cpg_instance_destructor (void *instance)
 {
@@ -250,6 +250,7 @@
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
 	cpg_callbacks_t callbacks;
 	struct res_overlay dispatch_data;
 	int ignore_dispatch = 0;
@@ -396,7 +397,24 @@
 				joined_list,
 				res_cpg_confchg_callback->joined_list_entries);
 			break;
+		case MESSAGE_RES_CPG_GROUPS_CALLBACK:
+			res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
+			marshall_from_mar_cpg_name_t (
+				&group_name,
+				&res_lib_cpg_groups_get_callback->group_name);
+			for (i = 0; i < res_lib_cpg_groups_get_callback->num_members; i++) {
+				marshall_from_mar_cpg_address_t (&member_list[i],
+					&res_lib_cpg_groups_get_callback->member_list[i]);
+			}
 
+			callbacks.cpg_groups_get_fn(handle,
+						    res_lib_cpg_groups_get_callback->group_num,
+						    res_lib_cpg_groups_get_callback->total_groups,
+						    &group_name,
+						    member_list,
+						    res_lib_cpg_groups_get_callback->num_members);
+			break;
+
 		default:
 			error = SA_AIS_ERR_LIBRARY;
 			goto error_nounlock;
@@ -690,6 +708,48 @@
 	return (error);
 }
 
+/*
+ * Send a MESSAGE_REQ_CPG_GROUPS_GET request and store the group count from the
+ * reply in *num_groups.  Per-group details are delivered via cpg_groups_get_fn.
+ */
+cpg_error_t cpg_groups_get (
+	cpg_handle_t handle,
+	unsigned int *num_groups)
+{
+	struct req_lib_cpg_groups_get request;
+	struct res_lib_cpg_groups_get reply;
+	struct cpg_inst *cpg_inst;
+	struct iovec iov;
+	cpg_error_t error;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != SA_AIS_OK) {
+		return (error);
+	}
+
+	request.header.size = sizeof (mar_req_header_t);
+	request.header.id = MESSAGE_REQ_CPG_GROUPS_GET;
+
+	iov.iov_base = &request;
+	iov.iov_len = sizeof (struct req_lib_cpg_groups_get);
+
+	/* response_mutex is held across the request/reply exchange */
+	pthread_mutex_lock (&cpg_inst->response_mutex);
+	error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
+		&reply, sizeof (reply));
+	pthread_mutex_unlock (&cpg_inst->response_mutex);
+
+	/* The reply fields are only read when the exchange itself succeeded */
+	if (error == SA_AIS_OK) {
+		*num_groups = reply.num_groups;
+		error = reply.header.error;
+	}
+
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
 cpg_error_t cpg_flow_control_state_get (
 	cpg_handle_t handle,
 	cpg_flow_control_state_t *flow_control_state)
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to