Same patch, but rebased on top of Steve's change (today's trunk). Regards, Honza
Jan Friesse wrote: > Attached is a patch implementing cpg_model_init; CPG_MODEL_V1 has an > additional callback with the totem ring id and membership changes. It is > basically a merge of the two previous patches. I made it one big > patch, because in that case we don't need to resolve some "incompatibilities". > > It is applicable to today's trunk. > > The man page is in progress, but testcpg is modified to use cpg_model_init > and can serve as a reference. > > Some problems Steve had with the old patch should be fixed now. > > Regards, > Honza > >
commit 662ce5c79accd67a8e37219a647f671a15c4f495 Author: Jan Friesse <[email protected]> Date: Tue Apr 6 14:02:29 2010 +0200 CPG model_initialize and ringid + members callback Patch adds a new function to initialize cpg, cpg_model_initialize. A model is a set of callbacks. With this function, future additions of models should be possible without changing the ABI. Patch also contains a callback in CPG_MODEL_V1 for notification about Totem membership changes. diff --git a/trunk/include/corosync/cpg.h b/trunk/include/corosync/cpg.h index b5609df..fbca6a1 100644 --- a/trunk/include/corosync/cpg.h +++ b/trunk/include/corosync/cpg.h @@ -78,6 +78,10 @@ typedef enum { CPG_ITERATION_ALL = 3, } cpg_iteration_type_t; +typedef enum { + CPG_MODEL_V1 = 1, +} cpg_model_t; + struct cpg_address { uint32_t nodeid; uint32_t pid; @@ -98,6 +102,11 @@ struct cpg_iteration_description_t { uint32_t pid; }; +struct cpg_ring_id { + uint32_t nodeid; + uint64_t seq; +}; + typedef void (*cpg_deliver_fn_t) ( cpg_handle_t handle, const struct cpg_name *group_name, @@ -117,11 +126,29 @@ typedef void (*cpg_confchg_fn_t) ( const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries); +typedef void (*cpg_totem_confchg_fn_t) ( + cpg_handle_t handle, + struct cpg_ring_id ring_id, + uint32_t member_list_entries, + const uint32_t *member_list); + typedef struct { cpg_deliver_fn_t cpg_deliver_fn; cpg_confchg_fn_t cpg_confchg_fn; } cpg_callbacks_t; +typedef struct { + cpg_model_t model; +} cpg_model_data_t; + +typedef struct { + cpg_model_t model; + cpg_deliver_fn_t cpg_deliver_fn; + cpg_confchg_fn_t cpg_confchg_fn; + cpg_totem_confchg_fn_t cpg_totem_confchg_fn; +} cpg_model_v1_data_t; + + /** @} */ /* @@ -132,6 +159,15 @@ cs_error_t cpg_initialize ( cpg_callbacks_t *callbacks); /* + * Create a new cpg connection, initialize with model + */ +cs_error_t cpg_model_initialize ( + cpg_handle_t *handle, + cpg_model_t model, + cpg_model_data_t 
*model_data, + void *context); + +/* * Close the cpg handle */ cs_error_t cpg_finalize ( diff --git a/trunk/include/corosync/ipc_cpg.h b/trunk/include/corosync/ipc_cpg.h index 8f55ae8..633a158 100644 --- a/trunk/include/corosync/ipc_cpg.h +++ b/trunk/include/corosync/ipc_cpg.h @@ -65,6 +65,7 @@ enum res_cpg_types { MESSAGE_RES_CPG_ITERATIONNEXT = 10, MESSAGE_RES_CPG_ITERATIONFINALIZE = 11, MESSAGE_RES_CPG_FINALIZE = 12, + MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13, }; enum lib_cpg_confchg_reason { @@ -149,6 +150,19 @@ static inline void marshall_from_mar_cpg_iteration_description_t( marshall_from_mar_cpg_name_t (&dest->group, &src->group); }; +typedef struct { + mar_uint32_t nodeid __attribute__((aligned(8))); + mar_uint64_t seq __attribute__((aligned(8))); +} mar_cpg_ring_id_t; + +static inline void marshall_from_mar_cpg_ring_id_t ( + struct cpg_ring_id *dest, + const mar_cpg_ring_id_t *src) +{ + dest->nodeid = src->nodeid; + dest->seq = src->seq; +} + struct req_lib_cpg_join { coroipc_request_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); @@ -238,6 +252,13 @@ struct res_lib_cpg_confchg_callback { // struct cpg_address joined_list[]; }; +struct res_lib_cpg_totem_confchg_callback { + coroipc_response_header_t header __attribute__((aligned(8))); + mar_cpg_ring_id_t ring_id __attribute__((aligned(8))); + mar_uint32_t member_list_entries __attribute__((aligned(8))); + mar_uint32_t member_list[]; +}; + struct req_lib_cpg_leave { coroipc_request_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); diff --git a/trunk/lib/cpg.c b/trunk/lib/cpg.c index 6b58784..5470221 100644 --- a/trunk/lib/cpg.c +++ b/trunk/lib/cpg.c @@ -62,8 +62,11 @@ struct cpg_inst { hdb_handle_t handle; int finalize; - cpg_callbacks_t callbacks; void *context; + union { + cpg_model_data_t model_data; + cpg_model_v1_data_t model_v1_data; + }; struct list_head iteration_list_head; }; @@ -118,9 +121,32 
@@ cs_error_t cpg_initialize ( cpg_handle_t *handle, cpg_callbacks_t *callbacks) { + cpg_model_v1_data_t model_v1_data; + + memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t)); + + if (callbacks) { + model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn; + model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn; + } + + return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL)); +} + +cs_error_t cpg_model_initialize ( + cpg_handle_t *handle, + cpg_model_t model, + cpg_model_data_t *model_data, + void *context) +{ cs_error_t error; struct cpg_inst *cpg_inst; + if (model != CPG_MODEL_V1) { + error = CPG_ERR_INVALID_PARAM; + goto error_no_destroy; + } + error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle)); if (error != CS_OK) { goto error_no_destroy; @@ -142,10 +168,21 @@ cs_error_t cpg_initialize ( goto error_put_destroy; } - if (callbacks) { - memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t)); + if (model_data != NULL) { + switch (model) { + case CPG_MODEL_V1: + memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t)); + break; + default: + error = CS_ERR_LIBRARY; + goto error_destroy; + break; + } } + cpg_inst->model_data.model = model; + cpg_inst->context = context; + list_init(&cpg_inst->iteration_list_head); hdb_handle_put (&cpg_handle_t_db, *handle); @@ -283,7 +320,8 @@ cs_error_t cpg_dispatch ( struct cpg_inst *cpg_inst; struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback; struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback; - cpg_callbacks_t callbacks; + struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback; + struct cpg_inst cpg_inst_copy; coroipc_response_header_t *dispatch_data; struct cpg_address member_list[CPG_MEMBERS_MAX]; struct cpg_address left_list[CPG_MEMBERS_MAX]; @@ -292,6 +330,8 @@ cs_error_t cpg_dispatch ( mar_cpg_address_t *left_list_start; mar_cpg_address_t *joined_list_start; 
unsigned int i; + struct cpg_ring_id ring_id; + uint32_t totem_member_list[CPG_MEMBERS_MAX]; error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst)); if (error != CS_OK) { @@ -332,74 +372,96 @@ cs_error_t cpg_dispatch ( * A risk of this dispatch method is that the callback routines may * operate at the same time that cpgFinalize has been called. */ - memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t)); - /* - * Dispatch incoming message - */ - switch (dispatch_data->id) { - case MESSAGE_RES_CPG_DELIVER_CALLBACK: - if (callbacks.cpg_deliver_fn == NULL) { + memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst)); + + switch (cpg_inst_copy.model_data.model) { + case CPG_MODEL_V1: + /* + * Dispatch incoming message + */ + switch (dispatch_data->id) { + case MESSAGE_RES_CPG_DELIVER_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) { + break; + } + + res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data; + + marshall_from_mar_cpg_name_t ( + &group_name, + &res_cpg_deliver_callback->group_name); + + cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle, + &group_name, + res_cpg_deliver_callback->nodeid, + res_cpg_deliver_callback->pid, + &res_cpg_deliver_callback->message, + res_cpg_deliver_callback->msglen); break; - } - res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data; - - marshall_from_mar_cpg_name_t ( - &group_name, - &res_cpg_deliver_callback->group_name); - - callbacks.cpg_deliver_fn (handle, - &group_name, - res_cpg_deliver_callback->nodeid, - res_cpg_deliver_callback->pid, - &res_cpg_deliver_callback->message, - res_cpg_deliver_callback->msglen); - break; + case MESSAGE_RES_CPG_CONFCHG_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) { + break; + } + + res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data; + + for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) { + 
marshall_from_mar_cpg_address_t (&member_list[i], + &res_cpg_confchg_callback->member_list[i]); + } + left_list_start = res_cpg_confchg_callback->member_list + + res_cpg_confchg_callback->member_list_entries; + for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) { + marshall_from_mar_cpg_address_t (&left_list[i], + &left_list_start[i]); + } + joined_list_start = res_cpg_confchg_callback->member_list + + res_cpg_confchg_callback->member_list_entries + + res_cpg_confchg_callback->left_list_entries; + for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) { + marshall_from_mar_cpg_address_t (&joined_list[i], + &joined_list_start[i]); + } + marshall_from_mar_cpg_name_t ( + &group_name, + &res_cpg_confchg_callback->group_name); + + cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle, + &group_name, + member_list, + res_cpg_confchg_callback->member_list_entries, + left_list, + res_cpg_confchg_callback->left_list_entries, + joined_list, + res_cpg_confchg_callback->joined_list_entries); - case MESSAGE_RES_CPG_CONFCHG_CALLBACK: - if (callbacks.cpg_confchg_fn == NULL) { break; - } - - res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data; - - for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) { - marshall_from_mar_cpg_address_t (&member_list[i], - &res_cpg_confchg_callback->member_list[i]); - } - left_list_start = res_cpg_confchg_callback->member_list + - res_cpg_confchg_callback->member_list_entries; - for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) { - marshall_from_mar_cpg_address_t (&left_list[i], - &left_list_start[i]); - } - joined_list_start = res_cpg_confchg_callback->member_list + - res_cpg_confchg_callback->member_list_entries + - res_cpg_confchg_callback->left_list_entries; - for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) { - marshall_from_mar_cpg_address_t (&joined_list[i], - &joined_list_start[i]); - } - marshall_from_mar_cpg_name_t ( - &group_name, - 
&res_cpg_confchg_callback->group_name); - - callbacks.cpg_confchg_fn (handle, - &group_name, - member_list, - res_cpg_confchg_callback->member_list_entries, - left_list, - res_cpg_confchg_callback->left_list_entries, - joined_list, - res_cpg_confchg_callback->joined_list_entries); - break; - - default: - coroipcc_dispatch_put (cpg_inst->handle); - error = CS_ERR_LIBRARY; - goto error_put; - break; - } + case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK: + if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) { + break; + } + + res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data; + + marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id); + for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) { + totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i]; + } + + cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle, + ring_id, + res_cpg_totem_confchg_callback->member_list_entries, + totem_member_list); + break; + default: + coroipcc_dispatch_put (cpg_inst->handle); + error = CS_ERR_LIBRARY; + goto error_put; + break; + } /* - switch (dispatch_data->id) */ + break; /* case CPG_MODEL_V1 */ + } /* - switch (cpg_inst_copy.model_data.model) */ coroipcc_dispatch_put (cpg_inst->handle); /* diff --git a/trunk/lib/libcpg.verso b/trunk/lib/libcpg.verso index 1454f6e..ee74734 100644 --- a/trunk/lib/libcpg.verso +++ b/trunk/lib/libcpg.verso @@ -1 +1 @@ -4.0.1 +4.1.0 diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c index ede426f..48e18bf 100644 --- a/trunk/services/cpg.c +++ b/trunk/services/cpg.c @@ -160,6 +160,8 @@ static struct corosync_api_v1 *api = NULL; static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST; +static mar_cpg_ring_id_t last_sync_ring_id; + struct process_info { unsigned int nodeid; uint32_t pid; @@ -255,6 +257,10 @@ static void cpg_sync_activate (void); static void cpg_sync_abort (void); +static int notify_lib_totem_membership ( + int 
member_list_entries, + const unsigned int *member_list); + /* * Library Handler Definition */ @@ -432,6 +438,9 @@ static void cpg_sync_init_v2 ( sizeof (unsigned int)); my_member_list_entries = member_list_entries; + last_sync_ring_id.nodeid = ring_id->rep.nodeid; + last_sync_ring_id.seq = ring_id->seq; + for (i = 0; i < my_member_list_entries; i++) { if (my_member_list[i] < lowest_nodeid) { lowest_nodeid = my_member_list[i]; @@ -482,13 +491,45 @@ static void cpg_sync_activate (void) memcpy (my_old_member_list, my_member_list, my_member_list_entries * sizeof (unsigned int)); my_old_member_list_entries = my_member_list_entries; + + notify_lib_totem_membership (my_member_list_entries, my_member_list); } static void cpg_sync_abort (void) { } +static int notify_lib_totem_membership ( + int member_list_entries, + const unsigned int *member_list) +{ + struct list_head *iter; + char *buf; + int size; + struct res_lib_cpg_totem_confchg_callback *res; + size = sizeof(struct res_lib_cpg_totem_confchg_callback) + + sizeof(mar_uint32_t) * (member_list_entries); + buf = alloca(size); + if (!buf) + return CPG_ERR_LIBRARY; + + res = (struct res_lib_cpg_totem_confchg_callback *)buf; + res->member_list_entries = member_list_entries; + res->header.size = size; + res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK; + res->header.error = CS_OK; + + memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t)); + memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t)); + + for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) { + struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list); + api->ipc_dispatch_send (cpg_pd->conn, buf, size); + } + + return CPG_OK; +} static int notify_lib_joinlist( const mar_cpg_name_t *group_name, diff --git a/trunk/test/testcpg.c b/trunk/test/testcpg.c index 2abe83d..50bab97 100644 --- a/trunk/test/testcpg.c +++ b/trunk/test/testcpg.c @@ -132,9 +132,28 @@ static void 
ConfchgCallback ( } } -static cpg_callbacks_t callbacks = { +static void TotemConfchgCallback ( + cpg_handle_t handle, + struct cpg_ring_id ring_id, + uint32_t member_list_entries, + const uint32_t *member_list) +{ + int i; + + printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq); + + printf("active processors %lu: ", + (unsigned long int) member_list_entries); + for (i=0; i<member_list_entries; i++) { + printf("%d ", member_list[i]); + } + printf ("\n"); +} + +static cpg_model_v1_data_t callbacks = { .cpg_deliver_fn = DeliverCallback, .cpg_confchg_fn = ConfchgCallback, + .cpg_totem_confchg_fn = TotemConfchgCallback, }; static void sigintr_handler (int signum) __attribute__((__noreturn__)); @@ -170,7 +189,7 @@ int main (int argc, char *argv[]) { group_name.length = 6; } - result = cpg_initialize (&handle, &callbacks); + result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&callbacks, NULL); if (result != CS_OK) { printf ("Could not initialize Cluster Process Group API instance error %d\n", result); exit (1);
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
