Attached is patch implementing cpg_model_init and CPG_MODEL_V1 has additional callback with totem ringid and members changes. It is basically merge of two previous patches. I made that patch one big patch, because in such case, we don't need to solve some "incompatibilities"
It is applicable to todays trunk. Man page is in progress, but testcpg is modified to use cpg_model_init -> reference. Some problems Steve had with old patch should be fixed now. Regards, Honza
commit 1738d676e75131ac6d36326c39a2d165e731a5d8 Author: Jan Friesse <[email protected]> Date: Thu Apr 1 16:42:32 2010 +0200 CPG model_initialize and ringid + members callback Patch adds new function to initialize cpg, cpg_model_initialize. Model is set of callbacks. With this function, future addions of models should be possible without changing the ABI. Patch also contains callback in CPG_MODEL_V1 for notification about Totem membership changes. diff --git a/trunk/include/corosync/cpg.h b/trunk/include/corosync/cpg.h index b5609df..fbca6a1 100644 --- a/trunk/include/corosync/cpg.h +++ b/trunk/include/corosync/cpg.h @@ -78,6 +78,10 @@ typedef enum { CPG_ITERATION_ALL = 3, } cpg_iteration_type_t; +typedef enum { + CPG_MODEL_V1 = 1, +} cpg_model_t; + struct cpg_address { uint32_t nodeid; uint32_t pid; @@ -98,6 +102,11 @@ struct cpg_iteration_description_t { uint32_t pid; }; +struct cpg_ring_id { + uint32_t nodeid; + uint64_t seq; +}; + typedef void (*cpg_deliver_fn_t) ( cpg_handle_t handle, const struct cpg_name *group_name, @@ -117,11 +126,29 @@ typedef void (*cpg_confchg_fn_t) ( const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries); +typedef void (*cpg_totem_confchg_fn_t) ( + cpg_handle_t handle, + struct cpg_ring_id ring_id, + uint32_t member_list_entries, + const uint32_t *member_list); + typedef struct { cpg_deliver_fn_t cpg_deliver_fn; cpg_confchg_fn_t cpg_confchg_fn; } cpg_callbacks_t; +typedef struct { + cpg_model_t model; +} cpg_model_data_t; + +typedef struct { + cpg_model_t model; + cpg_deliver_fn_t cpg_deliver_fn; + cpg_confchg_fn_t cpg_confchg_fn; + cpg_totem_confchg_fn_t cpg_totem_confchg_fn; +} cpg_model_v1_data_t; + + /** @} */ /* @@ -132,6 +159,15 @@ cs_error_t cpg_initialize ( cpg_callbacks_t *callbacks); /* + * Create a new cpg connection, initialize with model + */ +cs_error_t cpg_model_initialize ( + cpg_handle_t *handle, + cpg_model_t model, + cpg_model_data_t *model_data, + void *context); + +/* * Close the cpg handle */ cs_error_t cpg_finalize ( diff --git a/trunk/include/corosync/ipc_cpg.h b/trunk/include/corosync/ipc_cpg.h index 8f55ae8..633a158 100644 --- a/trunk/include/corosync/ipc_cpg.h +++ b/trunk/include/corosync/ipc_cpg.h @@ -65,6 +65,7 @@ enum res_cpg_types { MESSAGE_RES_CPG_ITERATIONNEXT = 10, MESSAGE_RES_CPG_ITERATIONFINALIZE = 11, MESSAGE_RES_CPG_FINALIZE = 12, + MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13, }; enum lib_cpg_confchg_reason { @@ -149,6 +150,19 @@ static inline void marshall_from_mar_cpg_iteration_description_t( marshall_from_mar_cpg_name_t (&dest->group, &src->group); }; +typedef struct { + mar_uint32_t nodeid __attribute__((aligned(8))); + mar_uint64_t seq __attribute__((aligned(8))); +} mar_cpg_ring_id_t; + +static inline void marshall_from_mar_cpg_ring_id_t ( + struct cpg_ring_id *dest, + const mar_cpg_ring_id_t *src) +{ + dest->nodeid = src->nodeid; + dest->seq = src->seq; +} + struct req_lib_cpg_join { coroipc_request_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); @@ -238,6 +252,13 @@ struct res_lib_cpg_confchg_callback { // struct cpg_address joined_list[]; }; +struct res_lib_cpg_totem_confchg_callback { + coroipc_response_header_t header __attribute__((aligned(8))); + mar_cpg_ring_id_t ring_id __attribute__((aligned(8))); + mar_uint32_t member_list_entries __attribute__((aligned(8))); + mar_uint32_t member_list[]; +}; + struct req_lib_cpg_leave { coroipc_request_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); diff --git a/trunk/lib/cpg.c b/trunk/lib/cpg.c index 79836e4..1844feb 100644 --- a/trunk/lib/cpg.c +++ b/trunk/lib/cpg.c @@ -62,8 +62,11 @@ struct cpg_inst { hdb_handle_t handle; int finalize; - cpg_callbacks_t callbacks; void *context; + union { + cpg_model_data_t model_data; + cpg_model_v1_data_t model_v1_data; + }; struct list_head iteration_list_head; }; @@ -118,9 +121,32 @@ cs_error_t cpg_initialize ( cpg_handle_t *handle, cpg_callbacks_t *callbacks) { + cpg_model_v1_data_t model_v1_data; + + memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t)); + + if (callbacks) { + model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn; + model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn; + } + + return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL)); +} + +cs_error_t cpg_model_initialize ( + cpg_handle_t *handle, + cpg_model_t model, + cpg_model_data_t *model_data, + void *context) +{ cs_error_t error; struct cpg_inst *cpg_inst; + if (model != CPG_MODEL_V1) { + error = CPG_ERR_INVALID_PARAM; + goto error_no_destroy; + } + error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle)); if (error != CS_OK) { goto error_no_destroy; @@ -142,10 +168,21 @@ cs_error_t cpg_initialize ( goto error_put_destroy; } - if (callbacks) { - memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t)); + if (model_data != NULL) { + switch (model) { + case CPG_MODEL_V1: + memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t)); + break; + default: + error = CS_ERR_LIBRARY; + goto error_destroy; + break; + } } + cpg_inst->model_data.model = model; + cpg_inst->context = context; + list_init(&cpg_inst->iteration_list_head); hdb_handle_put (&cpg_handle_t_db, *handle); @@ -283,7 +320,8 @@ cs_error_t cpg_dispatch ( struct cpg_inst *cpg_inst; struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback; struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback; - cpg_callbacks_t callbacks; + struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback; + struct cpg_inst cpg_inst_copy; coroipc_response_header_t *dispatch_data; struct cpg_address member_list[CPG_MEMBERS_MAX]; struct cpg_address left_list[CPG_MEMBERS_MAX]; @@ -292,6 +330,8 @@ cs_error_t cpg_dispatch ( mar_cpg_address_t *left_list_start; mar_cpg_address_t *joined_list_start; unsigned int i; + struct cpg_ring_id ring_id; + uint32_t totem_member_list[CPG_MEMBERS_MAX]; error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst)); if (error != CS_OK) { @@ -332,74 +372,96 @@ cs_error_t cpg_dispatch ( * A risk of this dispatch method is that the callback routines may * operate at the same time that cpgFinalize has been called. */ - memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t)); - /* - * Dispatch incoming message - */ - switch (dispatch_data->id) { - case MESSAGE_RES_CPG_DELIVER_CALLBACK: - if (callbacks.cpg_deliver_fn == NULL) { - continue; - } - - res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data; - - marshall_from_mar_cpg_name_t ( - &group_name, - &res_cpg_deliver_callback->group_name); - - callbacks.cpg_deliver_fn (handle, - &group_name, - res_cpg_deliver_callback->nodeid, - res_cpg_deliver_callback->pid, - &res_cpg_deliver_callback->message, - res_cpg_deliver_callback->msglen); - break; - - case MESSAGE_RES_CPG_CONFCHG_CALLBACK: - if (callbacks.cpg_confchg_fn == NULL) { - continue; - } - - res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data; - - for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) { - marshall_from_mar_cpg_address_t (&member_list[i], - &res_cpg_confchg_callback->member_list[i]); - } - left_list_start = res_cpg_confchg_callback->member_list + - res_cpg_confchg_callback->member_list_entries; - for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) { - marshall_from_mar_cpg_address_t (&left_list[i], - &left_list_start[i]); - } - joined_list_start = res_cpg_confchg_callback->member_list + - res_cpg_confchg_callback->member_list_entries + - res_cpg_confchg_callback->left_list_entries; - for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) { - marshall_from_mar_cpg_address_t (&joined_list[i], - &joined_list_start[i]); - } - marshall_from_mar_cpg_name_t ( - &group_name, - &res_cpg_confchg_callback->group_name); - - callbacks.cpg_confchg_fn (handle, - &group_name, - member_list, - res_cpg_confchg_callback->member_list_entries, - left_list, - res_cpg_confchg_callback->left_list_entries, - joined_list, - res_cpg_confchg_callback->joined_list_entries); - break; - - default: - coroipcc_dispatch_put (cpg_inst->handle); - error = CS_ERR_LIBRARY; - goto error_put; - break; - } + memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst)); + + switch (cpg_inst_copy.model_data.model) { + case CPG_MODEL_V1: + /* + * Dispatch incoming message + */ + switch (dispatch_data->id) { + case MESSAGE_RES_CPG_DELIVER_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) { + continue; + } + + res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data; + + marshall_from_mar_cpg_name_t ( + &group_name, + &res_cpg_deliver_callback->group_name); + + cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle, + &group_name, + res_cpg_deliver_callback->nodeid, + res_cpg_deliver_callback->pid, + &res_cpg_deliver_callback->message, + res_cpg_deliver_callback->msglen); + break; + + case MESSAGE_RES_CPG_CONFCHG_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) { + continue; + } + + res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data; + + for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) { + marshall_from_mar_cpg_address_t (&member_list[i], + &res_cpg_confchg_callback->member_list[i]); + } + left_list_start = res_cpg_confchg_callback->member_list + + res_cpg_confchg_callback->member_list_entries; + for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) { + marshall_from_mar_cpg_address_t (&left_list[i], + &left_list_start[i]); + } + joined_list_start = res_cpg_confchg_callback->member_list + + res_cpg_confchg_callback->member_list_entries + + res_cpg_confchg_callback->left_list_entries; + for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) { + marshall_from_mar_cpg_address_t (&joined_list[i], + &joined_list_start[i]); + } + marshall_from_mar_cpg_name_t ( + &group_name, + &res_cpg_confchg_callback->group_name); + + cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle, + &group_name, + member_list, + res_cpg_confchg_callback->member_list_entries, + left_list, + res_cpg_confchg_callback->left_list_entries, + joined_list, + res_cpg_confchg_callback->joined_list_entries); + + break; + case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) { + continue; + } + + res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data; + + marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id); + for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) { + totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i]; + } + + cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle, + ring_id, + res_cpg_totem_confchg_callback->member_list_entries, + totem_member_list); + break; + default: + coroipcc_dispatch_put (cpg_inst->handle); + error = CS_ERR_LIBRARY; + goto error_put; + break; + } /* - switch (dispatch_data->id) */ + break; /* case CPG_MODEL_V1 */ + } /* - switch (cpg_inst_copy.model_data.model) */ coroipcc_dispatch_put (cpg_inst->handle); /* diff --git a/trunk/lib/libcpg.verso b/trunk/lib/libcpg.verso index 1454f6e..ee74734 100644 --- a/trunk/lib/libcpg.verso +++ b/trunk/lib/libcpg.verso @@ -1 +1 @@ -4.0.1 +4.1.0 diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c index ede426f..48e18bf 100644 --- a/trunk/services/cpg.c +++ b/trunk/services/cpg.c @@ -160,6 +160,8 @@ static struct corosync_api_v1 *api = NULL; static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST; +static mar_cpg_ring_id_t last_sync_ring_id; + struct process_info { unsigned int nodeid; uint32_t pid; @@ -255,6 +257,10 @@ static void cpg_sync_activate (void); static void cpg_sync_abort (void); +static int notify_lib_totem_membership ( + int member_list_entries, + const unsigned int *member_list); + /* * Library Handler Definition */ @@ -432,6 +438,9 @@ static void cpg_sync_init_v2 ( sizeof (unsigned int)); my_member_list_entries = member_list_entries; + last_sync_ring_id.nodeid = ring_id->rep.nodeid; + last_sync_ring_id.seq = ring_id->seq; + for (i = 0; i < my_member_list_entries; i++) { if (my_member_list[i] < lowest_nodeid) { lowest_nodeid = my_member_list[i]; @@ -482,13 +491,45 @@ static void cpg_sync_activate (void) memcpy (my_old_member_list, my_member_list, my_member_list_entries * sizeof (unsigned int)); my_old_member_list_entries = my_member_list_entries; + + notify_lib_totem_membership (my_member_list_entries, my_member_list); } static void cpg_sync_abort (void) { } +static int notify_lib_totem_membership ( + int member_list_entries, + const unsigned int *member_list) +{ + struct list_head *iter; + char *buf; + int size; + struct res_lib_cpg_totem_confchg_callback *res; + size = sizeof(struct res_lib_cpg_totem_confchg_callback) + + sizeof(mar_uint32_t) * (member_list_entries); + buf = alloca(size); + if (!buf) + return CPG_ERR_LIBRARY; + + res = (struct res_lib_cpg_totem_confchg_callback *)buf; + res->member_list_entries = member_list_entries; + res->header.size = size; + res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK; + res->header.error = CS_OK; + + memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t)); + memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t)); + + for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) { + struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list); + api->ipc_dispatch_send (cpg_pd->conn, buf, size); + } + + return CPG_OK; +} static int notify_lib_joinlist( const mar_cpg_name_t *group_name, diff --git a/trunk/test/testcpg.c b/trunk/test/testcpg.c index 2abe83d..50bab97 100644 --- a/trunk/test/testcpg.c +++ b/trunk/test/testcpg.c @@ -132,9 +132,28 @@ static void ConfchgCallback ( } } -static cpg_callbacks_t callbacks = { +static void TotemConfchgCallback ( + cpg_handle_t handle, + struct cpg_ring_id ring_id, + uint32_t member_list_entries, + const uint32_t *member_list) +{ + int i; + + printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq); + + printf("active processors %lu: ", + (unsigned long int) member_list_entries); + for (i=0; i<member_list_entries; i++) { + printf("%d ", member_list[i]); + } + printf ("\n"); +} + +static cpg_model_v1_data_t callbacks = { .cpg_deliver_fn = DeliverCallback, .cpg_confchg_fn = ConfchgCallback, + .cpg_totem_confchg_fn = TotemConfchgCallback, }; static void sigintr_handler (int signum) __attribute__((__noreturn__)); @@ -170,7 +189,7 @@ int main (int argc, char *argv[]) { group_name.length = 6; } - result = cpg_initialize (&handle, &callbacks); + result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&callbacks, NULL); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); exit (1);
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
