Hi Anders, This is a good hook to have for the scale-out use case, but is there a reason why you made this a configuration variable in clmd.conf instead of a configuration attribute in IMM? Is it to handle scale-out events before IMM has started?
/ Johan -----Original Message----- From: Anders Widell [mailto:[email protected]] Sent: den 28 september 2015 21:37 To: [email protected] Cc: [email protected] Subject: [devel] [PATCH 1 of 2] clmd: Add support for scale-out by calling a custom script to configure new nodes [#1453] osaf/services/saf/clmsv/README | 17 ++ osaf/services/saf/clmsv/clms/clms_cb.h | 31 ++++ osaf/services/saf/clmsv/clms/clms_evt.c | 238 ++++++++++++++++++++++++++++++- osaf/services/saf/clmsv/clms/clms_main.c | 18 ++ osaf/services/saf/clmsv/config/clmd.conf | 9 + 5 files changed, 311 insertions(+), 2 deletions(-) The scale-out feature makes it possible to run a custom script when a node which is not configured in IMM tries to join the cluster. The intention is that the custom script will check if the new node is eligible to be added to the cluster, and if so add the necessary IMM objects so that the node will be able to join the next time it tries. The script will be called with one or more arguments, where each argument represents a node which requests to be added to the cluster. Each argument has the format "X,Y" where X is the node id in decimal and Y is the name of the node. To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT variable in clmd.conf to point to the executable script which shall be executed when new nodes try to join the cluster. diff --git a/osaf/services/saf/clmsv/README b/osaf/services/saf/clmsv/README --- a/osaf/services/saf/clmsv/README +++ b/osaf/services/saf/clmsv/README @@ -54,6 +54,23 @@ SA_CLM_AF_INET(i.e. for IPv4) and SA_CLM The CLMA is the clm agent library that user applications shall link with. + +SCALE OUT + +The scale-out feature makes it possible to run a custom script when a +node which is not configured in IMM tries to join the cluster. 
The +intention is that the custom script will check if the new node is +eligible to be added to the cluster, and if so add the necessary IMM +objects so that the node will be able to join the next time it tries. +The script will be called with one or more arguments, where each +argument represents a node which requests to be added to the cluster. +Each argument has the format "X,Y" where X is the node id in decimal and Y is the name of the node. + +To enable the scale-out feature in CLM, set the CLMSV_SCALE_OUT_SCRIPT +variable in clmd.conf to point to the executable script which shall be +executed when new nodes try to join the cluster. + + DEPENDENCIES CLMSv depends on the following services: diff --git a/osaf/services/saf/clmsv/clms/clms_cb.h b/osaf/services/saf/clmsv/clms/clms_cb.h --- a/osaf/services/saf/clmsv/clms/clms_cb.h +++ b/osaf/services/saf/clmsv/clms/clms_cb.h @@ -18,10 +18,20 @@ #define CLMS_CB_H #include <stdbool.h> +#include <pthread.h> #define IMPLEMENTER_NAME "safClmService" #define CLMS_HA_INIT_STATE 0 +/* The maximum number of nodes that can be queued for scale-out while the + scale-out script is executing */ +#define MAX_PENDING_NODES 32 + +/* The value to put in the PATH environment variable when calling the + scale-out script */ +#define SCALE_OUT_PATH_ENV SBINDIR ":" BINDIR ":/usr/local/sbin:" \ + "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + typedef enum clms_tmr_type_t { CLMS_TMR_BASE, CLMS_CLIENT_RESP_TMR = CLMS_TMR_BASE, @@ -204,6 +214,27 @@ typedef struct clms_cb_t { bool is_impl_set; bool nid_started; /**< true if started by NID */ NCS_PATRICIA_TREE iplist; /* To temporarily store ipaddress information recieved in MDS NODE_UP */ + + /* Mutex protecting shared data used by the scale-out functionality */ + pthread_mutex_t scale_out_data_mutex; + /* Number of occupied indices in the vectors pending_nodes[] and + * pending_node_ids[] */ + size_t no_of_pending_nodes; + /* Number of occupied indices in the vector inprogress_node_ids[] */ 
+ size_t no_of_inprogress_nodes; + /* Names of the nodes to be added in the next run of the scale-out + * script */ + char *pending_nodes[MAX_PENDING_NODES + 1]; + /* Node ids of the nodes to be added in the next run of the the + * scale-out script */ + SaClmNodeIdT pending_node_ids[MAX_PENDING_NODES + 1]; + /* Node ids of the nodes that are being added by the currently executing + * instance of the scale-out script */ + SaClmNodeIdT inprogress_node_ids[MAX_PENDING_NODES + 1]; + /* True if the scale-out thread is currently running */ + bool is_scale_out_thread_running; + /* Full path to the scale-out script */ + char *scale_out_script; } CLMS_CB; typedef struct clms_lock_tmr_t { diff --git a/osaf/services/saf/clmsv/clms/clms_evt.c b/osaf/services/saf/clmsv/clms/clms_evt.c --- a/osaf/services/saf/clmsv/clms/clms_evt.c +++ b/osaf/services/saf/clmsv/clms/clms_evt.c @@ -15,9 +15,25 @@ * */ +#define _GNU_SOURCE #include <configmake.h> #include "clms.h" +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <errno.h> +#include <pthread.h> +#include <inttypes.h> +#include <unistd.h> +#include <limits.h> +#include <sys/time.h> +#include <sys/resource.h> +#include <sys/types.h> +#include <sys/wait.h> +#include "logtrace.h" +#include "ncsgl_defs.h" +#include "osaf_utility.h" static uint32_t process_api_evt(CLMSV_CLMS_EVT * evt); static uint32_t proc_clma_updn_mds_msg(CLMSV_CLMS_EVT * evt); @@ -26,6 +42,11 @@ static uint32_t proc_rda_evt(CLMSV_CLMS_ static uint32_t proc_mds_quiesced_ack_msg(CLMSV_CLMS_EVT * evt); static uint32_t proc_node_lock_tmr_exp_msg(CLMSV_CLMS_EVT * evt); static uint32_t proc_node_up_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); +static void execute_scale_out_script(int argc, char *argv[]); static +void *scale_out_thread(void *arg); static void +start_scale_out_thread(CLMS_CB *cb); static void scale_out_node(CLMS_CB +*cb, + const clmsv_clms_node_up_info_t *nodeup_info); static uint32_t proc_initialize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); 
static uint32_t proc_finalize_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); static uint32_t proc_track_start_msg(CLMS_CB * cb, CLMSV_CLMS_EVT * evt); @@ -64,6 +85,8 @@ static const CLMSV_CLMS_CLMA_API_MSG_HAN proc_node_up_msg }; +static char scale_out_path_env[] = "PATH=" SCALE_OUT_PATH_ENV; + /** * Clear any pending clma_down records or node_down list * @@ -251,6 +274,212 @@ CLMS_CLIENT_INFO *clms_client_new(MDS_DE return client; } +/* + * Execute the scale-out script with the parameters specified in @a +argc, and + * @a argv (analogous to the parameters taken by the main() function). +The first + * index in argv must be the full path to the executable script file. +The rest + * of the parameters shall specify the nodes to scale out. The format +of each + * parameter must be two strings separated by a comma character, where +the first + * string is the node id of a node to be scaled out (given as a decimal +number), + * and the second string is the node name. This function blocks until +the script + * has exited. 
+ */ +static void execute_scale_out_script(int argc, char *argv[]) { + struct rlimit rlim; + int nofile = 1024; + char *const env[] = { scale_out_path_env, NULL }; + + TRACE_ENTER(); + osafassert(argc >= 1 && argv[argc] == NULL); + LOG_NO("Running script %s to scale out %d node(s)", argv[0], argc - +1); + + if (getrlimit(RLIMIT_NOFILE, &rlim) == 0 && rlim.rlim_cur <= INT_MAX) { + nofile = rlim.rlim_cur; + } else { + LOG_ER("getrlimit(RLIMIT_NOFILE) failed: %s", strerror(errno)); + } + + pid_t child_pid = fork(); + if (child_pid == 0) { + for (int fd = 3; fd < nofile; ++fd) close(fd); + execve(argv[0], argv, env); + _Exit(123); + } else if (child_pid != (pid_t) -1) { + int status; + pid_t wait_pid; + do { + wait_pid = waitpid(child_pid, &status, 0); + } while (wait_pid == (pid_t) -1 && errno == EINTR); + if (wait_pid != (pid_t) -1) { + if (!WIFEXITED(status)) { + LOG_ER("Scale out script %s terminated " + "abnormally", argv[0]); + } else if (WEXITSTATUS(status) != 0) { + if (WEXITSTATUS(status) == 123) { + LOG_ER("Scale out script %s could " + "not be executed", argv[0]); + } else { + LOG_ER("Scale out script %s failed " + "with exit code %d", argv[0], + WEXITSTATUS(status)); + } + } else { + LOG_IN("Scale out script %s exited " + "successfully", argv[0]); + } + } else { + LOG_ER("Scale out script %s failed in waitpid(%u): %s", + argv[0], (unsigned) child_pid, strerror(errno)); + } + } else { + LOG_ER("Scale out script %s failed in fork(): %s", argv[0], + strerror(errno)); + } + TRACE_LEAVE(); +} + +/* + * This function is executed in a separate thread, and will +continuously call + * the scale-out script to scale out pending nodes until +no_of_pending_nodes + * becomes zero. The thread terminates itself when there are no more +pending + * nodes. 
+ */ +static void *scale_out_thread(void *arg) { + CLMS_CB *cb = (CLMS_CB*) arg; + + TRACE_ENTER(); + osafassert(cb->no_of_inprogress_nodes == 0); + for (;;) { + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); + char *argv[MAX_PENDING_NODES + 2] = { cb->scale_out_script }; + size_t no_of_pending_nodes = cb->no_of_pending_nodes; + osafassert(no_of_pending_nodes <= MAX_PENDING_NODES); + for (size_t i = 0; i != (no_of_pending_nodes + 1); ++i) { + argv[i + 1] = cb->pending_nodes[i]; + cb->inprogress_node_ids[i] = cb->pending_node_ids[i]; + cb->pending_nodes[i] = NULL; + } + cb->no_of_pending_nodes = 0; + cb->no_of_inprogress_nodes = no_of_pending_nodes; + if (no_of_pending_nodes == 0) { + osafassert(cb->is_scale_out_thread_running == true); + cb->is_scale_out_thread_running = false; + } + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); + if (no_of_pending_nodes == 0) break; + execute_scale_out_script(no_of_pending_nodes + 1, argv); + for (size_t i = 0; i != no_of_pending_nodes; ++i) { + free(argv[i + 1]); + } + } + LOG_IN("Scale out thread terminating"); + TRACE_LEAVE(); + return NULL; +} + +/* + * Start the scale-out thread which executes the function +scale_out_thread() in + * a background thread for as long as there are pending nodes to scale +out. This + * function must not be called if the thread is already running. 
+ */ +static void start_scale_out_thread(CLMS_CB *cb) { + pthread_attr_t attr; + int result; + + TRACE_ENTER(); + result = pthread_attr_init(&attr); + if (result == 0) { + result = pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + if (result != 0) { + LOG_ER("pthread_attr_setdetachstate() failed " + "with return code %d", result); + } + pthread_t thread; + result = pthread_create(&thread, &attr, scale_out_thread, cb); + if (result == 0) { + LOG_IN("Scale out thread started"); + osafassert(cb->is_scale_out_thread_running == false); + cb->is_scale_out_thread_running = true; + } else { + LOG_ER("pthread_create() failed with return code %d", + result); + } + pthread_attr_destroy(&attr); + } else { + LOG_ER("pthread_attr_init() failed with return code %d", result); + } + TRACE_LEAVE(); +} + +/* + * Add the node given by the @a nodeup_info parameter to the queue of +pending + * nodes to be scaled out, and start the scale-out thread if it is not +already + * running. + */ +static void scale_out_node(CLMS_CB *cb, + const clmsv_clms_node_up_info_t *nodeup_info) { + char node_name[SA_MAX_NAME_LENGTH]; + + TRACE_ENTER(); + size_t name_len = nodeup_info->node_name.length < SA_MAX_NAME_LENGTH ? 
+ nodeup_info->node_name.length : + (SA_MAX_NAME_LENGTH - 1); + memcpy(node_name, nodeup_info->node_name.value, name_len); + node_name[name_len] = '\0'; + + osaf_mutex_lock_ordie(&cb->scale_out_data_mutex); + size_t no_of_pending_nodes = cb->no_of_pending_nodes; + size_t no_of_inprogress_nodes = cb->no_of_inprogress_nodes; + bool queue_the_node = true; + for (size_t i = 0; i != no_of_inprogress_nodes && queue_the_node; ++i) { + if (nodeup_info->node_id == cb->inprogress_node_ids[i]) { + LOG_IN("Node 0x%" PRIx32 " (%s) is already being " + "scaled out", nodeup_info->node_id, node_name); + queue_the_node = false; + } + } + for (size_t i = 0; i != no_of_pending_nodes && queue_the_node; ++i) { + if (nodeup_info->node_id == cb->pending_node_ids[i]) { + LOG_IN("Node 0x%" PRIx32 " (%s) is already queued for " + "scaled out", nodeup_info->node_id, node_name); + queue_the_node = false; + } + } + if (no_of_pending_nodes >= MAX_PENDING_NODES && queue_the_node) { + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): too many " + "pending nodes", nodeup_info->node_id, node_name); + queue_the_node = false; + } + if (queue_the_node) { + char *strp; + if (asprintf(&strp, "%" PRIu32 ",%s", nodeup_info->node_id, + node_name) != -1) { + LOG_NO("Queuing request to scale out node 0x%" PRIx32 + " (%s)", nodeup_info->node_id, node_name); + osafassert(cb->pending_nodes[no_of_pending_nodes] == + NULL); + cb->pending_nodes[no_of_pending_nodes] = strp; + cb->pending_node_ids[no_of_pending_nodes] = + nodeup_info->node_id; + cb->no_of_pending_nodes++; + if (cb->is_scale_out_thread_running == false) { + start_scale_out_thread(cb); + } + } else { + LOG_ER("Failed to add node 0x%" PRIx32 " (%s): " + "asprintf failed", nodeup_info->node_id, + node_name); + } + } + osaf_mutex_unlock_ordie(&cb->scale_out_data_mutex); + TRACE_LEAVE(); +} + /** * Processing Node up mesg from the node_agent utility * @@ -281,7 +510,6 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, node = clms_node_get_by_name(&node_name); if 
(node == NULL) { - clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST; /* The /etc/opensaf/node_name is an user exposed configuration file. * The node_name file contains the RDN value of the CLM node name. * (a) When opensaf cluster configuration is pre-provisioned using the OpenSAF IMM tools: @@ -290,7 +518,13 @@ uint32_t proc_node_up_msg(CLMS_CB * cb, * (b) When opensaf cluster nodes are dynamically added at runtime: * the /etc/opensaf/node_name should contain the rdn value. */ - LOG_NO("Node '%s' requests to join the cluster but is unconfigured", nodeup_info->node_name.value); + if (cb->scale_out_script != NULL) { + clm_msg.info.api_resp_info.rc = SA_AIS_ERR_TRY_AGAIN; + scale_out_node(cb, nodeup_info); + } else { + clm_msg.info.api_resp_info.rc = SA_AIS_ERR_NOT_EXIST; + LOG_NO("Node '%s' requests to join the cluster but is unconfigured", nodeup_info->node_name.value); + } } if (node != NULL) { diff --git a/osaf/services/saf/clmsv/clms/clms_main.c b/osaf/services/saf/clmsv/clms/clms_main.c --- a/osaf/services/saf/clmsv/clms/clms_main.c +++ b/osaf/services/saf/clmsv/clms/clms_main.c @@ -226,6 +226,24 @@ uint32_t clms_cb_init(CLMS_CB * clms_cb) clms_cb->is_impl_set = false; clms_cb->rtu_pending = false; /* Flag to control try-again of rt-updates */ + if (pthread_mutex_init(&clms_cb->scale_out_data_mutex, NULL) != 0) { + return NCSCC_RC_FAILURE; + } + clms_cb->no_of_pending_nodes = 0; + clms_cb->no_of_inprogress_nodes = 0; + for (int i = 0; i != (MAX_PENDING_NODES + 1); ++i) { + clms_cb->pending_nodes[i] = NULL; + clms_cb->pending_node_ids[i] = 0; + clms_cb->inprogress_node_ids[i] = 0; + } + clms_cb->is_scale_out_thread_running = false; + clms_cb->scale_out_script = NULL; + char *tmp = getenv("CLMSV_SCALE_OUT_SCRIPT"); + if (tmp != NULL) { + clms_cb->scale_out_script = strdup(tmp); + if (clms_cb->scale_out_script == NULL) return NCSCC_RC_FAILURE; + } + /* Assign Version. 
Currently, hardcoded, This will change later */ clms_cb->clm_ver.releaseCode = CLM_RELEASE_CODE; clms_cb->clm_ver.majorVersion = CLM_MAJOR_VERSION_4; diff --git a/osaf/services/saf/clmsv/config/clmd.conf b/osaf/services/saf/clmsv/config/clmd.conf --- a/osaf/services/saf/clmsv/config/clmd.conf +++ b/osaf/services/saf/clmsv/config/clmd.conf @@ -10,5 +10,14 @@ # Healthcheck keys export CLMSV_ENV_HEALTHCHECK_KEY="Default" +# Script to be called when a node which is not configured tries to join +the # cluster. The idea is that this script can check if the node is +eligible to be # added to the cluster, and if so add the necessary IMM +objects so that the node # will be able to join the next time it tries. +To enable this feature, configure # this variable with the full path to +the executable script. The script will be # called with one or more arguments, and each argument has the format "X,Y" +# where X is the node id in decimal and Y is the name of the node. +#export CLMSV_SCALE_OUT_SCRIPT="/usr/local/bin/scale_out_script" + # Uncomment the next line to enable info level logging #args="--loglevel=info" ------------------------------------------------------------------------------ _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel ------------------------------------------------------------------------------ _______________________________________________ Opensaf-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/opensaf-devel
