A good use case, I think, and I appreciate the work. I will review it (and also nitpick) and get back to you :-)!
Mathi. > -----Original Message----- > From: Anders Widell [mailto:[email protected]] > Sent: Tuesday, September 29, 2015 1:07 AM > To: Mathivanan Naickan Palanivelu > Cc: [email protected] > Subject: [PATCH 1 of 2] clmd: Add support for scale-out by calling a custom > script to configure new nodes [#1453] > > osaf/services/saf/clmsv/README | 17 ++ > osaf/services/saf/clmsv/clms/clms_cb.h | 31 ++++ > osaf/services/saf/clmsv/clms/clms_evt.c | 238 > ++++++++++++++++++++++++++++++- > osaf/services/saf/clmsv/clms/clms_main.c | 18 ++ > osaf/services/saf/clmsv/config/clmd.conf | 9 + > 5 files changed, 311 insertions(+), 2 deletions(-) > > > The scale-out feature makes it possible to run a custom script when a node > which is not configured in IMM tries to join the cluster. The intention is > that > the custom script will check if the new node is eligible to be added to the > cluster, and if so add the necessary IMM objects so that the node will be able > to join the next time it tries. The script will be called with one or more > arguments, where each argument represents a node which requests to be > added to the cluster. Each argument has the format "X,Y" where X is the > node id in decimal and Y is the name of the node. > > To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT > variable in clmd.conf to point to the executable script which shall be > executed when new nodes try to join the cluster. > > diff --git a/osaf/services/saf/clmsv/README > b/osaf/services/saf/clmsv/README > --- a/osaf/services/saf/clmsv/README > +++ b/osaf/services/saf/clmsv/README > @@ -54,6 +54,23 @@ SA_CLM_AF_INET(i.e. for IPv4) and SA_CLM > > The CLMA is the clm agent library that user applications shall link with. > > + > +SCALE OUT > + > +The scale-out feature makes it possible to run a custom script when a > +node which is not configured in IMM tries to join the cluster. 
The > +intention is that the custom script will check if the new node is > +eligible to be added to the cluster, and if so add the necessary IMM > +objects so that the node will be able to join the next time it tries. > +The script will be called with one or more arguments, where each > +argument represents a node which requests to be added to the cluster. > +Each argument has the format "X,Y" where X is the node id in decimal and Y > is the name of the node. > + > +To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT > +variable in clmd.conf to point to the executable script which shall be > +executed when new nodes try to join the cluster. > + > + > DEPENDENCIES > > CLMSv depends on the following services: > diff --git a/osaf/services/saf/clmsv/clms/clms_cb.h > b/osaf/services/saf/clmsv/clms/clms_cb.h > --- a/osaf/services/saf/clmsv/clms/clms_cb.h > +++ b/osaf/services/saf/clmsv/clms/clms_cb.h > @@ -18,10 +18,20 @@ > #define CLMS_CB_H > > #include <stdbool.h> > +#include <pthread.h> > > #define IMPLEMENTER_NAME "safClmService" > #define CLMS_HA_INIT_STATE 0 > > +/* The maximum number of nodes that can be queued for scale-out while > the > + scale-out script is executing */ > +#define MAX_PENDING_NODES 32 > + > +/* The value to put in the PATH environment variable when calling the > + scale-out script */ > +#define SCALE_OUT_PATH_ENV SBINDIR ":" BINDIR ":/usr/local/sbin:" \ > + "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" > + > typedef enum clms_tmr_type_t { > CLMS_TMR_BASE, > CLMS_CLIENT_RESP_TMR = CLMS_TMR_BASE, > @@ -204,6 +214,27 @@ typedef struct clms_cb_t { > bool is_impl_set; > bool nid_started; /**< true if started by NID */ > NCS_PATRICIA_TREE iplist; /* To temporarily store ipaddress > information recieved in MDS NODE_UP */ > + > + /* Mutex protecting shared data used by the scale-out functionality > */ > + pthread_mutex_t scale_out_data_mutex; > + /* Number of occupied indices in the vectors pending_nodes[] and > + * 
pending_node_ids[] */ > + size_t no_of_pending_nodes; > + /* Number of occupied indices in the vector inprogress_node_ids[] > */ > + size_t no_of_inprogress_nodes; > + /* Names of the nodes to be added in the next run of the scale-out > + * script */ > + char *pending_nodes[MAX_PENDING_NODES + 1]; > + /* Node ids of the nodes to be added in the next run of the the > + * scale-out script */ > + SaClmNodeIdT pending_node_ids[MAX_PENDING_NODES + 1]; > + /* Node ids of the nodes that are being added by the currently > executing > + * instance of the scale-out script */ > + SaClmNodeIdT inprogress_node_ids[MAX_PENDING_NODES + 1]; > + /* True if the scale-out thread is currently running */ > + bool is_scale_out_thread_running; > + /* Full path to the scale-out script */ > + char *scale_out_script; > } CLMS_CB; > > typedef struct clms_lock_tmr_t { > diff --git a/osaf/services/saf/clmsv/clms/clms_evt.c > b/osaf/services/saf/clmsv/clms/clms_evt.c > --- a/osaf/services/saf/clmsv/clms/clms_evt.c > +++ b/osaf/services/saf/clmsv/clms/clms_evt.c > @@ -15,9 +15,25 @@ > * > */ > > +#define _GNU_SOURCE > #include <configmake.h> > > #include "clms.h" > +#include <stdio.h> > +#include <string.h> > +#include <stdlib.h> > +#include <errno.h> > +#include <pthread.h> > +#include <inttypes.h> > +#include <unistd.h> > +#include <limits.h> > +#include <sys/time.h> > +#include <sys/resource.h> > +#include <sys/types.h> > +#include <sys/wait.h> > +#include "logtrace.h" > +#include "ncsgl_defs.h" > +#include "osaf_utility.h" > > static uint32_t process_api_evt(CLMSV_CLMS_EVT * evt); static uint32_t > proc_clma_updn_mds_msg(CLMSV_CLMS_EVT * evt); @@ -26,6 +42,11 @@ > static uint32_t proc_rda_evt(CLMSV_CLMS_ static uint32_t > proc_mds_quiesced_ack_msg(CLMSV_CLMS_EVT * evt); static uint32_t > proc_node_lock_tmr_exp_msg(CLMSV_CLMS_EVT * evt); static uint32_t > proc_node_up_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); > +static void execute_scale_out_script(int argc, char *argv[]); static > +void 
*scale_out_thread(void *arg); static void > +start_scale_out_thread(CLMS_CB *cb); static void > scale_out_node(CLMS_CB > +*cb, > + const clmsv_clms_node_up_info_t *nodeup_info); > static uint32_t proc_initialize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); > static uint32_t proc_finalize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); > static uint32_t proc_track_start_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * > evt); @@ -64,6 +85,8 @@ static const CLMSV_CLMS_CLMA_API_MSG_HAN > proc_node_up_msg > }; > > +static char scale_out_path_env[] = "PATH=" SCALE_OUT_PATH_ENV; > + > /** > * Clear any pending clma_down records or node_down list > * > @@ -251,6 +274,212 @@ CLMS_CLIENT_INFO *clms_client_new(MDS_DE > return client; > } > > +/* > + * Execute the scale-out script with the parameters specified in @a > +argc, and > + * @a argv (analogous to the parameters taken by the main() function). > +The first > + * index in argv must be the full path to the executable script file. > +The rest > + * of the parameters shall specify the nodes to scale out. The format > +of each > + * parameter must be two strings separated by a comma character, where > +the first > + * string is the node id of a node to be scaled out (given as a decimal > +number), > + * and the second string is the node name. This function blocks until > +the script > + * has exited. 
> + */ > +static void execute_scale_out_script(int argc, char *argv[]) { > + struct rlimit rlim; > + int nofile = 1024; > + char *const env[] = { scale_out_path_env, NULL }; > + > + TRACE_ENTER(); > + osafassert(argc >= 1 && argv[argc] == NULL); > + LOG_NO("Running script %s to scale out %d node(s)", argv[0], argc - > +1); > + > + if (getrlimit(RLIMIT_NOFILE, &rlim) == 0 && rlim.rlim_cur <= > INT_MAX) { > + nofile = rlim.rlim_cur; > + } else { > + LOG_ER("getrlimit(RLIMIT_NOFILE) failed: %s", > strerror(errno)); > + } > + > + pid_t child_pid = fork(); > + if (child_pid == 0) { > + for (int fd = 3; fd < nofile; ++fd) close(fd); > + execve(argv[0], argv, env); > + _Exit(123); > + } else if (child_pid != (pid_t) -1) { > + int status; > + pid_t wait_pid; > + do { > + wait_pid = waitpid(child_pid, &status, 0); > + } while (wait_pid == (pid_t) -1 && errno == EINTR); > + if (wait_pid != (pid_t) -1) { > + if (!WIFEXITED(status)) { > + LOG_ER("Scale out script %s terminated " > + "abnormally", argv[0]); > + } else if (WEXITSTATUS(status) != 0) { > + if (WEXITSTATUS(status) == 123) { > + LOG_ER("Scale out script %s could " > + "not be executed", argv[0]); > + } else { > + LOG_ER("Scale out script %s failed " > + "with exit code %d", argv[0], > + WEXITSTATUS(status)); > + } > + } else { > + LOG_IN("Scale out script %s exited " > + "successfully", argv[0]); > + } > + } else { > + LOG_ER("Scale out script %s failed in waitpid(%u): > %s", > + argv[0], (unsigned) child_pid, > strerror(errno)); > + } > + } else { > + LOG_ER("Scale out script %s failed in fork(): %s", argv[0], > + strerror(errno)); > + } > + TRACE_LEAVE(); > +} > + > +/* > + * This function is executed in a separate thread, and will > +continuously call > + * the scale-out script to scale out pending nodes until > +no_of_pending_nodes > + * becomes zero. The thread terminates itself when there are no more > +pending > + * nodes. 
> + */ > +static void *scale_out_thread(void *arg) { > + CLMS_CB *cb = (CLMS_CB*) arg; > + > + TRACE_ENTER(); > + osafassert(cb->no_of_inprogress_nodes == 0); > + for (;;) { > + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); > + char *argv[MAX_PENDING_NODES + 2] = { cb- > >scale_out_script }; > + size_t no_of_pending_nodes = cb->no_of_pending_nodes; > + osafassert(no_of_pending_nodes <= > MAX_PENDING_NODES); > + for (size_t i = 0; i != (no_of_pending_nodes + 1); ++i) { > + argv[i + 1] = cb->pending_nodes[i]; > + cb->inprogress_node_ids[i] = cb- > >pending_node_ids[i]; > + cb->pending_nodes[i] = NULL; > + } > + cb->no_of_pending_nodes = 0; > + cb->no_of_inprogress_nodes = no_of_pending_nodes; > + if (no_of_pending_nodes == 0) { > + osafassert(cb->is_scale_out_thread_running == > true); > + cb->is_scale_out_thread_running = false; > + } > + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); > + if (no_of_pending_nodes == 0) break; > + execute_scale_out_script(no_of_pending_nodes + 1, argv); > + for (size_t i = 0; i != no_of_pending_nodes; ++i) { > + free(argv[i + 1]); > + } > + } > + LOG_IN("Scale out thread terminating"); > + TRACE_LEAVE(); > + return NULL; > +} > + > +/* > + * Start the scale-out thread which executes the function > +scale_out_thread() in > + * a background thread for as long as there are pending nodes to scale > +out. This > + * function must not be called if the thread is already running. 
> + */ > +static void start_scale_out_thread(CLMS_CB *cb) { > + pthread_attr_t attr; > + int result; > + > + TRACE_ENTER(); > + result = pthread_attr_init(&attr); > + if (result == 0) { > + result = pthread_attr_setdetachstate(&attr, > + PTHREAD_CREATE_DETACHED); > + if (result != 0) { > + LOG_ER("pthread_attr_setdetachstate() failed " > + "with return code %d", result); > + } > + pthread_t thread; > + result = pthread_create(&thread, &attr, scale_out_thread, > cb); > + if (result == 0) { > + LOG_IN("Scale out thread started"); > + osafassert(cb->is_scale_out_thread_running == > false); > + cb->is_scale_out_thread_running = true; > + } else { > + LOG_ER("pthread_create() failed with return code > %d", > + result); > + } > + pthread_attr_destroy(&attr); > + } else { > + LOG_ER("pthread_attr_init() failed with return code %d", > result); > + } > + TRACE_LEAVE(); > +} > + > +/* > + * Add the node given by the @a nodeup_info parameter to the queue of > +pending > + * nodes to be scaled out, and start the scale-out thread if it is not > +already > + * running. > + */ > +static void scale_out_node(CLMS_CB *cb, > + const clmsv_clms_node_up_info_t *nodeup_info) { > + char node_name[SA_MAX_NAME_LENGTH]; > + > + TRACE_ENTER(); > + size_t name_len = nodeup_info->node_name.length < > SA_MAX_NAME_LENGTH ? 
> + nodeup_info->node_name.length : > + (SA_MAX_NAME_LENGTH - 1); > + memcpy(node_name, nodeup_info->node_name.value, > name_len); > + node_name[name_len] = '\0'; > + > + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); > + size_t no_of_pending_nodes = cb->no_of_pending_nodes; > + size_t no_of_inprogress_nodes = cb->no_of_inprogress_nodes; > + bool queue_the_node = true; > + for (size_t i = 0; i != no_of_inprogress_nodes && queue_the_node; > ++i) { > + if (nodeup_info->node_id == cb->inprogress_node_ids[i]) { > + LOG_IN("Node 0x%" PRIx32 " (%s) is already being " > + "scaled out", nodeup_info->node_id, > node_name); > + queue_the_node = false; > + } > + } > + for (size_t i = 0; i != no_of_pending_nodes && queue_the_node; > ++i) { > + if (nodeup_info->node_id == cb->pending_node_ids[i]) { > + LOG_IN("Node 0x%" PRIx32 " (%s) is already queued > for " > + "scaled out", nodeup_info->node_id, > node_name); > + queue_the_node = false; > + } > + } > + if (no_of_pending_nodes >= MAX_PENDING_NODES && > queue_the_node) { > + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): too many " > + "pending nodes", nodeup_info->node_id, > node_name); > + queue_the_node = false; > + } > + if (queue_the_node) { > + char *strp; > + if (asprintf(&strp, "%" PRIu32 ",%s", nodeup_info->node_id, > + node_name) != -1) { > + LOG_NO("Queuing request to scale out node 0x%" > PRIx32 > + " (%s)", nodeup_info->node_id, > node_name); > + osafassert(cb- > >pending_nodes[no_of_pending_nodes] == > + NULL); > + cb->pending_nodes[no_of_pending_nodes] = strp; > + cb->pending_node_ids[no_of_pending_nodes] = > + nodeup_info->node_id; > + cb->no_of_pending_nodes++; > + if (cb->is_scale_out_thread_running == false) { > + start_scale_out_thread(cb); > + } > + } else { > + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): " > + "asprintf failed", nodeup_info->node_id, > + node_name); > + } > + } > + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); > + TRACE_LEAVE(); > +} > + > /** > * Processing Node up mesg from 
the node_agent utility > * > @@ -281,7 +510,6 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, > > node = clms_node_get_by_name(&node_name); > if (node == NULL) { > - clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST; > /* The /etc/opensaf/node_name is an user exposed > configuration file. > * The node_name file contains the RDN value of the CLM > node name. > * (a) When opensaf cluster configuration is pre-provisioned > using the OpenSAF IMM tools: > @@ -290,7 +518,13 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, > * (b) When opensaf cluster nodes are dynamically added at > runtime: > * the /etc/opensaf/node_name should contain the rdn > value. > */ > - LOG_NO("Node '%s' requests to join the cluster but is > unconfigured", nodeup_info->node_name.value); > + if (cb->scale_out_script != NULL) { > + clm_msg.info.api_resp_info.rc = > SA_AIS_ERR_TRY_AGAIN; > + scale_out_node(cb, nodeup_info); > + } else { > + clm_msg.info.api_resp_info.rc = > SA_AIS_ERR_NOT_EXIST; > + LOG_NO("Node '%s' requests to join the cluster but > is unconfigured", nodeup_info->node_name.value); > + } > } > > if (node != NULL) { > diff --git a/osaf/services/saf/clmsv/clms/clms_main.c > b/osaf/services/saf/clmsv/clms/clms_main.c > --- a/osaf/services/saf/clmsv/clms/clms_main.c > +++ b/osaf/services/saf/clmsv/clms/clms_main.c > @@ -226,6 +226,24 @@ uint32_t clms_cb_init(CLMS_CB * clms_cb) > clms_cb->is_impl_set = false; > clms_cb->rtu_pending = false; /* Flag to control try-again of rt- > updates */ > > + if (pthread_mutex_init(&clms_cb->scale_out_data_mutex, NULL) != > 0) { > + return NCSCC_RC_FAILURE; > + } > + clms_cb->no_of_pending_nodes = 0; > + clms_cb->no_of_inprogress_nodes = 0; > + for (int i = 0; i != (MAX_PENDING_NODES + 1); ++i) { > + clms_cb->pending_nodes[i] = NULL; > + clms_cb->pending_node_ids[i] = 0; > + clms_cb->inprogress_node_ids[i] = 0; > + } > + clms_cb->is_scale_out_thread_running = false; > + clms_cb->scale_out_script = NULL; > + char *tmp = getenv("CLMSV_SCALE_OUT_SCRIPT"); > + 
if (tmp != NULL) { > + clms_cb->scale_out_script = strdup(tmp); > + if (clms_cb->scale_out_script == NULL) return > NCSCC_RC_FAILURE; > + } > + > /* Assign Version. Currently, hardcoded, This will change later */ > clms_cb->clm_ver.releaseCode = CLM_RELEASE_CODE; > clms_cb->clm_ver.majorVersion = CLM_MAJOR_VERSION_4; diff -- > git a/osaf/services/saf/clmsv/config/clmd.conf > b/osaf/services/saf/clmsv/config/clmd.conf > --- a/osaf/services/saf/clmsv/config/clmd.conf > +++ b/osaf/services/saf/clmsv/config/clmd.conf > @@ -10,5 +10,14 @@ > # Healthcheck keys > export CLMSV_ENV_HEALTHCHECK_KEY="Default" > > +# Script to be called when a node which is not configured tries to join > +the # cluster. The idea is that this script can check if the node is > +eligible to be # added to the cluster, and if so add the necessary IMM > +objects so that the node # will be able to join the next time it tries. > +To enable this feature, configure # this variable with the full path to > +the executable script. The script will be # called with one or more > arguments, and each argument has the format "X,Y" > +# where X is the node id in decimal and Y is the name of the node. > +#export CLMSV_SCALE_OUT_SCRIPT="/usr/local/bin/scale_out_script" > + > # Uncomment the next line to enable info level logging #args="-- > loglevel=info" ------------------------------------------------------------------------------ _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel
