Agreed. Mathi. ----- [email protected] wrote:
> One comment inline. > > thanks, > Anders Widell > > On 10/01/2015 10:51 AM, Mathivanan Naickan Palanivelu wrote: > > As agreed in our discussion, the following are the minor general > changes > > to the approach before pushing. > > > > a) Let this 'feature' be controlled by the variable > > "OPENSAF_CLUSTERAUTO_SCALE_ENABLED" > > > > b) Move this variable to nid.conf. This would be helpful > > in the future if this script will get invoked from the > > individual nodes (say clmna or a cluster manager). > > > > c) Make this an opensaf script itself. My suggestion for > > the script name is opensaf_cluster_autoscale, installed > > in the pkglibdir/ dir, i.e. the same directory as the opensaf_reboot > script. > > Note: I'm consciously using the word cluster instead of clm. In the > future > > some other service might become an owner of this. > > > > d) Call this script if OPENSAF_CLUSTERAUTO_SCALE_ENABLED=true > > in nid.conf > I think we have previously used "1" for enable and "0" for disable, so > I > think we should stick with that instead of "true". > > > > e) By default, let the script have an example of adding this (instead > of being an empty script). > > Actually, if you can fill in the content of the script, it will help me > test this once. > > You could perhaps go ahead and push this before FC since it is > functionally reviewed. > > I will get back to you once I am able to test it. > > > > Thanks, > > Mathi. > > > > ----- [email protected] wrote: > > > >> osaf/services/saf/clmsv/README | 17 ++ > >> osaf/services/saf/clmsv/clms/clms_cb.h | 31 ++++ > >> osaf/services/saf/clmsv/clms/clms_evt.c | 238 > >> ++++++++++++++++++++++++++++++- > >> osaf/services/saf/clmsv/clms/clms_main.c | 18 ++ > >> osaf/services/saf/clmsv/config/clmd.conf | 9 + > >> 5 files changed, 311 insertions(+), 2 deletions(-) > >> > >> > >> The scale-out feature makes it possible to run a custom script when > a > >> node which > >> is not configured in IMM tries to join the cluster. 
The intention > is > >> that the > >> custom script will check if the new node is eligible to be added > to > >> the cluster, > >> and if so add the necessary IMM objects so that the node will be > able > >> to join > >> the next time it tries. The script will be called with one or more > >> arguments, > >> where each argument represents a node which requests to be added > to > >> the > >> cluster. Each argument has the format "X,Y" where X is the node id > in > >> decimal > >> and Y is the name of the node. > >> > >> To enable the scale-out feature in CLM, set the > CLMSV_SCALE_OUT_SCRIPT > >> variable > >> in clmd.conf to point to the executable script which shall be > executed > >> when new > >> nodes try to join the cluster. > >> > >> diff --git a/osaf/services/saf/clmsv/README > >> b/osaf/services/saf/clmsv/README > >> --- a/osaf/services/saf/clmsv/README > >> +++ b/osaf/services/saf/clmsv/README > >> @@ -54,6 +54,23 @@ SA_CLM_AF_INET(i.e. for IPv4) and SA_CLM > >> > >> The CLMA is the clm agent library that user applications shall > link > >> with. > >> > >> + > >> +SCALE OUT > >> + > >> +The scale-out feature makes it possible to run a custom script > when a > >> node which > >> +is not configured in IMM tries to join the cluster. The intention > is > >> that the > >> +custom script will check if the new node is eligible to be added > to > >> the cluster, > >> +and if so add the necessary IMM objects so that the node will be > able > >> to join > >> +the next time it tries. The script will be called with one or > more > >> arguments, > >> +where each argument represents a node which requests to be added > to > >> the > >> +cluster. Each argument has the format "X,Y" where X is the node id > in > >> decimal > >> +and Y is the name of the node. 
> >> + > >> +To enable the scale-out feature in CLM, set the > >> CLMSV_SCALE_OUT_SCRIPT variable > >> +in clmd.conf to point to the executable script which shall be > >> executed when new > >> +nodes try to join the cluster. > >> + > >> + > >> DEPENDENCIES > >> > >> CLMSv depends on the following services: > >> diff --git a/osaf/services/saf/clmsv/clms/clms_cb.h > >> b/osaf/services/saf/clmsv/clms/clms_cb.h > >> --- a/osaf/services/saf/clmsv/clms/clms_cb.h > >> +++ b/osaf/services/saf/clmsv/clms/clms_cb.h > >> @@ -18,10 +18,20 @@ > >> #define CLMS_CB_H > >> > >> #include <stdbool.h> > >> +#include <pthread.h> > >> > >> #define IMPLEMENTER_NAME "safClmService" > >> #define CLMS_HA_INIT_STATE 0 > >> > >> +/* The maximum number of nodes that can be queued for scale-out > while > >> the > >> + scale-out script is executing */ > >> +#define MAX_PENDING_NODES 32 > >> + > >> +/* The value to put in the PATH environment variable when calling > >> the > >> + scale-out script */ > >> +#define SCALE_OUT_PATH_ENV SBINDIR ":" BINDIR ":/usr/local/sbin:" > \ > >> + "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" > >> + > >> typedef enum clms_tmr_type_t { > >> CLMS_TMR_BASE, > >> CLMS_CLIENT_RESP_TMR = CLMS_TMR_BASE, > >> @@ -204,6 +214,27 @@ typedef struct clms_cb_t { > >> bool is_impl_set; > >> bool nid_started; /**< true if started by NID */ > >> NCS_PATRICIA_TREE iplist; /* To temporarily store ipaddress > >> information recieved in MDS NODE_UP */ > >> + > >> + /* Mutex protecting shared data used by the scale-out > functionality > >> */ > >> + pthread_mutex_t scale_out_data_mutex; > >> + /* Number of occupied indices in the vectors pending_nodes[] and > >> + * pending_node_ids[] */ > >> + size_t no_of_pending_nodes; > >> + /* Number of occupied indices in the vector > inprogress_node_ids[] > >> */ > >> + size_t no_of_inprogress_nodes; > >> + /* Names of the nodes to be added in the next run of the > scale-out > >> + * script */ > >> + char *pending_nodes[MAX_PENDING_NODES + 
1]; > >> + /* Node ids of the nodes to be added in the next run of the > >> + * scale-out script */ > >> + SaClmNodeIdT pending_node_ids[MAX_PENDING_NODES + 1]; > >> + /* Node ids of the nodes that are being added by the currently > >> executing > >> + * instance of the scale-out script */ > >> + SaClmNodeIdT inprogress_node_ids[MAX_PENDING_NODES + 1]; > >> + /* True if the scale-out thread is currently running */ > >> + bool is_scale_out_thread_running; > >> + /* Full path to the scale-out script */ > >> + char *scale_out_script; > >> } CLMS_CB; > >> > >> typedef struct clms_lock_tmr_t { > >> diff --git a/osaf/services/saf/clmsv/clms/clms_evt.c > >> b/osaf/services/saf/clmsv/clms/clms_evt.c > >> --- a/osaf/services/saf/clmsv/clms/clms_evt.c > >> +++ b/osaf/services/saf/clmsv/clms/clms_evt.c > >> @@ -15,9 +15,25 @@ > >> * > >> */ > >> > >> +#define _GNU_SOURCE > >> #include <configmake.h> > >> > >> #include "clms.h" > >> +#include <stdio.h> > >> +#include <string.h> > >> +#include <stdlib.h> > >> +#include <errno.h> > >> +#include <pthread.h> > >> +#include <inttypes.h> > >> +#include <unistd.h> > >> +#include <limits.h> > >> +#include <sys/time.h> > >> +#include <sys/resource.h> > >> +#include <sys/types.h> > >> +#include <sys/wait.h> > >> +#include "logtrace.h" > >> +#include "ncsgl_defs.h" > >> +#include "osaf_utility.h" > >> > >> static uint32_t process_api_evt(CLMSV_CLMS_EVT * evt); > >> static uint32_t proc_clma_updn_mds_msg(CLMSV_CLMS_EVT * evt); > >> @@ -26,6 +42,11 @@ static uint32_t proc_rda_evt(CLMSV_CLMS_ > >> static uint32_t proc_mds_quiesced_ack_msg(CLMSV_CLMS_EVT * evt); > >> static uint32_t proc_node_lock_tmr_exp_msg(CLMSV_CLMS_EVT * > evt); > >> static uint32_t proc_node_up_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * > >> evt); > >> +static void execute_scale_out_script(int argc, char *argv[]); > >> +static void *scale_out_thread(void *arg); > >> +static void start_scale_out_thread(CLMS_CB *cb); > >> +static void scale_out_node(CLMS_CB *cb, > >> + const
clmsv_clms_node_up_info_t *nodeup_info); > >> static uint32_t proc_initialize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT > * > >> evt); > >> static uint32_t proc_finalize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * > >> evt); > >> static uint32_t proc_track_start_msg(CLMS_CB * cb, CLMSV_CLMS_EVT > * > >> evt); > >> @@ -64,6 +85,8 @@ static const CLMSV_CLMS_CLMA_API_MSG_HAN > >> proc_node_up_msg > >> }; > >> > >> +static char scale_out_path_env[] = "PATH=" SCALE_OUT_PATH_ENV; > >> + > >> /** > >> * Clear any pending clma_down records or node_down list > >> * > >> @@ -251,6 +274,212 @@ CLMS_CLIENT_INFO *clms_client_new(MDS_DE > >> return client; > >> } > >> > >> +/* > >> + * Execute the scale-out script with the parameters specified in > @a > >> argc, and > >> + * @a argv (analogous to the parameters taken by the main() > >> function). The first > >> + * index in argv must be the full path to the executable script > file. > >> The rest > >> + * of the parameters shall specify the nodes to scale out. The > format > >> of each > >> + * parameter must be two strings separated by a comma character, > >> where the first > >> + * string is the node id of a node to be scaled out (given as a > >> decimal number), > >> + * and the second string is the node name. This function blocks > until > >> the script > >> + * has exited. 
> >> + */ > >> +static void execute_scale_out_script(int argc, char *argv[]) > >> +{ > >> + struct rlimit rlim; > >> + int nofile = 1024; > >> + char *const env[] = { scale_out_path_env, NULL }; > >> + > >> + TRACE_ENTER(); > >> + osafassert(argc >= 1 && argv[argc] == NULL); > >> + LOG_NO("Running script %s to scale out %d node(s)", argv[0], argc > - > >> 1); > >> + > >> + if (getrlimit(RLIMIT_NOFILE, &rlim) == 0 && rlim.rlim_cur <= > >> INT_MAX) { > >> + nofile = rlim.rlim_cur; > >> + } else { > >> + LOG_ER("getrlimit(RLIMIT_NOFILE) failed: %s", strerror(errno)); > >> + } > >> + > >> + pid_t child_pid = fork(); > >> + if (child_pid == 0) { > >> + for (int fd = 3; fd < nofile; ++fd) close(fd); > >> + execve(argv[0], argv, env); > >> + _Exit(123); > >> + } else if (child_pid != (pid_t) -1) { > >> + int status; > >> + pid_t wait_pid; > >> + do { > >> + wait_pid = waitpid(child_pid, &status, 0); > >> + } while (wait_pid == (pid_t) -1 && errno == EINTR); > >> + if (wait_pid != (pid_t) -1) { > >> + if (!WIFEXITED(status)) { > >> + LOG_ER("Scale out script %s terminated " > >> + "abnormally", argv[0]); > >> + } else if (WEXITSTATUS(status) != 0) { > >> + if (WEXITSTATUS(status) == 123) { > >> + LOG_ER("Scale out script %s could " > >> + "not be executed", argv[0]); > >> + } else { > >> + LOG_ER("Scale out script %s failed " > >> + "with exit code %d", argv[0], > >> + WEXITSTATUS(status)); > >> + } > >> + } else { > >> + LOG_IN("Scale out script %s exited " > >> + "successfully", argv[0]); > >> + } > >> + } else { > >> + LOG_ER("Scale out script %s failed in waitpid(%u): %s", > >> + argv[0], (unsigned) child_pid, strerror(errno)); > >> + } > >> + } else { > >> + LOG_ER("Scale out script %s failed in fork(): %s", argv[0], > >> + strerror(errno)); > >> + } > >> + TRACE_LEAVE(); > >> +} > >> + > >> +/* > >> + * This function is executed in a separate thread, and will > >> continuously call > >> + * the scale-out script to scale out pending nodes until > >> 
no_of_pending_nodes > >> + * becomes zero. The thread terminates itself when there are no > more > >> pending > >> + * nodes. > >> + */ > >> +static void *scale_out_thread(void *arg) > >> +{ > >> + CLMS_CB *cb = (CLMS_CB*) arg; > >> + > >> + TRACE_ENTER(); > >> + osafassert(cb->no_of_inprogress_nodes == 0); > >> + for (;;) { > >> + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); > >> + char *argv[MAX_PENDING_NODES + 2] = { cb->scale_out_script }; > >> + size_t no_of_pending_nodes = cb->no_of_pending_nodes; > >> + osafassert(no_of_pending_nodes <= MAX_PENDING_NODES); > >> + for (size_t i = 0; i != (no_of_pending_nodes + 1); ++i) { > >> + argv[i + 1] = cb->pending_nodes[i]; > >> + cb->inprogress_node_ids[i] = cb->pending_node_ids[i]; > >> + cb->pending_nodes[i] = NULL; > >> + } > >> + cb->no_of_pending_nodes = 0; > >> + cb->no_of_inprogress_nodes = no_of_pending_nodes; > >> + if (no_of_pending_nodes == 0) { > >> + osafassert(cb->is_scale_out_thread_running == true); > >> + cb->is_scale_out_thread_running = false; > >> + } > >> + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); > >> + if (no_of_pending_nodes == 0) break; > >> + execute_scale_out_script(no_of_pending_nodes + 1, argv); > >> + for (size_t i = 0; i != no_of_pending_nodes; ++i) { > >> + free(argv[i + 1]); > >> + } > >> + } > >> + LOG_IN("Scale out thread terminating"); > >> + TRACE_LEAVE(); > >> + return NULL; > >> +} > >> + > >> +/* > >> + * Start the scale-out thread which executes the function > >> scale_out_thread() in > >> + * a background thread for as long as there are pending nodes to > >> scale out. This > >> + * function must not be called if the thread is already running. 
> >> + */ > >> +static void start_scale_out_thread(CLMS_CB *cb) > >> +{ > >> + pthread_attr_t attr; > >> + int result; > >> + > >> + TRACE_ENTER(); > >> + result = pthread_attr_init(&attr); > >> + if (result == 0) { > >> + result = pthread_attr_setdetachstate(&attr, > >> + PTHREAD_CREATE_DETACHED); > >> + if (result != 0) { > >> + LOG_ER("pthread_attr_setdetachstate() failed " > >> + "with return code %d", result); > >> + } > >> + pthread_t thread; > >> + result = pthread_create(&thread, &attr, scale_out_thread, cb); > >> + if (result == 0) { > >> + LOG_IN("Scale out thread started"); > >> + osafassert(cb->is_scale_out_thread_running == false); > >> + cb->is_scale_out_thread_running = true; > >> + } else { > >> + LOG_ER("pthread_create() failed with return code %d", > >> + result); > >> + } > >> + pthread_attr_destroy(&attr); > >> + } else { > >> + LOG_ER("pthread_attr_init() failed with return code %d", > result); > >> + } > >> + TRACE_LEAVE(); > >> +} > >> + > >> +/* > >> + * Add the node given by the @a nodeup_info parameter to the queue > of > >> pending > >> + * nodes to be scaled out, and start the scale-out thread if it > is > >> not already > >> + * running. > >> + */ > >> +static void scale_out_node(CLMS_CB *cb, > >> + const clmsv_clms_node_up_info_t *nodeup_info) > >> +{ > >> + char node_name[SA_MAX_NAME_LENGTH]; > >> + > >> + TRACE_ENTER(); > >> + size_t name_len = nodeup_info->node_name.length < > SA_MAX_NAME_LENGTH > >> ? 
> >> + nodeup_info->node_name.length : > >> + (SA_MAX_NAME_LENGTH - 1); > >> + memcpy(node_name, nodeup_info->node_name.value, name_len); > >> + node_name[name_len] = '\0'; > >> + > >> + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); > >> + size_t no_of_pending_nodes = cb->no_of_pending_nodes; > >> + size_t no_of_inprogress_nodes = cb->no_of_inprogress_nodes; > >> + bool queue_the_node = true; > >> + for (size_t i = 0; i != no_of_inprogress_nodes && > queue_the_node; > >> ++i) { > >> + if (nodeup_info->node_id == cb->inprogress_node_ids[i]) { > >> + LOG_IN("Node 0x%" PRIx32 " (%s) is already being " > >> + "scaled out", nodeup_info->node_id, node_name); > >> + queue_the_node = false; > >> + } > >> + } > >> + for (size_t i = 0; i != no_of_pending_nodes && queue_the_node; > ++i) > >> { > >> + if (nodeup_info->node_id == cb->pending_node_ids[i]) { > >> + LOG_IN("Node 0x%" PRIx32 " (%s) is already queued for " > >> + "scaled out", nodeup_info->node_id, node_name); > >> + queue_the_node = false; > >> + } > >> + } > >> + if (no_of_pending_nodes >= MAX_PENDING_NODES && queue_the_node) > { > >> + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): too many " > >> + "pending nodes", nodeup_info->node_id, node_name); > >> + queue_the_node = false; > >> + } > >> + if (queue_the_node) { > >> + char *strp; > >> + if (asprintf(&strp, "%" PRIu32 ",%s", nodeup_info->node_id, > >> + node_name) != -1) { > >> + LOG_NO("Queuing request to scale out node 0x%" PRIx32 > >> + " (%s)", nodeup_info->node_id, node_name); > >> + osafassert(cb->pending_nodes[no_of_pending_nodes] == > >> + NULL); > >> + cb->pending_nodes[no_of_pending_nodes] = strp; > >> + cb->pending_node_ids[no_of_pending_nodes] = > >> + nodeup_info->node_id; > >> + cb->no_of_pending_nodes++; > >> + if (cb->is_scale_out_thread_running == false) { > >> + start_scale_out_thread(cb); > >> + } > >> + } else { > >> + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): " > >> + "asprintf failed", nodeup_info->node_id, > >> + node_name); > 
>> + } > >> + } > >> + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); > >> + TRACE_LEAVE(); > >> +} > >> + > >> /** > >> * Processing Node up mesg from the node_agent utility > >> * > >> @@ -281,7 +510,6 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, > >> > >> node = clms_node_get_by_name(&node_name); > >> if (node == NULL) { > >> - clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST; > >> /* The /etc/opensaf/node_name is an user exposed configuration > >> file. > >> * The node_name file contains the RDN value of the CLM node > name. > >> * (a) When opensaf cluster configuration is pre-provisioned > using > >> the OpenSAF IMM tools: > >> @@ -290,7 +518,13 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, > >> * (b) When opensaf cluster nodes are dynamically added at > >> runtime: > >> * the /etc/opensaf/node_name should contain the rdn > value. > >> */ > >> - LOG_NO("Node '%s' requests to join the cluster but is > >> unconfigured", nodeup_info->node_name.value); > >> + if (cb->scale_out_script != NULL) { > >> + clm_msg.info.api_resp_info.rc = SA_AIS_ERR_TRY_AGAIN; > >> + scale_out_node(cb, nodeup_info); > >> + } else { > >> + clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST; > >> + LOG_NO("Node '%s' requests to join the cluster but is > >> unconfigured", nodeup_info->node_name.value); > >> + } > >> } > >> > >> if (node != NULL) { > >> diff --git a/osaf/services/saf/clmsv/clms/clms_main.c > >> b/osaf/services/saf/clmsv/clms/clms_main.c > >> --- a/osaf/services/saf/clmsv/clms/clms_main.c > >> +++ b/osaf/services/saf/clmsv/clms/clms_main.c > >> @@ -226,6 +226,24 @@ uint32_t clms_cb_init(CLMS_CB * clms_cb) > >> clms_cb->is_impl_set = false; > >> clms_cb->rtu_pending = false; /* Flag to control try-again of > >> rt-updates */ > >> > >> + if (pthread_mutex_init(&clms_cb->scale_out_data_mutex, NULL) != > 0) > >> { > >> + return NCSCC_RC_FAILURE; > >> + } > >> + clms_cb->no_of_pending_nodes = 0; > >> + clms_cb->no_of_inprogress_nodes = 0; > >> + for (int i = 0; i != 
(MAX_PENDING_NODES + 1); ++i) { > >> + clms_cb->pending_nodes[i] = NULL; > >> + clms_cb->pending_node_ids[i] = 0; > >> + clms_cb->inprogress_node_ids[i] = 0; > >> + } > >> + clms_cb->is_scale_out_thread_running = false; > >> + clms_cb->scale_out_script = NULL; > >> + char *tmp = getenv("CLMSV_SCALE_OUT_SCRIPT"); > >> + if (tmp != NULL) { > >> + clms_cb->scale_out_script = strdup(tmp); > >> + if (clms_cb->scale_out_script == NULL) return NCSCC_RC_FAILURE; > >> + } > >> + > >> /* Assign Version. Currently, hardcoded, This will change later > */ > >> clms_cb->clm_ver.releaseCode = CLM_RELEASE_CODE; > >> clms_cb->clm_ver.majorVersion = CLM_MAJOR_VERSION_4; > >> diff --git a/osaf/services/saf/clmsv/config/clmd.conf > >> b/osaf/services/saf/clmsv/config/clmd.conf > >> --- a/osaf/services/saf/clmsv/config/clmd.conf > >> +++ b/osaf/services/saf/clmsv/config/clmd.conf > >> @@ -10,5 +10,14 @@ > >> # Healthcheck keys > >> export CLMSV_ENV_HEALTHCHECK_KEY="Default" > >> > >> +# Script to be called when a node which is not configured tries > to > >> join the > >> +# cluster. The idea is that this script can check if the node is > >> eligible to be > >> +# added to the cluster, and if so add the necessary IMM objects > so > >> that the node > >> +# will be able to join the next time it tries. To enable this > >> feature, configure > >> +# this variable with the full path to the executable script. The > >> script will be > >> +# called with one or more arguments, and each argument has the > format > >> "X,Y" > >> +# where X is the node id in decimal and Y is the name of the > node. > >> +#export CLMSV_SCALE_OUT_SCRIPT="/usr/local/bin/scale_out_script" > >> + > >> # Uncomment the next line to enable info level logging > >> #args="--loglevel=info" ------------------------------------------------------------------------------ _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel
