As agreed in our discussion, the following are the minor general changes
to make to the approach before pushing.

a) Let this 'feature' be controlled by the variable 
"OPENSAF_CLUSTERAUTO_SCALE_ENABLED"

b) Move this variable to nid.conf. This would be helpful
in the future if this script is invoked from the
individual nodes (say, clmna or a cluster manager).

c) Make this an OpenSAF script itself. My suggestion for
the script name is opensaf_cluster_autoscale, installed
in the pkglibdir/ directory, i.e. the same directory as the opensaf_reboot script.
Note: I'm consciously using the word cluster instead of clm. In the future
some other service might become the owner of this.

d) Call this script if OPENSAF_CLUSTERAUTO_SCALE_ENABLED=true
in nid.conf

e) By default, let the script contain an example of adding a node (instead of
being an empty script).
Actually, if you can fill in the content of the script, it will help me test
this once.
You could perhaps go ahead and push this before FC since it is functionally
reviewed.
I will get back to you once I am able to test it.

Thanks,
Mathi.

----- [email protected] wrote:

> osaf/services/saf/clmsv/README           |   17 ++
>  osaf/services/saf/clmsv/clms/clms_cb.h   |   31 ++++
>  osaf/services/saf/clmsv/clms/clms_evt.c  |  238
> ++++++++++++++++++++++++++++++-
>  osaf/services/saf/clmsv/clms/clms_main.c |   18 ++
>  osaf/services/saf/clmsv/config/clmd.conf |    9 +
>  5 files changed, 311 insertions(+), 2 deletions(-)
> 
> 
> The scale-out feature makes it possible to run a custom script when a node
> which is not configured in IMM tries to join the cluster. The intention is
> that the custom script will check if the new node is eligible to be added to
> the cluster, and if so add the necessary IMM objects so that the node will be
> able to join the next time it tries. The script will be called with one or
> more arguments, where each argument represents a node which requests to be
> added to the cluster. Each argument has the format "X,Y" where X is the node
> id in decimal and Y is the name of the node.
> 
> To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT
> variable in clmd.conf to point to the executable script which shall be
> executed when new nodes try to join the cluster.
> 
> diff --git a/osaf/services/saf/clmsv/README
> b/osaf/services/saf/clmsv/README
> --- a/osaf/services/saf/clmsv/README
> +++ b/osaf/services/saf/clmsv/README
> @@ -54,6 +54,23 @@ SA_CLM_AF_INET(i.e. for IPv4) and SA_CLM
>  
>  The CLMA is the clm agent library that user applications shall link
> with.
>  
> +
> +SCALE OUT
> +
> +The scale-out feature makes it possible to run a custom script when a node which
> +is not configured in IMM tries to join the cluster. The intention is that the
> +custom script will check if the new node is eligible to be added to the cluster,
> +and if so add the necessary IMM objects so that the node will be able to join
> +the next time it tries. The script will be called with one or more arguments,
> +where each argument represents a node which requests to be added to the
> +cluster. Each argument has the format "X,Y" where X is the node id in decimal
> +and Y is the name of the node.
> +
> +To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT variable
> +in clmd.conf to point to the executable script which shall be executed when new
> +nodes try to join the cluster.
> +
> +
>  DEPENDENCIES
>  
>  CLMSv depends on the following services:
> diff --git a/osaf/services/saf/clmsv/clms/clms_cb.h
> b/osaf/services/saf/clmsv/clms/clms_cb.h
> --- a/osaf/services/saf/clmsv/clms/clms_cb.h
> +++ b/osaf/services/saf/clmsv/clms/clms_cb.h
> @@ -18,10 +18,20 @@
>  #define CLMS_CB_H
>  
>  #include <stdbool.h>
> +#include <pthread.h>
>  
>  #define IMPLEMENTER_NAME     "safClmService"
>  #define CLMS_HA_INIT_STATE   0
>  
> +/* The maximum number of nodes that can be queued for scale-out while the
> +   scale-out script is executing. A scale-out request is logged and not
> +   queued when this many nodes are already pending. The per-node arrays in
> +   CLMS_CB are sized MAX_PENDING_NODES + 1, leaving room for one trailing
> +   NULL entry that terminates the script's argument vector. */
> +#define MAX_PENDING_NODES    32
> +
> +/* The value to put in the PATH environment variable when calling the
> +   scale-out script. SBINDIR and BINDIR are presumably the OpenSAF install
> +   directories provided by configmake.h -- NOTE(review): confirm. */
> +#define SCALE_OUT_PATH_ENV SBINDIR ":" BINDIR ":/usr/local/sbin:" \
> +     "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
> +
>  typedef enum clms_tmr_type_t {
>       CLMS_TMR_BASE,
>       CLMS_CLIENT_RESP_TMR = CLMS_TMR_BASE,
> @@ -204,6 +214,27 @@ typedef struct clms_cb_t {
>       bool is_impl_set;
>       bool nid_started;       /**< true if started by NID */
>       NCS_PATRICIA_TREE iplist;       /* To temporarily store ipaddress
> information recieved in MDS NODE_UP */
> +
> +     /* Mutex protecting shared data used by the scale-out functionality
> */
> +     pthread_mutex_t scale_out_data_mutex;
> +     /* Number of occupied indices in the vectors pending_nodes[] and
> +      * pending_node_ids[] */
> +     size_t no_of_pending_nodes;
> +     /* Number of occupied indices in the vector inprogress_node_ids[]
> */
> +     size_t no_of_inprogress_nodes;
> +     /* Names of the nodes to be added in the next run of the scale-out
> +      * script */
> +     char *pending_nodes[MAX_PENDING_NODES + 1];
> +     /* Node ids of the nodes to be added in the next run of the the
> +      * scale-out script */
> +     SaClmNodeIdT pending_node_ids[MAX_PENDING_NODES + 1];
> +     /* Node ids of the nodes that are being added by the currently
> executing
> +      * instance of the scale-out script */
> +     SaClmNodeIdT inprogress_node_ids[MAX_PENDING_NODES + 1];
> +     /* True if the scale-out thread is currently running */
> +     bool is_scale_out_thread_running;
> +     /* Full path to the scale-out script */
> +     char *scale_out_script;
>  } CLMS_CB;
>  
>  typedef struct clms_lock_tmr_t {
> diff --git a/osaf/services/saf/clmsv/clms/clms_evt.c
> b/osaf/services/saf/clmsv/clms/clms_evt.c
> --- a/osaf/services/saf/clmsv/clms/clms_evt.c
> +++ b/osaf/services/saf/clmsv/clms/clms_evt.c
> @@ -15,9 +15,25 @@
>   *
>   */
>  
> +#define _GNU_SOURCE
>  #include <configmake.h>
>  
>  #include "clms.h"
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <errno.h>
> +#include <pthread.h>
> +#include <inttypes.h>
> +#include <unistd.h>
> +#include <limits.h>
> +#include <sys/time.h>
> +#include <sys/resource.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include "logtrace.h"
> +#include "ncsgl_defs.h"
> +#include "osaf_utility.h"
>  
>  static uint32_t process_api_evt(CLMSV_CLMS_EVT * evt);
>  static uint32_t proc_clma_updn_mds_msg(CLMSV_CLMS_EVT * evt);
> @@ -26,6 +42,11 @@ static uint32_t proc_rda_evt(CLMSV_CLMS_
>  static uint32_t proc_mds_quiesced_ack_msg(CLMSV_CLMS_EVT * evt);
>  static uint32_t proc_node_lock_tmr_exp_msg(CLMSV_CLMS_EVT * evt);
>  static uint32_t proc_node_up_msg(CLMS_CB * cb, CLMSV_CLMS_EVT *
> evt);
> +static void execute_scale_out_script(int argc, char *argv[]);
> +static void *scale_out_thread(void *arg);
> +static void start_scale_out_thread(CLMS_CB *cb);
> +static void scale_out_node(CLMS_CB *cb,
> +     const clmsv_clms_node_up_info_t *nodeup_info);
>  static uint32_t proc_initialize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT *
> evt);
>  static uint32_t proc_finalize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT *
> evt);
>  static uint32_t proc_track_start_msg(CLMS_CB * cb, CLMSV_CLMS_EVT *
> evt);
> @@ -64,6 +85,8 @@ static const CLMSV_CLMS_CLMA_API_MSG_HAN
>       proc_node_up_msg
>  };
>  
> +/* "PATH=..." string that forms the entire environment of the scale-out
> + * script (see execute_scale_out_script()). Not const because execve() takes
> + * its environment as char *const envp[]. */
> +static char scale_out_path_env[] = "PATH=" SCALE_OUT_PATH_ENV;
> +
>  /**
>  * Clear any pending clma_down records or node_down list
>  *
> @@ -251,6 +274,212 @@ CLMS_CLIENT_INFO *clms_client_new(MDS_DE
>       return client;
>  }
>  
> +/*
> + * Execute the scale-out script with the parameters specified in @a argc and
> + * @a argv (analogous to the parameters taken by the main() function). The
> + * first index in argv must be the full path to the executable script file,
> + * and argv[argc] must be NULL. The rest of the parameters shall specify the
> + * nodes to scale out. The format of each parameter must be two strings
> + * separated by a comma character, where the first string is the node id of a
> + * node to be scaled out (given as a decimal number), and the second string is
> + * the node name. This function blocks until the script has exited.
> + *
> + * The child process closes all file descriptors from 3 up to the soft
> + * RLIMIT_NOFILE limit (or below 1024 if that limit cannot be read or does
> + * not fit in an int) and runs with PATH as its only environment variable.
> + * If execve() fails, the child exits with EXEC_FAILED_EXIT_CODE; note that a
> + * script which itself exits with that code is reported the same way.
> + */
> +static void execute_scale_out_script(int argc, char *argv[])
> +{
> +     /* Exit status used by the child when execve() fails, so that the parent
> +      * can report "could not be executed" instead of a script failure. */
> +     enum { EXEC_FAILED_EXIT_CODE = 123 };
> +     struct rlimit rlim;
> +     int nofile = 1024;
> +     char *const env[] = { scale_out_path_env, NULL };
> +
> +     TRACE_ENTER();
> +     osafassert(argc >= 1 && argv[argc] == NULL);
> +     LOG_NO("Running script %s to scale out %d node(s)", argv[0], argc - 1);
> +
> +     if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
> +             LOG_ER("getrlimit(RLIMIT_NOFILE) failed: %s", strerror(errno));
> +     } else if (rlim.rlim_cur <= INT_MAX) {
> +             nofile = rlim.rlim_cur;
> +     } else {
> +             /* E.g. RLIM_INFINITY. getrlimit() succeeded, so errno carries
> +              * no information here; keep the default upper bound. */
> +             LOG_NO("RLIMIT_NOFILE is too large, closing file descriptors "
> +                     "below %d only", nofile);
> +     }
> +
> +     pid_t child_pid = fork();
> +     if (child_pid == 0) {
> +             /* Child of a multi-threaded process: only async-signal-safe
> +              * calls (close, execve, _Exit) are used before exec. */
> +             for (int fd = 3; fd < nofile; ++fd) close(fd);
> +             execve(argv[0], argv, env);
> +             _Exit(EXEC_FAILED_EXIT_CODE);
> +     } else if (child_pid != (pid_t) -1) {
> +             int status;
> +             pid_t wait_pid;
> +             do {
> +                     wait_pid = waitpid(child_pid, &status, 0);
> +             } while (wait_pid == (pid_t) -1 && errno == EINTR);
> +             if (wait_pid != (pid_t) -1) {
> +                     if (!WIFEXITED(status)) {
> +                             LOG_ER("Scale out script %s terminated "
> +                                     "abnormally", argv[0]);
> +                     } else if (WEXITSTATUS(status) != 0) {
> +                             if (WEXITSTATUS(status) == EXEC_FAILED_EXIT_CODE) {
> +                                     LOG_ER("Scale out script %s could "
> +                                             "not be executed", argv[0]);
> +                             } else {
> +                                     LOG_ER("Scale out script %s failed "
> +                                             "with exit code %d", argv[0],
> +                                             WEXITSTATUS(status));
> +                             }
> +                     } else {
> +                             LOG_IN("Scale out script %s exited "
> +                                     "successfully", argv[0]);
> +                     }
> +             } else {
> +                     LOG_ER("Scale out script %s failed in waitpid(%u): %s",
> +                             argv[0], (unsigned) child_pid, strerror(errno));
> +             }
> +     } else {
> +             LOG_ER("Scale out script %s failed in fork(): %s", argv[0],
> +                     strerror(errno));
> +     }
> +     TRACE_LEAVE();
> +}
> +
> +/*
> + * Body of the background thread started by start_scale_out_thread(). Under
> + * scale_out_data_mutex it moves all pending nodes (pending_nodes[] and
> + * pending_node_ids[]) into the in-progress set (inprogress_node_ids[]). It
> + * then runs the scale-out script for them with the mutex released, so that
> + * scale_out_node() can queue further nodes meanwhile. This repeats until no
> + * nodes are pending. At that point it clears is_scale_out_thread_running and
> + * the thread terminates.
> + *
> + * @a arg is the CLMS_CB control block. Always returns NULL.
> + */
> +static void *scale_out_thread(void *arg)
> +{
> +     CLMS_CB *cb = (CLMS_CB*) arg;
> +
> +     TRACE_ENTER();
> +     osafassert(cb->no_of_inprogress_nodes == 0);
> +     for (;;) {
> +             osaf_mutex_lock_ordie(&cb->scale_out_data_mutex);
> +             char *argv[MAX_PENDING_NODES + 2] = { cb->scale_out_script };
> +             size_t no_of_pending_nodes = cb->no_of_pending_nodes;
> +             osafassert(no_of_pending_nodes <= MAX_PENDING_NODES);
> +             /* Copies one entry more than the number of pending nodes. The
> +              * extra entry pending_nodes[no_of_pending_nodes] is expected to
> +              * be NULL (scale_out_node() asserts this before filling a slot,
> +              * and this loop clears every slot it takes), so argv[] stays
> +              * NULL-terminated as execve() requires. */
> +             for (size_t i = 0; i != (no_of_pending_nodes + 1); ++i) {
> +                     argv[i + 1] = cb->pending_nodes[i];
> +                     cb->inprogress_node_ids[i] = cb->pending_node_ids[i];
> +                     cb->pending_nodes[i] = NULL;
> +             }
> +             cb->no_of_pending_nodes = 0;
> +             cb->no_of_inprogress_nodes = no_of_pending_nodes;
> +             if (no_of_pending_nodes == 0) {
> +                     /* Cleared while still holding the mutex, so that
> +                      * scale_out_node() starts a new thread for any node
> +                      * queued after this point. */
> +                     osafassert(cb->is_scale_out_thread_running == true);
> +                     cb->is_scale_out_thread_running = false;
> +             }
> +             osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex);
> +             if (no_of_pending_nodes == 0) break;
> +             /* The script runs without the mutex held. The argument strings
> +              * were allocated by asprintf() in scale_out_node() and are
> +              * freed here once the script has exited. */
> +             execute_scale_out_script(no_of_pending_nodes + 1, argv);
> +             for (size_t i = 0; i != no_of_pending_nodes; ++i) {
> +                     free(argv[i + 1]);
> +             }
> +     }
> +     LOG_IN("Scale out thread terminating");
> +     TRACE_LEAVE();
> +     return NULL;
> +}
> +
> +/*
> + * Start the scale-out thread, which executes scale_out_thread() in the
> + * background for as long as there are pending nodes to scale out. This
> + * function must not be called if the thread is already running. The caller
> + * must hold cb->scale_out_data_mutex because is_scale_out_thread_running is
> + * updated here. On failure the error is logged, and the pending nodes remain
> + * queued with is_scale_out_thread_running still false.
> + */
> +static void start_scale_out_thread(CLMS_CB *cb)
> +{
> +     pthread_attr_t attr;
> +     pthread_t thread;
> +     bool detached = false;
> +     int result;
> +
> +     TRACE_ENTER();
> +     osafassert(cb->is_scale_out_thread_running == false);
> +     result = pthread_attr_init(&attr);
> +     if (result != 0) {
> +             LOG_ER("pthread_attr_init() failed with return code %d", result);
> +             TRACE_LEAVE();
> +             return;
> +     }
> +     result = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> +     if (result == 0) {
> +             detached = true;
> +     } else {
> +             LOG_ER("pthread_attr_setdetachstate() failed "
> +                     "with return code %d", result);
> +     }
> +     result = pthread_create(&thread, &attr, scale_out_thread, cb);
> +     if (result == 0) {
> +             LOG_IN("Scale out thread started");
> +             cb->is_scale_out_thread_running = true;
> +             /* Nobody ever joins this thread. If it could not be created
> +              * detached, detach it now so its resources are released when
> +              * it exits instead of leaking. */
> +             if (!detached) {
> +                     result = pthread_detach(thread);
> +                     if (result != 0) {
> +                             LOG_ER("pthread_detach() failed with return "
> +                                     "code %d", result);
> +                     }
> +             }
> +     } else {
> +             LOG_ER("pthread_create() failed with return code %d", result);
> +     }
> +     pthread_attr_destroy(&attr);
> +     TRACE_LEAVE();
> +}
> +
> +/*
> + * Add the node given by the @a nodeup_info parameter to the queue of pending
> + * nodes to be scaled out, and start the scale-out thread if it is not already
> + * running. The node is not queued in any of these cases:
> + *   - it is already being scaled out or is already pending;
> + *   - MAX_PENDING_NODES nodes are already pending;
> + *   - asprintf() fails.
> + * Each case is logged. The node name is truncated to SA_MAX_NAME_LENGTH - 1
> + * characters. Acquires cb->scale_out_data_mutex internally.
> + */
> +static void scale_out_node(CLMS_CB *cb,
> +     const clmsv_clms_node_up_info_t *nodeup_info)
> +{
> +     char node_name[SA_MAX_NAME_LENGTH];
> +
> +     TRACE_ENTER();
> +     /* node_name.value is paired with an explicit length; make a bounded,
> +      * NUL-terminated copy for logging and for the script argument. */
> +     size_t name_len = nodeup_info->node_name.length < SA_MAX_NAME_LENGTH ?
> +             nodeup_info->node_name.length : (SA_MAX_NAME_LENGTH - 1);
> +     memcpy(node_name, nodeup_info->node_name.value, name_len);
> +     node_name[name_len] = '\0';
> +
> +     osaf_mutex_lock_ordie(&cb->scale_out_data_mutex);
> +     size_t no_of_pending_nodes = cb->no_of_pending_nodes;
> +     size_t no_of_inprogress_nodes = cb->no_of_inprogress_nodes;
> +     bool queue_the_node = true;
> +     for (size_t i = 0; i != no_of_inprogress_nodes && queue_the_node; ++i) {
> +             if (nodeup_info->node_id == cb->inprogress_node_ids[i]) {
> +                     LOG_IN("Node 0x%" PRIx32 " (%s) is already being "
> +                             "scaled out", nodeup_info->node_id, node_name);
> +                     queue_the_node = false;
> +             }
> +     }
> +     for (size_t i = 0; i != no_of_pending_nodes && queue_the_node; ++i) {
> +             if (nodeup_info->node_id == cb->pending_node_ids[i]) {
> +                     LOG_IN("Node 0x%" PRIx32 " (%s) is already queued for "
> +                             "scale out", nodeup_info->node_id, node_name);
> +                     queue_the_node = false;
> +             }
> +     }
> +     if (queue_the_node && no_of_pending_nodes >= MAX_PENDING_NODES) {
> +             LOG_ER("Failed to add node 0x%" PRIx32 " (%s): too many "
> +                     "pending nodes", nodeup_info->node_id, node_name);
> +             queue_the_node = false;
> +     }
> +     if (queue_the_node) {
> +             char *strp;
> +             /* Script argument format: "<node id in decimal>,<node name>".
> +              * Ownership of strp passes to the scale-out thread, which frees
> +              * it after running the script. */
> +             if (asprintf(&strp, "%" PRIu32 ",%s", nodeup_info->node_id,
> +                     node_name) != -1) {
> +                     LOG_NO("Queuing request to scale out node 0x%" PRIx32
> +                             " (%s)", nodeup_info->node_id, node_name);
> +                     osafassert(cb->pending_nodes[no_of_pending_nodes] ==
> +                             NULL);
> +                     cb->pending_nodes[no_of_pending_nodes] = strp;
> +                     cb->pending_node_ids[no_of_pending_nodes] =
> +                             nodeup_info->node_id;
> +                     cb->no_of_pending_nodes++;
> +                     if (cb->is_scale_out_thread_running == false) {
> +                             /* The new thread cannot dequeue anything until
> +                              * the mutex is released below. */
> +                             start_scale_out_thread(cb);
> +                     }
> +             } else {
> +                     LOG_ER("Failed to add node 0x%" PRIx32 " (%s): "
> +                             "asprintf failed", nodeup_info->node_id,
> +                             node_name);
> +             }
> +     }
> +     osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex);
> +     TRACE_LEAVE();
> +}
> +
>  /**
>   * Processing Node up mesg from the node_agent utility
>   *                
> @@ -281,7 +510,6 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, 
>  
>       node = clms_node_get_by_name(&node_name);
>       if (node == NULL) {
> -             clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST;
>               /* The /etc/opensaf/node_name is an user exposed configuration
> file.
>                * The node_name file contains the RDN value of the CLM node 
> name.
>                * (a) When opensaf cluster configuration is pre-provisioned 
> using
> the OpenSAF IMM tools:
> @@ -290,7 +518,13 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, 
>                * (b) When opensaf cluster nodes are dynamically added at
> runtime:
>                *     the /etc/opensaf/node_name should contain the rdn value.
>                */
> -             LOG_NO("Node '%s' requests to join the cluster but is
> unconfigured", nodeup_info->node_name.value);
> +             if (cb->scale_out_script != NULL) {
> +                     clm_msg.info.api_resp_info.rc = SA_AIS_ERR_TRY_AGAIN;
> +                     scale_out_node(cb, nodeup_info);
> +             } else {
> +                     clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST;
> +                     LOG_NO("Node '%s' requests to join the cluster but is
> unconfigured", nodeup_info->node_name.value);
> +             }
>       }
>  
>       if (node != NULL) {
> diff --git a/osaf/services/saf/clmsv/clms/clms_main.c
> b/osaf/services/saf/clmsv/clms/clms_main.c
> --- a/osaf/services/saf/clmsv/clms/clms_main.c
> +++ b/osaf/services/saf/clmsv/clms/clms_main.c
> @@ -226,6 +226,24 @@ uint32_t clms_cb_init(CLMS_CB * clms_cb)
>       clms_cb->is_impl_set = false;
>       clms_cb->rtu_pending = false; /* Flag to control try-again of
> rt-updates */
>  
> +     if (pthread_mutex_init(&clms_cb->scale_out_data_mutex, NULL) != 0)
> {
> +             return NCSCC_RC_FAILURE;
> +     }
> +     clms_cb->no_of_pending_nodes = 0;
> +     clms_cb->no_of_inprogress_nodes = 0;
> +     for (int i = 0; i != (MAX_PENDING_NODES + 1); ++i) {
> +             clms_cb->pending_nodes[i] = NULL;
> +             clms_cb->pending_node_ids[i] = 0;
> +             clms_cb->inprogress_node_ids[i] = 0;
> +     }
> +     clms_cb->is_scale_out_thread_running = false;
> +     clms_cb->scale_out_script = NULL;
> +     char *tmp = getenv("CLMSV_SCALE_OUT_SCRIPT");
> +     if (tmp != NULL) {
> +             clms_cb->scale_out_script = strdup(tmp);
> +             if (clms_cb->scale_out_script == NULL) return NCSCC_RC_FAILURE;
> +     }
> +
>       /* Assign Version. Currently, hardcoded, This will change later */
>       clms_cb->clm_ver.releaseCode = CLM_RELEASE_CODE;
>       clms_cb->clm_ver.majorVersion = CLM_MAJOR_VERSION_4;
> diff --git a/osaf/services/saf/clmsv/config/clmd.conf
> b/osaf/services/saf/clmsv/config/clmd.conf
> --- a/osaf/services/saf/clmsv/config/clmd.conf
> +++ b/osaf/services/saf/clmsv/config/clmd.conf
> @@ -10,5 +10,14 @@
>  # Healthcheck keys
>  export CLMSV_ENV_HEALTHCHECK_KEY="Default"
>  
> +# Script to be called when a node which is not configured tries to join the
> +# cluster. The idea is that this script can check if the node is eligible to be
> +# added to the cluster, and if so add the necessary IMM objects so that the node
> +# will be able to join the next time it tries. To enable this feature, configure
> +# this variable with the full path to the executable script. The script will be
> +# called with one or more arguments, and each argument has the format "X,Y"
> +# where X is the node id in decimal and Y is the name of the node.
> +#export CLMSV_SCALE_OUT_SCRIPT="/usr/local/bin/scale_out_script"
> +
>  # Uncomment the next line to enable info level logging
>  #args="--loglevel=info"

------------------------------------------------------------------------------
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to