Tim,
Please find attached a patch which may fix your issue.
I would appreciate it if you could report back on whether this solves your
problem.
With this patch, the executive sends a flow control callback to the library
when flow control is turned off (disabled). The library records the new
state, so it can then be read correctly through the flow control APIs even
if no further messages are delivered.
Regards
-steve
On Wed, 2007-11-21 at 17:30 +1300, Tim Beale wrote:
> Hi,
>
> I'm currently using EVS messaging to propagate events across a cluster, and I
> was hoping that switching to CPG would help minimise some IPC disconnect
> issues I've been having. However, I'm not sure I fully understand how CPG
> flow control works.
>
> My test is: I've got 2 nodes in a cluster, node-1 is sending CPG messages
> faster than node-2 can process them (due to a usleep), and node-2 is not
> sending any messages. I can see flow control kick in and node-1 stops
> sending messages, i.e. by using the demo code:
> cpg_flow_control_state_get(handle, &flow_control);
> if (flow_control == CPG_FLOW_CONTROL_DISABLED)
> cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
>
> Once node-2 recovers, I see that flow control gets disabled again in aisexec.
> However, the app on node-1 doesn't seem to detect flow control is disabled
> again. It looks like the cpg_inst->flow_control_status returned by
> cpg_flow_control_state_get() gets updated during a DELIVER_CALLBACK in
> cpg_dispatch(), or in cpg_mcast_joined(). But if the app is not sending
> messages (because flow control is enabled), and it has already dispatched all
> its received messages, how would the app/library know when the flow control
> becomes disabled again?
>
> The app I'm using is multi-threaded, e.g. one thread generates events
> (cpg_mcast_joined), another thread processes them (cpg_dispatch). I'm using
> AIS trunk code from around August 07, but I can't see any major CPG changes in
> the latest trunk code.
>
> Sorry if I've completely misunderstood CPG. Thanks for your help,
>
> Tim
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais
Index: include/ipc_cpg.h
===================================================================
--- include/ipc_cpg.h (revision 1480)
+++ include/ipc_cpg.h (working copy)
@@ -59,6 +59,7 @@
MESSAGE_RES_CPG_TRACKSTART = 6,
MESSAGE_RES_CPG_TRACKSTOP = 7,
MESSAGE_RES_CPG_LOCAL_GET = 8,
+ MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 9,
};
enum lib_cpg_confchg_reason {
@@ -131,6 +132,15 @@
mar_uint8_t message[] __attribute__((aligned(8)));
};
+/*
+ * Unsolicited message from the executive to the library carrying the
+ * new CPG flow control state (currently sent when flow control is disabled).
+ */
+struct res_lib_cpg_flowcontrol_callback {
+ mar_res_header_t header __attribute__((aligned(8)));
+ mar_uint32_t flow_control_state __attribute__((aligned(8)));
+};
+
struct req_lib_cpg_membership {
mar_req_header_t header __attribute__((aligned(8)));
mar_cpg_name_t group_name __attribute__((aligned(8)));
Index: exec/cpg.c
===================================================================
--- exec/cpg.c (revision 1480)
+++ exec/cpg.c (working copy)
@@ -656,9 +656,29 @@
void *context,
enum openais_flow_control_state flow_control_state)
{
+ struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
struct process_info *process_info = (struct process_info *)context;
process_info->flow_control_state = flow_control_state;
+ /*
+ * Tell the library when flow control is turned off. Without this,
+ * if flow control is disabled after all messages have already been
+ * delivered, the library receives nothing from which to learn the new
+ * state and keeps reporting the stale (enabled) state.
+ */
+ if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+ res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
+ res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
+ res_lib_cpg_flowcontrol_callback.header.error = SA_AIS_OK; /* never send uninitialized stack bytes */
+ res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
+
+ if (process_info->trackerconn) {
+ openais_conn_send_response_no_fcc (
+ process_info->trackerconn,
+ &res_lib_cpg_flowcontrol_callback,
+ sizeof (struct res_lib_cpg_flowcontrol_callback));
+ }
+ }
}
/* Can byteswap join & leave messages */
Index: exec/ipc.c
===================================================================
--- exec/ipc.c (revision 1480)
+++ exec/ipc.c (working copy)
@@ -94,6 +94,9 @@
static unsigned int g_gid_valid = 0;
+/* Non-zero while openais_conn_send_response_no_fcc() is sending. */
+static unsigned int dont_call_flow_control = 0;
+
static struct totem_ip_address *my_ip;
static totempg_groups_handle ipc_handle;
@@ -1089,6 +1092,24 @@
}
}
+/*
+ * Like openais_conn_send_response(), but skips the ipc_flow_control()
+ * call for this send. Returns the result of openais_conn_send_response().
+ */
+int openais_conn_send_response_no_fcc (
+ void *conn,
+ void *msg,
+ int mlen)
+{
+ int res;
+
+ dont_call_flow_control = 1;
+ res = openais_conn_send_response (conn, msg, mlen);
+ dont_call_flow_control = 0;
+
+ return (res);
+}
+
int openais_conn_send_response (
void *conn,
void *msg,
@@ -1113,7 +1134,9 @@
return (-1);
}
- ipc_flow_control (conn_info);
+ if (dont_call_flow_control == 0) {
+ ipc_flow_control (conn_info);
+ }
outq = &conn_info->outq;
Index: exec/ipc.h
===================================================================
--- exec/ipc.h (revision 1480)
+++ exec/ipc.h (working copy)
@@ -48,6 +48,11 @@
extern int openais_conn_send_response (void *conn, void *msg, int mlen);
+/*
+ * Same as openais_conn_send_response(), but does not run IPC flow control
+ * for this send.
+ */
+extern int openais_conn_send_response_no_fcc (void *conn, void *msg, int mlen);
+
extern void openais_ipc_init (
void (*serialize_lock_fn) (void),
void (*serialize_unlock_fn) (void),
Index: lib/cpg.c
===================================================================
--- lib/cpg.c (revision 1480)
+++ lib/cpg.c (working copy)
@@ -251,6 +251,7 @@
int cont = 1; /* always continue do loop except when set to 0 */
int dispatch_avail;
struct cpg_inst *cpg_inst;
+ struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
cpg_callbacks_t callbacks;
@@ -400,6 +401,12 @@
res_cpg_confchg_callback->joined_list_entries);
break;
+ case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
+ /* State pushed by the executive; record it for cpg_flow_control_state_get(). */
+ res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
+ cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
+ break;
+
default:
error = SA_AIS_ERR_LIBRARY;
goto error_nounlock;
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais