The included patch should fix
https://bugzilla.redhat.com/show_bug.cgi?id=506255 .
David, I hope it will fix the problem for you.
It's based on the simple idea of adding the node startup timestamp at the end of
cpg_join (and joinlist) calls. If the timestamp is larger than the old timestamp
we know, the node was restarted and we didn't notice -> deliver a leave event
and then a join event. If the timestamp is the same (or, in special cases, lower) ->
a new cpg app joined -> send only a join event.
Of course, the patch isn't so simple. Cpg_join messages are always sent as
larger messages with a timestamp (btw. the timestamp is a 64-bit value, because
I expect a l(o^64)ng life of corosync ;) ). On delivery, we test whether the
message is larger than the standard message. If it is -> we have a ts -> use it.
A bigger problem was the joinlist, because it's an array, ... you will see in
the source. The solution is to send a special entry with pid 0 (a process should
never have pid 0) and the timestamp encoded in the name
(ugly, but it seems to work).
Please comment, if you can.
Regards,
Honza
diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c
index 5c93586..c052db0 100644
--- a/trunk/services/cpg.c
+++ b/trunk/services/cpg.c
@@ -70,6 +70,8 @@ LOGSYS_DECLARE_SUBSYS ("CPG");
#define GROUP_HASH_SIZE 32
+#define GROUP_SPECIAL_PID 0
+
enum cpg_message_req_types {
MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
@@ -135,6 +137,7 @@ struct process_info {
unsigned int nodeid;
uint32_t pid;
mar_cpg_name_t group;
+ uint64_t timestamp;
struct list_head list; /* on the group_info members list */
};
DECLARE_LIST_INIT(process_info_list_head);
@@ -146,6 +149,9 @@ struct join_list_entry {
static struct corosync_api_v1 *api = NULL;
+static uint64_t timestamp;
+static char special_pid_check[9] = {0xff, 0xee, 0xdd, 'c', 'h', 'e', 'c', 'k', 0x00};
+
/*
* Service Interfaces required by service_message_handler struct
*/
@@ -341,6 +347,14 @@ struct req_exec_cpg_procjoin {
mar_uint32_t reason __attribute__((aligned(8)));
};
+struct req_exec_cpg_procjoin_with_ts { /* procjoin/procleave exec message extended with a trailing timestamp */
+ coroipc_request_header_t header __attribute__((aligned(8))); /* header.size is compared against sizeof(struct req_exec_cpg_procjoin) by receivers to detect the timestamp */
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
+ mar_uint32_t pid __attribute__((aligned(8)));
+ mar_uint32_t reason __attribute__((aligned(8)));
+ mar_uint64_t timestamp __attribute__((aligned(8))); /* sender's file-scope timestamp, set from time(NULL) in cpg_exec_init_fn */
+};
+
struct req_exec_cpg_mcast {
coroipc_request_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
@@ -504,6 +518,9 @@ static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
logsys_subsys_init();
#endif
api = corosync_api;
+
+ timestamp = (uint64_t) time(NULL);
+
return (0);
}
@@ -524,7 +541,7 @@ static int cpg_lib_exit_fn (void *conn)
static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
{
- struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
+ struct req_exec_cpg_procjoin_with_ts req_exec_cpg_procjoin;
struct iovec req_exec_cpg_iovec;
int result;
@@ -532,6 +549,8 @@ static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *grou
req_exec_cpg_procjoin.pid = pid;
req_exec_cpg_procjoin.reason = reason;
+ req_exec_cpg_procjoin.timestamp = timestamp;
+
req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
@@ -601,6 +620,12 @@ static void exec_cpg_procjoin_endian_convert (void *msg)
req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
+
+ if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) {
+ /* Message is full message with ts */
+ ((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp =
+ swab64(((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp);
+ }
}
static void exec_cpg_joinlist_endian_convert (void *msg_v)
@@ -614,6 +639,27 @@ static void exec_cpg_joinlist_endian_convert (void *msg_v)
while ((const char*)jle < msg + res->size) {
jle->pid = swab32(jle->pid);
swab_mar_cpg_name_t (&jle->group_name);
+
+ /* Test, if this pid is special pid */
+ if (jle->pid == GROUP_SPECIAL_PID &&
+ memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) {
+
+ uint32_t ainfo_size;
+ uint64_t ntimestamp;
+
+ ainfo_size = *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check)));
+ ainfo_size = swab32 (ainfo_size);
+ *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))) = ainfo_size;
+
+ if (ainfo_size >= sizeof (timestamp)) {
+ /* We have timestamp */
+ ntimestamp = *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) +
+ sizeof (uint32_t)));
+ ntimestamp = swab64 (ntimestamp);
+ *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) +
+ sizeof (uint32_t))) = ntimestamp;
+ }
+ }
jle++;
}
}
@@ -662,6 +708,7 @@ static void do_proc_join(
const mar_cpg_name_t *name,
uint32_t pid,
unsigned int nodeid,
+ uint64_t ntimestamp,
int reason)
{
struct process_info *pi;
@@ -680,6 +727,7 @@ static void do_proc_join(
}
pi->nodeid = nodeid;
pi->pid = pid;
+ pi->timestamp = ntimestamp;
memcpy(&pi->group, name, sizeof(*name));
list_init(&pi->list);
@@ -750,11 +798,56 @@ static void message_handler_req_exec_cpg_procjoin (
unsigned int nodeid)
{
const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
+ uint64_t ntimestamp;
+ struct list_head *iter;
+ int node_failed = 0;
+ mar_cpg_address_t left_list[1];
log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
+ if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) {
+ /* Message with timestamp */
+ ntimestamp = ((struct req_exec_cpg_procjoin_with_ts *)message)->timestamp;
+ } else {
+ ntimestamp = ~0;
+ }
+
+ /* Test if node didn't failed before */
+ for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
+ struct process_info *pi = list_entry(iter, struct process_info, list);
+
+ if (pi->nodeid == nodeid && ntimestamp > pi->timestamp) {
+ /* Node failed*/
+ node_failed = 1;
+ break;
+ }
+ }
+
+ if (node_failed) {
+ /* Fake leave message */
+ log_printf(LOGSYS_LEVEL_DEBUG, "node %d failed. Delivering leave message\n", nodeid);
+
+ for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
+ struct process_info *pi = list_entry(iter, struct process_info, list);
+ iter = iter->next;
+
+ if (pi->nodeid == nodeid) {
+ left_list[0].nodeid = pi->nodeid;
+ left_list[0].pid = pi->pid;
+ left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
+
+ notify_lib_joinlist(&pi->group, NULL,
+ 0, NULL,
+ 1, left_list,
+ MESSAGE_RES_CPG_CONFCHG_CALLBACK);
+ list_del (&pi->list);
+ free (pi);
+ }
+ }
+ }
+
do_proc_join (&req_exec_cpg_procjoin->group_name,
- req_exec_cpg_procjoin->pid, nodeid,
+ req_exec_cpg_procjoin->pid, nodeid, ntimestamp,
CONFCHG_CPG_REASON_JOIN);
}
@@ -799,6 +892,7 @@ static void message_handler_req_exec_cpg_joinlist (
const char *message = message_v;
const coroipc_response_header_t *res = (const coroipc_response_header_t *)message;
const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
+ uint64_t ntimestamp;
log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node %x\n",
nodeid);
@@ -808,9 +902,36 @@ static void message_handler_req_exec_cpg_joinlist (
return;
}
+ jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
+
+ ntimestamp = ~0;
+
+ /* We first try to found, if we have timestamp or not */
+ while ((const char*)jle < message + res->size) {
+ if (jle->pid == GROUP_SPECIAL_PID &&
+ memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) {
+
+ /* It looks like we have special pid -> timestamp*/
+ uint32_t ainfo_size;
+
+ ainfo_size = *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check)));
+
+ if (ainfo_size >= sizeof (timestamp)) {
+ /* We have timestamp */
+ ntimestamp = *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) +
+ sizeof (uint32_t)));
+
+ log_printf(LOGSYS_LEVEL_DEBUG, "joinlist with timestamp %lld\n", ntimestamp);
+ }
+ }
+ jle++;
+ }
+
+ jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
+
while ((const char*)jle < message + res->size) {
do_proc_join (&jle->group_name, jle->pid, nodeid,
- CONFCHG_CPG_REASON_NODEUP);
+ ntimestamp, CONFCHG_CPG_REASON_NODEUP);
jle++;
}
}
@@ -873,7 +994,7 @@ static int cpg_exec_send_joinlist(void)
if (!count)
return 0;
- buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count);
+ buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * (count + 1));
if (!buf) {
log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
return -1;
@@ -892,8 +1013,15 @@ static int cpg_exec_send_joinlist(void)
}
}
+ /* Time to add special pid with timestamp */
+ jle->pid = GROUP_SPECIAL_PID;
+ jle->group_name.length = sizeof (special_pid_check);
+ memcpy(jle->group_name.value, special_pid_check, sizeof (special_pid_check));
+ *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))) = sizeof (timestamp);
+ *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) + sizeof (uint32_t))) = timestamp;
+
res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
- res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count;
+ res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * (count + 1);
req_exec_cpg_iovec.iov_base = buf;
req_exec_cpg_iovec.iov_len = res->size;
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais