Hi Steve,
Thanks a lot for your help, and sorry for the delay in replying. Your patch
fixed the problem, although the change for the CONFCHG_CALLBACK case in
cpg_dispatch() tried to access a bad pointer.
I noticed another problem where if flow control is enabled when both nodes
are
sending CPG messages, then the state returned by
cpg_flow_control_state_get()
flip-flops between enabled and disabled. In
message_handler_req_exec_cpg_mcast(),
the state passed back to the library defaults to disabled for messages
received
from another node (even if flow control is still enabled), i.e.
res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
if (message_source_is_local (&req_exec_cpg_mcast->source))
res_lib_cpg_mcast->flow_control_state =
process_info->flow_control_state;
I changed this to use the flow_control_state from the tracker connection's
process_info instead - is that OK? All process_infos should have the same
flow_control_state, right? I incorporated the change into your CPG patch,
attached.
Thanks again,
Tim
On Nov 27, 2007 9:17 AM, Steven Dake <[EMAIL PROTECTED]> wrote:
> Tim,
> Please find attached a patch which may fix your issue.
>
> If you could report back if this solves your problem I would appreciate
> it.
>
> This patch causes the flow control state in the library to be set
> properly when flow control is turned off (disabled). It can then be
> read correctly by the flow control APIs.
>
> Regards
> -steve
>
> On Wed, 2007-11-21 at 17:30 +1300, Tim Beale wrote:
> > Hi,
> >
> > I'm currently using EVS messaging to propagate events across a cluster,
> and I
> > was hoping that switching to CPG would help minimise some IPC disconnect
> issues
> > I've been having. However, I'm not sure I fully understand how CPG flow
> control
> > works.
> >
> > My test is: I've got 2 nodes in a cluster, node-1 is sending CPG
> messages
> > faster than node-2 can process them (due to a usleep), and node-2 is not
> sending
> > any messages. I can see flow control kick in and node-1 stops sending
> messages,
> > i.e. by using the demo code:
> > cpg_flow_control_state_get(handle, &flow_control);
> > if (flow_control == CPG_FLOW_CONTROL_DISABLED)
> > cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
> >
> > Once node-2 recovers, I see that flow control gets disabled again in
> aisexec.
> > However, the app on node-1 doesn't seem to detect flow control is
> disabled
> > again. It looks like the cpg_inst->flow_control_status returned by
> > cpg_flow_control_state_get() gets updated during a DELIVER_CALLBACK in
> > cpg_dispatch(), or in cpg_mcast_joined(). But if the app is not sending
> > messages (because flow control is enabled), and it has already
> dispatched all
> > its received messages, how would the app/library know when the flow
> control
> > becomes disabled again?
> >
> > The app I'm using is multi-threaded, e.g. one thread generates events
> > (cpg_mcast_joined), another thread processes them (cpg_dispatch). I'm
> using
> > AIS trunk code from around August 07, but I can't see any major CPG
> changes in
> > the latest trunk code.
> >
> > Sorry if I've completely misunderstood CPG. Thanks for your help,
> >
> > Tim
> > _______________________________________________
> > Openais mailing list
> > [email protected]
> > https://lists.linux-foundation.org/mailman/listinfo/openais
>
diff --git a/exec/cpg.c b/exec/cpg.c
index 6a93d87..00b6488 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -662,9 +662,28 @@ static void cpg_flow_control_state_set_fn (
void *context,
enum openais_flow_control_state flow_control_state)
{
+ struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
struct process_info *process_info = (struct process_info *)context;
process_info->flow_control_state = flow_control_state;
+ /*
+ * Send disabled flow control if a disabled occurs. This prevents
+ * the condition where a disabled occurs after all messages have been
+ * delivered and then there is no valid way to retrieve the flow
+ * control state
+ */
+ if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+ res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
+ res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
+ res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
+
+ if (process_info->trackerconn) {
+ openais_conn_send_response_no_fcc (
+ process_info->trackerconn,
+ &res_lib_cpg_flowcontrol_callback,
+ sizeof (struct res_lib_cpg_flowcontrol_callback));
+ }
+ }
}
/* Can byteswap join & leave messages */
@@ -910,8 +929,6 @@ static void message_handler_req_exec_cpg_mcast (
res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
if (message_source_is_local (&req_exec_cpg_mcast->source)) {
openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
- process_info = (struct process_info *)openais_conn_private_data_get (req_exec_cpg_mcast->source.conn);
- res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
}
memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
sizeof(mar_cpg_name_t));
@@ -922,6 +939,7 @@ static void message_handler_req_exec_cpg_mcast (
for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
struct process_info *pi = list_entry(iter, struct process_info, list);
if (pi->trackerconn) {
+ res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
openais_conn_send_response(
pi->trackerconn,
buf,
diff --git a/exec/ipc.c b/exec/ipc.c
index 17d721b..f1ac05c 100644
--- a/exec/ipc.c
+++ b/exec/ipc.c
@@ -1120,6 +1120,17 @@ void *openais_conn_partner_get (void *conn)
}
}
+/*
+ * Send a response to the library connection without triggering the
+ * flow control accounting hook (ipc_flow_control).  Used to deliver
+ * flow-control state notifications, which must not themselves feed
+ * back into flow control.
+ *
+ * Returns the result of openais_conn_send_response (0 on success,
+ * -1 on failure), matching the prototype added in exec/ipc.h.
+ *
+ * NOTE(review): dont_call_flow_control must be declared at file scope
+ * in exec/ipc.c — no hunk in this patch adds that declaration; verify
+ * it exists before applying.
+ */
+int openais_conn_send_response_no_fcc (
+	void *conn,
+	void *msg,
+	int mlen)
+{
+	int res;
+
+	dont_call_flow_control = 1;
+	res = openais_conn_send_response (conn, msg, mlen);
+	dont_call_flow_control = 0;
+
+	/* Fix: original added function was declared int but never returned
+	 * a value — undefined behavior when the caller uses the result. */
+	return (res);
+}
+
+
int openais_conn_send_response (
void *conn,
void *msg,
@@ -1144,7 +1155,9 @@ int openais_conn_send_response (
return (-1);
}
- ipc_flow_control (conn_info);
+ if (dont_call_flow_control == 0) {
+ ipc_flow_control (conn_info);
+ }
outq = &conn_info->outq;
diff --git a/exec/ipc.h b/exec/ipc.h
index 37d87d9..dbc0e48 100644
--- a/exec/ipc.h
+++ b/exec/ipc.h
@@ -50,6 +50,8 @@ extern void *openais_conn_private_data_get (void *conn);
extern int openais_conn_send_response (void *conn, void *msg, int mlen);
+extern int openais_conn_send_response_no_fcc (void *conn, void *msg, int mlen);
+
extern void openais_ipc_init (
void (*serialize_lock_fn) (void),
void (*serialize_unlock_fn) (void),
diff --git a/include/ipc_cpg.h b/include/ipc_cpg.h
index 16b87a0..c21e957 100644
--- a/include/ipc_cpg.h
+++ b/include/ipc_cpg.h
@@ -132,6 +132,11 @@ struct res_lib_cpg_deliver_callback {
mar_uint8_t message[] __attribute__((aligned(8)));
};
+struct res_lib_cpg_flowcontrol_callback {
+ mar_res_header_t header __attribute__((aligned(8)));
+ mar_uint32_t flow_control_state __attribute__((aligned(8)));
+};
+
struct req_lib_cpg_membership {
mar_req_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 2f48694..e022ba3 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
int cont = 1; /* always continue do loop except when set to 0 */
int dispatch_avail;
struct cpg_inst *cpg_inst;
+ struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
cpg_callbacks_t callbacks;
@@ -351,6 +352,7 @@ cpg_error_t cpg_dispatch (
res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
+
marshall_from_mar_cpg_name_t (
&group_name,
&res_cpg_deliver_callback->group_name);
@@ -397,6 +399,11 @@ cpg_error_t cpg_dispatch (
res_cpg_confchg_callback->joined_list_entries);
break;
+ case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
+ res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
+ cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
+ break;
+
default:
error = SA_AIS_ERR_LIBRARY;
goto error_nounlock;
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais