A good use case, I think, and I appreciate the work.
I will get back to you (and also nitpick) :-)!

Mathi.


> -----Original Message-----
> From: Anders Widell [mailto:[email protected]]
> Sent: Tuesday, September 29, 2015 1:07 AM
> To: Mathivanan Naickan Palanivelu
> Cc: [email protected]
> Subject: [PATCH 1 of 2] clmd: Add support for scale-out by calling a custom
> script to configure new nodes [#1453]
> 
>  osaf/services/saf/clmsv/README           |   17 ++
>  osaf/services/saf/clmsv/clms/clms_cb.h   |   31 ++++
>  osaf/services/saf/clmsv/clms/clms_evt.c  |  238 ++++++++++++++++++++++++++++++-
>  osaf/services/saf/clmsv/clms/clms_main.c |   18 ++
>  osaf/services/saf/clmsv/config/clmd.conf |    9 +
>  5 files changed, 311 insertions(+), 2 deletions(-)
> 
> 
> The scale-out feature makes it possible to run a custom script when a node
> which is not configured in IMM tries to join the cluster. The intention is that
> the custom script will check if the new node is eligible to be added to the
> cluster, and if so add the necessary IMM objects so that the node will be able
> to join the next time it tries. The script will be called with one or more
> arguments, where each argument represents a node which requests to be
> added to the cluster. Each argument has the format "X,Y" where X is the
> node id in decimal and Y is the name of the node.
> 
> To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT
> variable in clmd.conf to point to the executable script which shall be
> executed when new nodes try to join the cluster.
> 
> diff --git a/osaf/services/saf/clmsv/README
> b/osaf/services/saf/clmsv/README
> --- a/osaf/services/saf/clmsv/README
> +++ b/osaf/services/saf/clmsv/README
> @@ -54,6 +54,23 @@ SA_CLM_AF_INET(i.e. for IPv4) and SA_CLM
> 
>  The CLMA is the clm agent library that user applications shall link with.
> 
> +
> +SCALE OUT
> +
> +The scale-out feature makes it possible to run a custom script when a
> +node which is not configured in IMM tries to join the cluster. The
> +intention is that the custom script will check if the new node is
> +eligible to be added to the cluster, and if so add the necessary IMM
> +objects so that the node will be able to join the next time it tries.
> +The script will be called with one or more arguments, where each
> +argument represents a node which requests to be added to the cluster.
> +Each argument has the format "X,Y" where X is the node id in decimal and Y is the name of the node.
> +
> +To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT
> +variable in clmd.conf to point to the executable script which shall be
> +executed when new nodes try to join the cluster.
> +
> +
>  DEPENDENCIES
> 
>  CLMSv depends on the following services:
> diff --git a/osaf/services/saf/clmsv/clms/clms_cb.h
> b/osaf/services/saf/clmsv/clms/clms_cb.h
> --- a/osaf/services/saf/clmsv/clms/clms_cb.h
> +++ b/osaf/services/saf/clmsv/clms/clms_cb.h
> @@ -18,10 +18,20 @@
>  #define CLMS_CB_H
> 
>  #include <stdbool.h>
> +#include <pthread.h>
> 
>  #define IMPLEMENTER_NAME     "safClmService"
>  #define CLMS_HA_INIT_STATE   0
> 
> +/* The maximum number of nodes that can be queued for scale-out while
> the
> +   scale-out script is executing */
> +#define MAX_PENDING_NODES    32
> +
> +/* The value to put in the PATH environment variable when calling the
> +   scale-out script */
> +#define SCALE_OUT_PATH_ENV SBINDIR ":" BINDIR ":/usr/local/sbin:" \
> +     "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
> +
>  typedef enum clms_tmr_type_t {
>       CLMS_TMR_BASE,
>       CLMS_CLIENT_RESP_TMR = CLMS_TMR_BASE,
> @@ -204,6 +214,27 @@ typedef struct clms_cb_t {
>       bool is_impl_set;
>       bool nid_started;       /**< true if started by NID */
>       NCS_PATRICIA_TREE iplist;       /* To temporarily store ipaddress
> information recieved in MDS NODE_UP */
> +
> +     /* Mutex protecting shared data used by the scale-out functionality
> */
> +     pthread_mutex_t scale_out_data_mutex;
> +     /* Number of occupied indices in the vectors pending_nodes[] and
> +      * pending_node_ids[] */
> +     size_t no_of_pending_nodes;
> +     /* Number of occupied indices in the vector inprogress_node_ids[]
> */
> +     size_t no_of_inprogress_nodes;
> +     /* Names of the nodes to be added in the next run of the scale-out
> +      * script */
> +     char *pending_nodes[MAX_PENDING_NODES + 1];
> +     /* Node ids of the nodes to be added in the next run of the the
> +      * scale-out script */
> +     SaClmNodeIdT pending_node_ids[MAX_PENDING_NODES + 1];
> +     /* Node ids of the nodes that are being added by the currently
> executing
> +      * instance of the scale-out script */
> +     SaClmNodeIdT inprogress_node_ids[MAX_PENDING_NODES + 1];
> +     /* True if the scale-out thread is currently running */
> +     bool is_scale_out_thread_running;
> +     /* Full path to the scale-out script */
> +     char *scale_out_script;
>  } CLMS_CB;
> 
>  typedef struct clms_lock_tmr_t {
> diff --git a/osaf/services/saf/clmsv/clms/clms_evt.c
> b/osaf/services/saf/clmsv/clms/clms_evt.c
> --- a/osaf/services/saf/clmsv/clms/clms_evt.c
> +++ b/osaf/services/saf/clmsv/clms/clms_evt.c
> @@ -15,9 +15,25 @@
>   *
>   */
> 
> +#define _GNU_SOURCE
>  #include <configmake.h>
> 
>  #include "clms.h"
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <errno.h>
> +#include <pthread.h>
> +#include <inttypes.h>
> +#include <unistd.h>
> +#include <limits.h>
> +#include <sys/time.h>
> +#include <sys/resource.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include "logtrace.h"
> +#include "ncsgl_defs.h"
> +#include "osaf_utility.h"
> 
>  static uint32_t process_api_evt(CLMSV_CLMS_EVT * evt);  static uint32_t
> proc_clma_updn_mds_msg(CLMSV_CLMS_EVT * evt); @@ -26,6 +42,11 @@
> static uint32_t proc_rda_evt(CLMSV_CLMS_  static uint32_t
> proc_mds_quiesced_ack_msg(CLMSV_CLMS_EVT * evt);  static uint32_t
> proc_node_lock_tmr_exp_msg(CLMSV_CLMS_EVT * evt);  static uint32_t
> proc_node_up_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt);
> +static void execute_scale_out_script(int argc, char *argv[]); static
> +void *scale_out_thread(void *arg); static void
> +start_scale_out_thread(CLMS_CB *cb); static void
> scale_out_node(CLMS_CB
> +*cb,
> +     const clmsv_clms_node_up_info_t *nodeup_info);
>  static uint32_t proc_initialize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt);
> static uint32_t proc_finalize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt);
> static uint32_t proc_track_start_msg(CLMS_CB * cb, CLMSV_CLMS_EVT *
> evt); @@ -64,6 +85,8 @@ static const CLMSV_CLMS_CLMA_API_MSG_HAN
>       proc_node_up_msg
>  };
> 
> +static char scale_out_path_env[] = "PATH=" SCALE_OUT_PATH_ENV;
> +
>  /**
>  * Clear any pending clma_down records or node_down list
>  *
> @@ -251,6 +274,212 @@ CLMS_CLIENT_INFO *clms_client_new(MDS_DE
>       return client;
>  }
> 
> +/*
> + * Execute the scale-out script with the parameters specified in @a
> +argc, and
> + * @a argv (analogous to the parameters taken by the main() function).
> +The first
> + * index in argv must be the full path to the executable script file.
> +The rest
> + * of the parameters shall specify the nodes to scale out. The format
> +of each
> + * parameter must be two strings separated by a comma character, where
> +the first
> + * string is the node id of a node to be scaled out (given as a decimal
> +number),
> + * and the second string is the node name. This function blocks until
> +the script
> + * has exited.
> + */
> +static void execute_scale_out_script(int argc, char *argv[]) {
> +     struct rlimit rlim;
> +     int nofile = 1024;
> +     char *const env[] = { scale_out_path_env, NULL };
> +
> +     TRACE_ENTER();
> +     osafassert(argc >= 1 && argv[argc] == NULL);
> +     LOG_NO("Running script %s to scale out %d node(s)", argv[0], argc -
> +1);
> +
> +     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0 && rlim.rlim_cur <=
> INT_MAX) {
> +             nofile = rlim.rlim_cur;
> +     } else {
> +             LOG_ER("getrlimit(RLIMIT_NOFILE) failed: %s",
> strerror(errno));
> +     }
> +
> +     pid_t child_pid = fork();
> +     if (child_pid == 0) {
> +             for (int fd = 3; fd < nofile; ++fd) close(fd);
> +             execve(argv[0], argv, env);
> +             _Exit(123);
> +     } else if (child_pid != (pid_t) -1) {
> +             int status;
> +             pid_t wait_pid;
> +             do {
> +                     wait_pid = waitpid(child_pid, &status, 0);
> +             } while (wait_pid == (pid_t) -1 && errno == EINTR);
> +             if (wait_pid != (pid_t) -1) {
> +                     if (!WIFEXITED(status)) {
> +                             LOG_ER("Scale out script %s terminated "
> +                                     "abnormally", argv[0]);
> +                     } else if (WEXITSTATUS(status) != 0) {
> +                             if (WEXITSTATUS(status) == 123) {
> +                                     LOG_ER("Scale out script %s could "
> +                                             "not be executed", argv[0]);
> +                             } else {
> +                                     LOG_ER("Scale out script %s failed "
> +                                             "with exit code %d", argv[0],
> +                                             WEXITSTATUS(status));
> +                             }
> +                     } else {
> +                             LOG_IN("Scale out script %s exited "
> +                                     "successfully", argv[0]);
> +                     }
> +             } else {
> +                     LOG_ER("Scale out script %s failed in waitpid(%u):
> %s",
> +                             argv[0], (unsigned) child_pid,
> strerror(errno));
> +             }
> +     } else {
> +             LOG_ER("Scale out script %s failed in fork(): %s", argv[0],
> +                     strerror(errno));
> +     }
> +     TRACE_LEAVE();
> +}
> +
> +/*
> + * This function is executed in a separate thread, and will
> +continuously call
> + * the scale-out script to scale out pending nodes until
> +no_of_pending_nodes
> + * becomes zero. The thread terminates itself when there are no more
> +pending
> + * nodes.
> + */
> +static void *scale_out_thread(void *arg) {
> +     CLMS_CB *cb = (CLMS_CB*) arg;
> +
> +     TRACE_ENTER();
> +     osafassert(cb->no_of_inprogress_nodes == 0);
> +     for (;;) {
> +             osaf_mutex_lock_ordie(&cb->scale_out_data_mutex);
> +             char *argv[MAX_PENDING_NODES + 2] = { cb-
> >scale_out_script };
> +             size_t no_of_pending_nodes = cb->no_of_pending_nodes;
> +             osafassert(no_of_pending_nodes <=
> MAX_PENDING_NODES);
> +             for (size_t i = 0; i != (no_of_pending_nodes + 1); ++i) {
> +                     argv[i + 1] = cb->pending_nodes[i];
> +                     cb->inprogress_node_ids[i] = cb-
> >pending_node_ids[i];
> +                     cb->pending_nodes[i] = NULL;
> +             }
> +             cb->no_of_pending_nodes = 0;
> +             cb->no_of_inprogress_nodes = no_of_pending_nodes;
> +             if (no_of_pending_nodes == 0) {
> +                     osafassert(cb->is_scale_out_thread_running ==
> true);
> +                     cb->is_scale_out_thread_running = false;
> +             }
> +             osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex);
> +             if (no_of_pending_nodes == 0) break;
> +             execute_scale_out_script(no_of_pending_nodes + 1, argv);
> +             for (size_t i = 0; i != no_of_pending_nodes; ++i) {
> +                     free(argv[i + 1]);
> +             }
> +     }
> +     LOG_IN("Scale out thread terminating");
> +     TRACE_LEAVE();
> +     return NULL;
> +}
> +
> +/*
> + * Start the scale-out thread which executes the function
> +scale_out_thread() in
> + * a background thread for as long as there are pending nodes to scale
> +out. This
> + * function must not be called if the thread is already running.
> + */
> +static void start_scale_out_thread(CLMS_CB *cb) {
> +     pthread_attr_t attr;
> +     int result;
> +
> +     TRACE_ENTER();
> +     result = pthread_attr_init(&attr);
> +     if (result == 0) {
> +             result = pthread_attr_setdetachstate(&attr,
> +                     PTHREAD_CREATE_DETACHED);
> +             if (result != 0) {
> +                     LOG_ER("pthread_attr_setdetachstate() failed "
> +                             "with return code %d", result);
> +             }
> +             pthread_t thread;
> +             result = pthread_create(&thread, &attr, scale_out_thread,
> cb);
> +             if (result == 0) {
> +                     LOG_IN("Scale out thread started");
> +                     osafassert(cb->is_scale_out_thread_running ==
> false);
> +                     cb->is_scale_out_thread_running = true;
> +             } else {
> +                     LOG_ER("pthread_create() failed with return code
> %d",
> +                             result);
> +             }
> +             pthread_attr_destroy(&attr);
> +     } else {
> +             LOG_ER("pthread_attr_init() failed with return code %d",
> result);
> +     }
> +     TRACE_LEAVE();
> +}
> +
> +/*
> + * Add the node given by the @a nodeup_info parameter to the queue of
> +pending
> + * nodes to be scaled out, and start the scale-out thread if it is not
> +already
> + * running.
> + */
> +static void scale_out_node(CLMS_CB *cb,
> +     const clmsv_clms_node_up_info_t *nodeup_info) {
> +     char node_name[SA_MAX_NAME_LENGTH];
> +
> +     TRACE_ENTER();
> +     size_t name_len = nodeup_info->node_name.length <
> SA_MAX_NAME_LENGTH ?
> +             nodeup_info->node_name.length :
> +             (SA_MAX_NAME_LENGTH - 1);
> +     memcpy(node_name, nodeup_info->node_name.value,
> name_len);
> +     node_name[name_len] = '\0';
> +
> +     osaf_mutex_lock_ordie(&cb->scale_out_data_mutex);
> +     size_t no_of_pending_nodes = cb->no_of_pending_nodes;
> +     size_t no_of_inprogress_nodes = cb->no_of_inprogress_nodes;
> +     bool queue_the_node = true;
> +     for (size_t i = 0; i != no_of_inprogress_nodes && queue_the_node;
> ++i) {
> +             if (nodeup_info->node_id == cb->inprogress_node_ids[i]) {
> +                     LOG_IN("Node 0x%" PRIx32 " (%s) is already being "
> +                             "scaled out", nodeup_info->node_id,
> node_name);
> +                     queue_the_node = false;
> +             }
> +     }
> +     for (size_t i = 0; i != no_of_pending_nodes && queue_the_node;
> ++i) {
> +             if (nodeup_info->node_id == cb->pending_node_ids[i]) {
> +                     LOG_IN("Node 0x%" PRIx32 " (%s) is already queued
> for "
> +                             "scaled out", nodeup_info->node_id,
> node_name);
> +                     queue_the_node = false;
> +             }
> +     }
> +     if (no_of_pending_nodes >= MAX_PENDING_NODES &&
> queue_the_node) {
> +             LOG_ER("Failed to add node 0x%" PRIx32 " (%s): too many "
> +                     "pending nodes", nodeup_info->node_id,
> node_name);
> +                     queue_the_node = false;
> +     }
> +     if (queue_the_node) {
> +             char *strp;
> +             if (asprintf(&strp, "%" PRIu32 ",%s", nodeup_info->node_id,
> +                     node_name) != -1) {
> +                     LOG_NO("Queuing request to scale out node 0x%"
> PRIx32
> +                             " (%s)", nodeup_info->node_id,
> node_name);
> +                     osafassert(cb-
> >pending_nodes[no_of_pending_nodes] ==
> +                             NULL);
> +                     cb->pending_nodes[no_of_pending_nodes] = strp;
> +                     cb->pending_node_ids[no_of_pending_nodes] =
> +                             nodeup_info->node_id;
> +                     cb->no_of_pending_nodes++;
> +                     if (cb->is_scale_out_thread_running == false) {
> +                             start_scale_out_thread(cb);
> +                     }
> +             } else {
> +                     LOG_ER("Failed to add node 0x%" PRIx32 " (%s): "
> +                             "asprintf failed", nodeup_info->node_id,
> +                             node_name);
> +             }
> +     }
> +     osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex);
> +     TRACE_LEAVE();
> +}
> +
>  /**
>   * Processing Node up mesg from the node_agent utility
>   *
> @@ -281,7 +510,6 @@ uint32_t proc_node_up_msg(CLMS_CB * cb,
> 
>       node = clms_node_get_by_name(&node_name);
>       if (node == NULL) {
> -             clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST;
>               /* The /etc/opensaf/node_name is an user exposed
> configuration file.
>                * The node_name file contains the RDN value of the CLM
> node name.
>                * (a) When opensaf cluster configuration is pre-provisioned
> using the OpenSAF IMM tools:
> @@ -290,7 +518,13 @@ uint32_t proc_node_up_msg(CLMS_CB * cb,
>                * (b) When opensaf cluster nodes are dynamically added at
> runtime:
>                *     the /etc/opensaf/node_name should contain the rdn
> value.
>                */
> -             LOG_NO("Node '%s' requests to join the cluster but is
> unconfigured", nodeup_info->node_name.value);
> +             if (cb->scale_out_script != NULL) {
> +                     clm_msg.info.api_resp_info.rc =
> SA_AIS_ERR_TRY_AGAIN;
> +                     scale_out_node(cb, nodeup_info);
> +             } else {
> +                     clm_msg.info.api_resp_info.rc =
> SA_AIS_ERR_NOT_EXIST;
> +                     LOG_NO("Node '%s' requests to join the cluster but
> is unconfigured", nodeup_info->node_name.value);
> +             }
>       }
> 
>       if (node != NULL) {
> diff --git a/osaf/services/saf/clmsv/clms/clms_main.c
> b/osaf/services/saf/clmsv/clms/clms_main.c
> --- a/osaf/services/saf/clmsv/clms/clms_main.c
> +++ b/osaf/services/saf/clmsv/clms/clms_main.c
> @@ -226,6 +226,24 @@ uint32_t clms_cb_init(CLMS_CB * clms_cb)
>       clms_cb->is_impl_set = false;
>       clms_cb->rtu_pending = false; /* Flag to control try-again of rt-
> updates */
> 
> +     if (pthread_mutex_init(&clms_cb->scale_out_data_mutex, NULL) !=
> 0) {
> +             return NCSCC_RC_FAILURE;
> +     }
> +     clms_cb->no_of_pending_nodes = 0;
> +     clms_cb->no_of_inprogress_nodes = 0;
> +     for (int i = 0; i != (MAX_PENDING_NODES + 1); ++i) {
> +             clms_cb->pending_nodes[i] = NULL;
> +             clms_cb->pending_node_ids[i] = 0;
> +             clms_cb->inprogress_node_ids[i] = 0;
> +     }
> +     clms_cb->is_scale_out_thread_running = false;
> +     clms_cb->scale_out_script = NULL;
> +     char *tmp = getenv("CLMSV_SCALE_OUT_SCRIPT");
> +     if (tmp != NULL) {
> +             clms_cb->scale_out_script = strdup(tmp);
> +             if (clms_cb->scale_out_script == NULL) return
> NCSCC_RC_FAILURE;
> +     }
> +
>       /* Assign Version. Currently, hardcoded, This will change later */
>       clms_cb->clm_ver.releaseCode = CLM_RELEASE_CODE;
>       clms_cb->clm_ver.majorVersion = CLM_MAJOR_VERSION_4; diff --
> git a/osaf/services/saf/clmsv/config/clmd.conf
> b/osaf/services/saf/clmsv/config/clmd.conf
> --- a/osaf/services/saf/clmsv/config/clmd.conf
> +++ b/osaf/services/saf/clmsv/config/clmd.conf
> @@ -10,5 +10,14 @@
>  # Healthcheck keys
>  export CLMSV_ENV_HEALTHCHECK_KEY="Default"
> 
> +# Script to be called when a node which is not configured tries to join the
> +# cluster. The idea is that this script can check if the node is eligible to be
> +# added to the cluster, and if so add the necessary IMM objects so that the node
> +# will be able to join the next time it tries. To enable this feature, configure
> +# this variable with the full path to the executable script. The script will be
> +# called with one or more arguments, and each argument has the format "X,Y"
> +# where X is the node id in decimal and Y is the name of the node.
> +#export CLMSV_SCALE_OUT_SCRIPT="/usr/local/bin/scale_out_script"
> +
>  # Uncomment the next line to enable info level logging
>  #args="--loglevel=info"

------------------------------------------------------------------------------
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to