Let's hold off on this patch until I finish the ipc forward port, please.

Thanks
-steve

On Wed, 2008-08-20 at 06:54 +1200, angus salkeld wrote:
> This patch causes the flow control state in the library to be set
> properly when the flow control is turned off (disabled).  Then it can be
> read properly by the flow control APIs.
> This also fixes the case where the application is no longer sending
> messages and it has already dispatched all its received messages
> before flow control is disabled.
> 
> Also, CPG response messages with a TRY_AGAIN error did NOT contain
> a valid flow control state value. This meant the library could get
> stuck with flow control enabled (flow control was never enabled
> for the EXEC, so no disable event occurred).
> This case was hit when a new node was joining (sync_in_process()
> resulted in a TRY_AGAIN error for cpg_mcast_joined).
> 
> Also, in message_handler_req_exec_cpg_mcast() the state passed
> back to the library defaulted to disabled for messages received
> from another node (even if flow control was still enabled)
> - this meant if multiple nodes were sending CPG messages,
>   then the library flow control state flip-flopped between
>   enabled and disabled.
> 
> Author: Steven Dake <[EMAIL PROTECTED]> &
>         Tim Beale <[EMAIL PROTECTED]>
> ---
>  exec/apidef.c                     |    1 +
>  exec/ipc.c                        |   17 ++++++++++++++++-
>  exec/ipc.h                        |    2 ++
>  include/corosync/engine/coroapi.h |    2 ++
>  include/corosync/ipc_cpg.h        |    8 +++++++-
>  lib/cpg.c                         |   19 ++++++++++++++++---
>  services/cpg.c                    |   23 ++++++++++++++++++++---
>  7 files changed, 64 insertions(+), 8 deletions(-)
> 
> diff --git a/exec/apidef.c b/exec/apidef.c
> index d1dfde3..7fb1852 100644
> --- a/exec/apidef.c
> +++ b/exec/apidef.c
> @@ -69,6 +69,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
>       .ipc_source_is_local = message_source_is_local,
>       .ipc_private_data_get = corosync_conn_private_data_get,
>       .ipc_response_send = NULL,
> +     .ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
>       .ipc_dispatch_send = NULL,
>       .ipc_conn_send_response = corosync_conn_send_response,
>       .ipc_conn_partner_get = corosync_conn_partner_get,
> diff --git a/exec/ipc.c b/exec/ipc.c
> index cccbcbb..facc75e 100644
> --- a/exec/ipc.c
> +++ b/exec/ipc.c
> @@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
>  
>  static unsigned int g_gid_valid = 0;
>  
> +static unsigned int dont_call_flow_control = 0;
> +
>  static totempg_groups_handle ipc_handle;
>  
>  DECLARE_LIST_INIT (conn_info_list_head);
> @@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn)
>       }
>  }
>  
> +int corosync_conn_send_response_no_fcc (
> +     void *conn,
> +     void *msg,
> +     int mlen)
> +{
> +     dont_call_flow_control = 1;
> +     corosync_conn_send_response (
> +             conn, msg, mlen);
> +     dont_call_flow_control = 0;
> +}
> +
>  int corosync_conn_send_response (
>       void *conn,
>       void *msg,
> @@ -1149,7 +1162,9 @@ int corosync_conn_send_response (
>               return (-1);
>       }
>  
> -     ipc_flow_control (conn_info);
> +     if (dont_call_flow_control == 0) {
> +             ipc_flow_control (conn_info);
> +     }
>  
>       outq = &conn_info->outq;
>  
> diff --git a/exec/ipc.h b/exec/ipc.h
> index a29a698..fc24241 100644
> --- a/exec/ipc.h
> +++ b/exec/ipc.h
> @@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);
>  
>  extern int corosync_conn_send_response (void *conn, void *msg, int mlen);
>  
> +extern int corosync_conn_send_response_no_fcc (void *conn, void *msg, int 
> mlen);
> +
>  extern void corosync_ipc_init (
>          void (*serialize_lock_fn) (void),
>          void (*serialize_unlock_fn) (void),
> diff --git a/include/corosync/engine/coroapi.h 
> b/include/corosync/engine/coroapi.h
> index 0d6c60b..c46338e 100644
> --- a/include/corosync/engine/coroapi.h
> +++ b/include/corosync/engine/coroapi.h
> @@ -322,6 +322,8 @@ struct corosync_api_v1 {
>  
>       int (*ipc_response_send) (void *conn, void *msg, int mlen);
>  
> +     int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
> +
>       int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);
>  
>       /*
> diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
> index 32b8544..c1e68be 100644
> --- a/include/corosync/ipc_cpg.h
> +++ b/include/corosync/ipc_cpg.h
> @@ -62,7 +62,8 @@ enum res_cpg_types {
>       MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
>       MESSAGE_RES_CPG_LOCAL_GET = 9,
>       MESSAGE_RES_CPG_GROUPS_GET = 10,
> -     MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
> +     MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
> +     MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
>  };
>  
>  enum lib_cpg_confchg_reason {
> @@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
>       mar_uint8_t message[] __attribute__((aligned(8)));
>  };
>  
> +struct res_lib_cpg_flowcontrol_callback {
> +     mar_res_header_t header __attribute__((aligned(8)));
> +     mar_uint32_t flow_control_state __attribute__((aligned(8)));
> +};
> +
>  struct req_lib_cpg_membership {
>       mar_req_header_t header __attribute__((aligned(8)));
>       mar_cpg_name_t group_name __attribute__((aligned(8)));
> diff --git a/lib/cpg.c b/lib/cpg.c
> index 6509b7a..89390f7 100644
> --- a/lib/cpg.c
> +++ b/lib/cpg.c
> @@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
>       int cont = 1; /* always continue do loop except when set to 0 */
>       int dispatch_avail;
>       struct cpg_inst *cpg_inst;
> +     struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
>       struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
>       struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
>       struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
> @@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
>                               joined_list,
>                               res_cpg_confchg_callback->joined_list_entries);
>                       break;
> +
>               case MESSAGE_RES_CPG_GROUPS_CALLBACK:
>                       res_lib_cpg_groups_get_callback = (struct 
> res_lib_cpg_groups_get_callback *)&dispatch_data;
>                       marshall_from_mar_cpg_name_t (
> @@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
>                                                   &group_name,
>                                                   member_list,
>                                                   
> res_lib_cpg_groups_get_callback->num_members);
> +
> +                     break;
> +
> +             case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
> +                     res_cpg_flowcontrol_callback = (struct 
> res_lib_cpg_flowcontrol_callback *)&dispatch_data;
> +                     cpg_inst->flow_control_state = 
> res_cpg_flowcontrol_callback->flow_control_state;
>                       break;
>  
>               default:
> @@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
>               goto error_exit;
>       }
>  
> -     cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
> -     if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
> -             cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
> +/*   Only update the flow control state when the return value is OK.
> + *   Otherwise the flow control state is not guaranteed to be valid in the
> + *   return message.
> + *   Also, don't set to ENABLED if the return value is TRY_AGAIN as this can 
> lead
> + *   to Flow Control State sync issues between AIS LIB and EXEC.
> + */
> +     if (res_lib_cpg_mcast.header.error == CPG_OK) {
> +             cpg_inst->flow_control_state = 
> res_lib_cpg_mcast.flow_control_state;
>       }
>       error = res_lib_cpg_mcast.header.error;
>  
> diff --git a/services/cpg.c b/services/cpg.c
> index 82c426f..d7c179c 100644
> --- a/services/cpg.c
> +++ b/services/cpg.c
> @@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn (
>       void *context,
>       enum corosync_flow_control_state flow_control_state)
>  {
> +     struct res_lib_cpg_flowcontrol_callback 
> res_lib_cpg_flowcontrol_callback;
>       struct process_info *process_info = (struct process_info *)context;
>  
>       process_info->flow_control_state = flow_control_state;
> +     /*
> +      * Send disabled flow control if a disabled occurs.  This prevents
> +      * the condition where a disabled occurs after all messages have been
> +      * delivered and then there is no valid way to retrieve the flow
> +      * control state
> +      */
> +     if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
> +             res_lib_cpg_flowcontrol_callback.header.id = 
> MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
> +             res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct 
> res_lib_cpg_flowcontrol_callback);
> +             res_lib_cpg_flowcontrol_callback.flow_control_state = 
> flow_control_state;
> +
> +             if (process_info->trackerconn) {
> +                     api->ipc_response_no_fcc (
> +                             process_info->trackerconn,
> +                             &res_lib_cpg_flowcontrol_callback,
> +                             sizeof (struct 
> res_lib_cpg_flowcontrol_callback));
> +             }
> +     }
>  }
>  
>  /* Can byteswap join & leave messages */
> @@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast (
>  {
>       struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct 
> req_exec_cpg_mcast *)message;
>       struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
> -     struct process_info *process_info;
>       int msglen = req_exec_cpg_mcast->msglen;
>       char buf[sizeof(*res_lib_cpg_mcast) + msglen];
>       struct group_info *gi;
> @@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast (
>       res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
>       if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
>               api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
> -             process_info = (struct process_info *)api->ipc_private_data_get 
> (req_exec_cpg_mcast->source.conn);
> -             res_lib_cpg_mcast->flow_control_state = 
> process_info->flow_control_state;
>       }
>       memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
>               sizeof(mar_cpg_name_t));
> @@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast (
>       for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
>               struct process_info *pi = list_entry(iter, struct process_info, 
> list);
>               if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
> +                     res_lib_cpg_mcast->flow_control_state = 
> pi->flow_control_state;
>                       api->ipc_conn_send_response(
>                               pi->trackerconn,
>                               buf,

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to