It can be quite a nuisance that you can't query the internal state of
many openais plugins. Many times I have wanted to look into CPG to see
just which groups are allocated and which nodes/processes are using them.
Well, this patch adds a call to libcpg to do just that. Because of the
potential size of the output, the list of groups is delivered through a
callback, one group per call. This minimises the amount of memory that
needs to be allocated by both the exec and the library.
The patch is against trunk.
Chrissie
Index: test/testcpg.c
===================================================================
--- test/testcpg.c (revision 1568)
+++ test/testcpg.c (working copy)
@@ -141,9 +141,36 @@
}
}
+/*
+ * Groups-list callback for the test program.
+ * Prints the position of the group (groupnum out of groupmax), the group
+ * name, and one "node/pid" line for each of the member_list_entries
+ * entries in member_list. When show_ip is set the node id is printed as a
+ * dotted-quad address instead of a number.
+ */
+void GroupsGetCallback(cpg_handle_t handle,
+	uint32_t groupnum,
+	uint32_t groupmax,
+	struct cpg_name *group_name,
+	struct cpg_address *member_list, int member_list_entries)
+{
+	int i;
+	struct in_addr saddr;
+
+	/* groupnum and groupmax are unsigned, so they are printed with %u */
+	printf("Groups List Callback %u/%u: ", groupnum, groupmax);
+	print_cpgname(group_name);
+	printf("\n");
+	for (i = 0; i < member_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = member_list[i].nodeid;
+			printf("node/pid: %s/%d\n",
+				inet_ntoa (saddr), member_list[i].pid);
+		} else {
+			printf("node/pid: %d/%d\n",
+				member_list[i].nodeid, member_list[i].pid);
+		}
+	}
+}
+
+
cpg_callbacks_t callbacks = {
.cpg_deliver_fn = DeliverCallback,
.cpg_confchg_fn = ConfchgCallback,
+ .cpg_groups_get_fn = GroupsGetCallback
};
void sigintr_handler (int signum) {
@@ -159,6 +186,7 @@
const char *options = "i";
int opt;
unsigned int nodeid;
+ unsigned int num_groups;
while ( (opt = getopt(argc, argv, options)) != -1 ) {
switch (opt) {
@@ -195,6 +223,13 @@
exit (1);
}
+	/*
+	 * Store the return code: the check below must test this call, not
+	 * whatever value result was left with earlier.
+	 */
+	result = cpg_groups_get(handle, &num_groups);
+	if (result != SA_AIS_OK) {
+		printf ("Could not get list of groups, error %d\n", result);
+		exit (1);
+	}
+	/* num_groups is unsigned int, hence %u */
+	printf("%u groups known to this node\n", num_groups);
+
FD_ZERO (&read_fds);
cpg_fd_get(handle, &select_fd);
printf ("Type EXIT to finish\n");
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h (revision 1568)
+++ include/ipc_cpg.h (working copy)
@@ -46,7 +46,8 @@
MESSAGE_REQ_CPG_MEMBERSHIP = 3,
MESSAGE_REQ_CPG_TRACKSTART = 4,
MESSAGE_REQ_CPG_TRACKSTOP = 5,
- MESSAGE_REQ_CPG_LOCAL_GET = 6
+ MESSAGE_REQ_CPG_LOCAL_GET = 6,
+ MESSAGE_REQ_CPG_GROUPS_GET = 7
};
enum res_cpg_types {
@@ -59,7 +60,9 @@
MESSAGE_RES_CPG_TRACKSTART = 6,
MESSAGE_RES_CPG_TRACKSTOP = 7,
MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
- MESSAGE_RES_CPG_LOCAL_GET = 9
+ MESSAGE_RES_CPG_LOCAL_GET = 9,
+ MESSAGE_RES_CPG_GROUPS_GET = 10,
+ MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
};
enum lib_cpg_confchg_reason {
@@ -158,4 +161,22 @@
mar_res_header_t header __attribute__((aligned(8)));
};
+/*
+ * Library request for the list of groups (MESSAGE_REQ_CPG_GROUPS_GET).
+ * It carries nothing but the request header.
+ */
+struct req_lib_cpg_groups_get {
+ mar_req_header_t header __attribute__((aligned(8)));
+};
+
+/*
+ * Direct reply to a groups-get request (MESSAGE_RES_CPG_GROUPS_GET).
+ * num_groups is the number of groups the exec counted when it handled the
+ * request; the groups themselves follow in separate callback messages.
+ */
+struct res_lib_cpg_groups_get {
+ mar_res_header_t header __attribute__((aligned(8)));
+ mar_uint32_t num_groups __attribute__((aligned(8)));
+};
+
+/*
+ * Description of a single group (MESSAGE_RES_CPG_GROUPS_CALLBACK); the exec
+ * sends one of these per group.
+ *   group_num    - running number of this group, counted from 1 by the sender
+ *   total_groups - total number of groups being reported
+ *   group_name   - name of the group
+ *   num_members  - number of entries in member_list
+ *   member_list  - variable-length array that follows the fixed part
+ */
+struct res_lib_cpg_groups_get_callback {
+ mar_res_header_t header __attribute__((aligned(8)));
+ mar_uint32_t group_num __attribute__((aligned(8)));
+ mar_uint32_t total_groups __attribute__((aligned(8)));
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t num_members __attribute__((aligned(8)));
+ mar_cpg_address_t member_list[];
+};
+
#endif /* IPC_CPG_H_DEFINED */
Index: include/cpg.h
===================================================================
--- include/cpg.h (revision 1568)
+++ include/cpg.h (working copy)
@@ -114,9 +114,17 @@
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries);
+/*
+ * Callback that receives the description of one group: its number
+ * (group_num) out of group_total, its name, and member_list_entries
+ * entries in member_list. It is called from the dispatch code once for
+ * each groups-list message received.
+ */
+typedef void (*cpg_groups_get_fn_t) (
+ cpg_handle_t handle,
+ uint32_t group_num,
+ uint32_t group_total,
+ struct cpg_name *group_name,
+ struct cpg_address *member_list, int member_list_entries);
+
typedef struct {
cpg_deliver_fn_t cpg_deliver_fn;
cpg_confchg_fn_t cpg_confchg_fn;
+ cpg_groups_get_fn_t cpg_groups_get_fn;
} cpg_callbacks_t;
/** @} */
@@ -189,7 +197,6 @@
struct iovec *iovec,
int iov_len);
-
/*
* Get membership information from cpg
*/
@@ -203,6 +210,10 @@
cpg_handle_t handle,
unsigned int *local_nodeid);
+/*
+ * Get the number of groups, stored in *num_groups.
+ * The details of each group are delivered through the cpg_groups_get_fn
+ * callback rather than returned here.
+ */
+cpg_error_t cpg_groups_get (
+ cpg_handle_t handle,
+ unsigned int *num_groups);
+
cpg_error_t cpg_flow_control_state_get (
cpg_handle_t handle,
cpg_flow_control_state_t *flow_control_enabled);
Index: exec/cpg.c
===================================================================
--- exec/cpg.c (revision 1568)
+++ exec/cpg.c (working copy)
@@ -1,10 +1,10 @@
/*
- * Copyright (c) 2006 Red Hat, Inc.
+ * Copyright (c) 2006, 2008 Red Hat, Inc.
* Copyright (c) 2006 Sun Microsystems, Inc.
*
* All rights reserved.
*
- * Author: Patrick Caulfield ([EMAIL PROTECTED])
+ * Author: Christine Caulfield ([EMAIL PROTECTED])
*
* This software licensed under BSD license, the text of which follows:
*
@@ -180,6 +180,8 @@
static void message_handler_req_lib_cpg_local_get (void *conn, void *message);
+static void message_handler_req_lib_cpg_groups_get (void *conn, void *message);
+
static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason);
static int cpg_exec_send_joinlist(void);
@@ -234,6 +236,12 @@
.response_size = sizeof (struct res_lib_cpg_local_get),
.response_id = MESSAGE_RES_CPG_LOCAL_GET,
.flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
+ },
+ { /* 7 */
+ .lib_handler_fn = message_handler_req_lib_cpg_groups_get,
+ .response_size = sizeof (struct res_lib_cpg_groups_get),
+ .response_id = MESSAGE_RES_CPG_GROUPS_GET,
+ .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
}
};
@@ -494,6 +502,20 @@
return (0);
}
+/*
+ * Return the number of entries found by walking every list in group_lists
+ * (one list per index below GROUP_HASH_SIZE).
+ */
+static int count_groups(void)
+{
+	uint32_t bucket;
+	int total = 0;
+
+	for (bucket = 0; bucket < GROUP_HASH_SIZE; bucket++) {
+		struct list_head *head = &group_lists[bucket];
+		struct list_head *pos = head->next;
+
+		/* The list is circular: stop once we are back at its head */
+		while (pos != head) {
+			total++;
+			pos = pos->next;
+		}
+	}
+	return total;
+}
+
static struct group_info *get_group(mar_cpg_name_t *name)
{
struct list_head *iter;
@@ -523,6 +545,70 @@
return gi;
}
+/*
+ * Send one MESSAGE_RES_CPG_GROUPS_CALLBACK message to conn for every group
+ * found in group_lists. Each message names the group and lists the
+ * nodeid/pid of every member whose pid is non-zero.
+ *
+ * num_groups is copied into each message as the total number of groups.
+ * If the message buffer cannot be allocated, a warning is logged and the
+ * remaining groups are not sent.
+ */
+static void send_group_list_callbacks(int num_groups, void *conn)
+{
+	struct list_head *iter, *piter;
+	struct group_info *gi;
+	uint32_t hash;
+	int max_proc_count = 0;	/* number of members buf has room for */
+	int size;
+	int group_counter = 0;
+	char *buf = NULL;
+	struct res_lib_cpg_groups_get_callback *res;
+	mar_cpg_address_t *retgi;
+
+	for (hash = 0; hash < GROUP_HASH_SIZE; hash++) {
+		for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) {
+			int proc_count = 0;
+
+			gi = list_entry(iter, struct group_info, list);
+
+			/* First, we need to know how many processes are in the list */
+			for (piter = gi->members.next; piter != &gi->members; piter = piter->next) {
+				struct process_info *pi = list_entry(piter, struct process_info, list);
+				if (pi->pid)
+					proc_count++;
+			}
+
+			/*
+			 * Make sure we have adequate buffer space. The !buf test covers
+			 * a first group with no countable members, which must still get
+			 * a buffer for the fixed part of the message. realloc goes
+			 * through a temporary so the old buffer can be freed on failure.
+			 */
+			if (!buf || proc_count > max_proc_count) {
+				int new_max = proc_count + 10;
+				char *new_buf = realloc(buf,
+					new_max * sizeof(mar_cpg_address_t) +
+					sizeof(struct res_lib_cpg_groups_get_callback));
+
+				if (!new_buf) {
+					log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_list struct");
+					free(buf);
+					return;
+				}
+				buf = new_buf;
+				max_proc_count = new_max;
+			}
+
+			/* Length of this message only, not of the whole buffer */
+			size = proc_count * sizeof(mar_cpg_address_t) +
+				sizeof(struct res_lib_cpg_groups_get_callback);
+
+			res = (struct res_lib_cpg_groups_get_callback *)buf;
+			retgi = res->member_list;
+
+			res->header.size = size;
+			res->header.id = MESSAGE_RES_CPG_GROUPS_CALLBACK;
+			res->header.error = SA_AIS_OK;
+
+			memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
+			res->num_members = proc_count;
+			res->group_num = ++group_counter;
+			res->total_groups = num_groups;
+
+			/* Same filter as the counting pass above, so retgi stays in bounds */
+			for (piter = gi->members.next; piter != &gi->members; piter = piter->next) {
+				struct process_info *pi = list_entry(piter, struct process_info, list);
+				if (pi->pid) {
+					retgi->nodeid = pi->nodeid;
+					retgi->pid = pi->pid;
+					retgi->reason = 0;
+					retgi++;
+				}
+			}
+			openais_conn_send_response(conn, buf, size);
+		}
+	}
+	free(buf);
+}
+
static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason)
{
struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
@@ -696,7 +782,7 @@
unsigned int i;
req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
-
+
for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
}
@@ -920,7 +1006,7 @@
/* Send to all interested members */
for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
struct process_info *pi = list_entry(iter, struct process_info, list);
- if (pi->trackerconn) {
+ if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
openais_conn_send_response(
pi->trackerconn,
buf,
@@ -1225,3 +1311,21 @@
openais_conn_send_response(conn, &res_lib_cpg_local_get,
sizeof(res_lib_cpg_local_get));
}
+
+/*
+ * Library handler for the groups-get request.
+ * Sends a MESSAGE_RES_CPG_GROUPS_GET reply holding the current group count
+ * on conn, then sends the per-group messages on the connection returned by
+ * openais_conn_partner_get(conn). The request message itself is not read.
+ */
+static void message_handler_req_lib_cpg_groups_get (void *conn, void *message)
+{
+	struct res_lib_cpg_groups_get reply;
+
+	reply.header.size = sizeof(reply);
+	reply.header.id = MESSAGE_RES_CPG_GROUPS_GET;
+	reply.header.error = SA_AIS_OK;
+	reply.num_groups = count_groups();
+	openais_conn_send_response(conn, &reply, sizeof(reply));
+
+	/* The count has gone out; now describe each group in turn */
+	send_group_list_callbacks(reply.num_groups, openais_conn_partner_get (conn));
+}
+
Index: lib/cpg.c
===================================================================
--- lib/cpg.c (revision 1568)
+++ lib/cpg.c (working copy)
@@ -72,7 +72,7 @@
};
/*
- * Clean up function for a cpg instance (cpg_nitialize) handle
+ * Clean up function for a cpg instance (cpg_initialize) handle
*/
static void cpg_instance_destructor (void *instance)
{
@@ -250,6 +250,7 @@
struct cpg_inst *cpg_inst;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+ struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
cpg_callbacks_t callbacks;
struct res_overlay dispatch_data;
int ignore_dispatch = 0;
@@ -396,7 +397,24 @@
joined_list,
res_cpg_confchg_callback->joined_list_entries);
break;
+		case MESSAGE_RES_CPG_GROUPS_CALLBACK:
+			/*
+			 * An application that registered no groups callback has
+			 * nowhere to receive this message; drop it rather than
+			 * call through a NULL pointer.
+			 */
+			if (callbacks.cpg_groups_get_fn == NULL) {
+				break;
+			}
+			res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
+			/* Convert the wire-format name and members for the callback */
+			marshall_from_mar_cpg_name_t (
+				&group_name,
+				&res_lib_cpg_groups_get_callback->group_name);
+			for (i = 0; i < res_lib_cpg_groups_get_callback->num_members; i++) {
+				marshall_from_mar_cpg_address_t (&member_list[i],
+					&res_lib_cpg_groups_get_callback->member_list[i]);
+			}
+			callbacks.cpg_groups_get_fn(handle,
+				res_lib_cpg_groups_get_callback->group_num,
+				res_lib_cpg_groups_get_callback->total_groups,
+				&group_name,
+				member_list,
+				res_lib_cpg_groups_get_callback->num_members);
+			break;
+
default:
error = SA_AIS_ERR_LIBRARY;
goto error_nounlock;
@@ -690,6 +708,48 @@
return (error);
}
+/*
+ * Send a MESSAGE_REQ_CPG_GROUPS_GET request and store the group count from
+ * the reply in *num_groups.
+ *
+ * Returns the error from the handle lookup or from the send/receive if
+ * either fails, otherwise the error code carried in the reply header.
+ */
+cpg_error_t cpg_groups_get (
+	cpg_handle_t handle,
+	unsigned int *num_groups)
+{
+	struct req_lib_cpg_groups_get request;
+	struct res_lib_cpg_groups_get reply;
+	struct cpg_inst *cpg_inst;
+	struct iovec iov;
+	cpg_error_t error;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != SA_AIS_OK) {
+		return (error);
+	}
+
+	request.header.size = sizeof (mar_req_header_t);
+	request.header.id = MESSAGE_REQ_CPG_GROUPS_GET;
+	iov.iov_base = &request;
+	iov.iov_len = sizeof (struct req_lib_cpg_groups_get);
+
+	/* The request/reply exchange runs with response_mutex held */
+	pthread_mutex_lock (&cpg_inst->response_mutex);
+	error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
+		&reply, sizeof (reply));
+	pthread_mutex_unlock (&cpg_inst->response_mutex);
+
+	/* Real output is delivered via a callback; only the count comes back here */
+	if (error == SA_AIS_OK) {
+		*num_groups = reply.num_groups;
+		error = reply.header.error;
+	}
+
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+	return (error);
+}
+
cpg_error_t cpg_flow_control_state_get (
cpg_handle_t handle,
cpg_flow_control_state_t *flow_control_state)
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais