In virtual environments, nodes can reboot very quickly (in less than one minute). If the reboot is abrupt, plmd may not become aware that the EE went down until after it has already come back up, because plmd relies on the TCP connection to plmcd on the node. In this case, plmd will set the readiness state to OOS after the EE is already back up, which causes CLM to evict the node from the cluster. plmd should use TCP_USER_TIMEOUT to notice that plmcd has exited abruptly.
This enhancement also refactors the threading involved with handling the plm clients, to support a large number of them. --- src/plm/plmcd/plmc.h | 3 + src/plm/plmcd/plmc_lib.c | 94 +-- src/plm/plmcd/plmc_lib_internal.c | 1313 +++++++++++++++---------------------- src/plm/plmcd/plmc_lib_internal.h | 13 +- src/plm/plmcd/plmc_read_config.c | 16 + src/plm/plmcd/plmcd.conf | 8 + src/plm/plmd/plms_plmc.c | 4 + 7 files changed, 618 insertions(+), 833 deletions(-) diff --git a/src/plm/plmcd/plmc.h b/src/plm/plmcd/plmc.h index e02523f..6145f89 100644 --- a/src/plm/plmcd/plmc.h +++ b/src/plm/plmcd/plmc.h @@ -37,6 +37,7 @@ #define KEEPIDLE_TIME 7200 #define KEEPALIVE_INTVL 75 #define KEEPALIVE_PROBES 9 +#define USER_TIMEOUT 5000 /* Tag value and message data lengths. */ #define PLMC_MAX_TAG_LEN 256 @@ -92,6 +93,7 @@ typedef enum { TCP_KEEPIDLE_TIME, TCP_KEEPALIVE_INTVL, TCP_KEEPALIVE_PROBES, + TCP_USER_TIMEOUT_VALUE } PLMC_config_tags; /* This struct holds the contents of the plmcd.conf configuration file. */ @@ -113,6 +115,7 @@ typedef struct { int tcp_keepidle_time; int tcp_keepalive_intvl; int tcp_keepalive_probes; + int tcp_user_timeout; } PLMC_config_data; /* The PLMC daemon command numerical index. 
*/ diff --git a/src/plm/plmcd/plmc_lib.c b/src/plm/plmcd/plmc_lib.c index 5b3f11a..99574ea 100644 --- a/src/plm/plmcd/plmc_lib.c +++ b/src/plm/plmcd/plmc_lib.c @@ -22,6 +22,8 @@ #include <stdio.h> #include <errno.h> #include <string.h> +#include <sys/eventfd.h> +#include "base/logtrace.h" #include "plm/plmcd/plmc_lib_internal.h" #include "plm/plmcd/plmc_cmds.h" @@ -44,8 +46,6 @@ int do_command(char *ee_id, int (*cb)(tcp_msg *), char *cmd, PLMC_cmd_idx cmd_enum) { thread_entry *tentry; - pthread_attr_t client_mgr_attr; - pthread_t plmc_client_mgr_id; tentry = find_thread_entry(ee_id); if (tentry == NULL) { @@ -57,15 +57,6 @@ int do_command(char *ee_id, int (*cb)(tcp_msg *), char *cmd, "lock for a client"); return (PLMC_API_LOCK_FAILED); } - /* Check if there are pending work */ - if (tentry->thread_d.done == 0) { - if (pthread_mutex_unlock(&tentry->thread_d.td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error " - "unlocking a mutex for a client"); - return (PLMC_API_UNLOCK_FAILED); - } - return (PLMC_API_CLIENT_BUSY); - } /* Check if there is valid socket */ if (tentry->thread_d.socketfd == 0) { @@ -80,27 +71,8 @@ int do_command(char *ee_id, int (*cb)(tcp_msg *), char *cmd, strncpy(tentry->thread_d.command, cmd, PLMC_CMD_NAME_MAX_LENGTH); tentry->thread_d.command[PLMC_CMD_NAME_MAX_LENGTH - 1] = '\0'; tentry->thread_d.callback = cb; - tentry->thread_d.done = 0; - - /* Initialize and start the client_mgr_thread */ - pthread_attr_init(&client_mgr_attr); - pthread_attr_setdetachstate(&client_mgr_attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&(plmc_client_mgr_id), &client_mgr_attr, - plmc_client_mgr, (void *)tentry) != 0) { - syslog(LOG_ERR, "plmc_lib: Could not create a " - "new client mgr thread for connection"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_CLOSE_SOCKET, ee_id, cmd_enum); - /* Unlock mutex */ - if (pthread_mutex_unlock(&tentry->thread_d.td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error " - "unlocking 
when updated " - "thread_id"); - } - return (PLMC_API_FAILURE); - } - /* Update the thread_entry with the thread ID */ - tentry->thread_d.td_id = plmc_client_mgr_id; + + plmc_client_mgr(tentry); /* Unlock */ if (pthread_mutex_unlock(&tentry->thread_d.td_lock) != 0) { @@ -164,23 +136,27 @@ int plmc_initialize(int (*connect_cb)(char *, char *), int (*udp_cb)(udp_msg *), callbacks.udp_cb = udp_cb; callbacks.err_cb = err_cb; - /* Set these threads detached as we don't want to join them */ pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + tcp_listener_stop_fd = eventfd(0, EFD_NONBLOCK); + + if (tcp_listener_stop_fd < 0) { + LOG_ER("eventfd failed: %i", errno); + return PLMC_API_FAILURE; + } + + TRACE("tcp_listener_stop_fd: %i", tcp_listener_stop_fd); /* Create the threads */ if (pthread_create(&(tcp_listener_id), &attr, plmc_tcp_listener, NULL) != 0) { syslog(LOG_ERR, "plmc_lib: Could not create a new thread " "when initializing"); + pthread_attr_destroy(&attr); return (PLMC_API_FAILURE); } - if (pthread_create(&(plmc_connection_mgr_id), &attr, - plmc_connection_mgr, NULL) != 0) { - syslog(LOG_ERR, "plmc_lib: Could not create a new thread " - "when initializing"); - return (PLMC_API_FAILURE); - } + + pthread_attr_destroy(&attr); return (PLMC_API_SUCCESS); } /* CHANGE - move all logic to a single function and call it from @@ -399,18 +375,44 @@ int plmc_plmcd_restart(char *ee_id, int (*cb)(tcp_msg *)) int plmc_destroy() { - int i = 0; - if (pthread_cancel(udp_listener_id) != 0) { - i = 1; + int rc = 0, s = 0; + uint64_t stop = 1; + TRACE_ENTER(); + + s = write(udp_listener_stop_fd, &stop, sizeof(stop)); + + if (s < 0) { + LOG_ER("write failed: %i: stopping udp_listener thread", errno); + rc = 1; + } else { + TRACE("waiting for udp thread to stop"); + rc = pthread_join(udp_listener_id, 0); + if (rc != 0) + LOG_ER("joining udp thread failed: %i", rc); + else + TRACE("udp thread stopped"); } - if (pthread_cancel(tcp_listener_id) != 
0) { - i = 1; + + s = write(tcp_listener_stop_fd, &stop, sizeof(stop)); + + if (s < 0) { + LOG_ER("write failed: %i: stopping tcp_listener thread", errno); + rc = 1; + } else { + TRACE("waiting for tcp thread to stop"); + rc = pthread_join(tcp_listener_id, 0); + if (rc != 0) + LOG_ER("joining tcp thread failed: %i", rc); + else + TRACE("tcp thread stopped"); } + #ifdef PLMC_LIB_DEBUG fprintf(plmc_lib_debug, "Closing the debug file\n"); /* Close the debug file */ sleep(2); fclose(plmc_lib_debug); #endif - return i; + TRACE_LEAVE2("rc: %i", rc); + return rc; } diff --git a/src/plm/plmcd/plmc_lib_internal.c b/src/plm/plmcd/plmc_lib_internal.c index 08e26b2..83a845d 100644 --- a/src/plm/plmcd/plmc_lib_internal.c +++ b/src/plm/plmcd/plmc_lib_internal.c @@ -15,16 +15,23 @@ * */ +#include <assert.h> +#include <stdbool.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <ctype.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> +#include <sys/poll.h> #include <sys/socket.h> #include <netinet/in.h> +#include <netinet/tcp.h> #include <arpa/inet.h> #include <stdio.h> #include <string.h> #include <syslog.h> +#include "base/logtrace.h" #include "plm/plmcd/plmc_lib_internal.h" #include "plmc_cmds.h" @@ -36,16 +43,19 @@ static thread_entry *TAIL = NULL; static pthread_mutex_t td_list_lock; /* List-wide lock */ +static int epoll_fd; + FILE *plmc_lib_debug; char *plmc_config_file = 0; PLMC_config_data config; -pthread_t tcp_listener_id = 0, udp_listener_id = 0, plmc_connection_mgr_id = 0; +pthread_t tcp_listener_id = 0, udp_listener_id = 0; +int tcp_listener_stop_fd = -1, udp_listener_stop_fd = -1; cb_functions callbacks; /* returns pointer to thread entry on success, NULL otherwise */ -thread_entry *find_thread_entry(char *ee_id) +thread_entry *find_thread_entry(const char *ee_id) { thread_entry *curr_entry, *retval = NULL; @@ -75,99 +85,49 @@ thread_entry *find_thread_entry(char *ee_id) return (retval); } -/* returns pointer to new thread entry on success, 
NULL otherwise */ -thread_entry *create_thread_entry(char *ee_id, int sock) +static thread_entry * find_thread_entry_by_socket(int sockfd) { - thread_entry *new_entry = NULL; - thread_data *tdata = NULL; + thread_entry *entry = 0; -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "create_thread_entry was called with ee_id " - "%s\n", - ee_id); - fflush(plmc_lib_debug); -#endif - /* - * Check if the entry exists already. If so, check socket and switch - * kill to one if socket exist. - */ - new_entry = find_thread_entry(ee_id); + TRACE_ENTER2("fd: %i", sockfd); - if (new_entry != NULL) { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "create_thread_entry: known entry " - "ee_id=%s\n", - ee_id); - fflush(plmc_lib_debug); -#endif - tdata = &(new_entry->thread_d); - /* Get lock, we are changing stuff here */ - if (pthread_mutex_lock(&tdata->td_lock) != 0) { - syslog( - LOG_ERR, - "plmc_lib: create_thread_entry " - "encountered an error getting a lock for a client"); + do { + thread_entry *curr_entry = 0; + + if (pthread_mutex_lock(&td_list_lock) != 0) { + syslog(LOG_ERR, + "error: plmc library could not lock thread " + "list"); + break; } - if (tdata->socketfd != 0) { -/* So there is still a socket open here - * We'll need to set kill to 1 so the connection_mgr - * can deal with him - */ -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "create_thread_entry: " - "socketfd was not zero. This guy needs to be " - "killed. Setting kill to 1. Ee_id=%s\n", - ee_id); - fflush(plmc_lib_debug); -#endif - tdata->kill = 1; - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { - syslog( - LOG_ERR, - "plmc_lib: create_thread_entry " - "encountered an error unlockng a client"); - } - return NULL; - } else { -/* So the socket is zero. This means that the - * connection_mgr or client manager already took care - * of the socket and client_mgr has exited - */ -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "create_thread_entry: socketfd " - "was already zero. 
Resetting and reusing. " - "Ee_id=%s\n", - ee_id); - fflush(plmc_lib_debug); -#endif - strncpy(tdata->ee_id, ee_id, PLMC_EE_ID_MAX_LENGTH); - tdata->ee_id[PLMC_EE_ID_MAX_LENGTH - 1] = '\0'; - tdata->socketfd = sock; - tdata->command[0] = '\0'; - tdata->callback = NULL; - tdata->done = 1; - tdata->td_id = 0; - tdata->kill = 0; + + curr_entry = HEAD; + while (curr_entry) { + if (curr_entry->thread_d.socketfd == sockfd) { + entry = curr_entry; + break; + } else + curr_entry = curr_entry->next; } - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { + + if (pthread_mutex_unlock(&td_list_lock) != 0) { syslog(LOG_ERR, - "plmc_lib: create thread_entry " - "encountered an error unlockng a client"); + "error: plmc library could not unlock thread " + "list"); + break; } - return new_entry; - } + } while (false); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "create_thread_entry: This is a new entry " - "ee_id=%s\n", - ee_id); - fflush(plmc_lib_debug); -#endif + TRACE_LEAVE2("ee_id: %s", entry ? 
entry->thread_d.ee_id : "none"); + return entry; +} + +/* returns pointer to new thread entry on success, NULL otherwise */ +static thread_entry *create_thread_entry(const char *ee_id, int sock) +{ + thread_entry *new_entry = NULL; + + TRACE_ENTER2("ee_id: %s fd: %i", ee_id, sock); new_entry = calloc(1, sizeof(thread_entry)); if (new_entry == NULL) { @@ -186,9 +146,6 @@ thread_entry *create_thread_entry(char *ee_id, int sock) new_entry->thread_d.socketfd = sock; new_entry->thread_d.command[0] = '\0'; new_entry->thread_d.callback = NULL; - new_entry->thread_d.done = 1; - new_entry->thread_d.td_id = 0; - new_entry->thread_d.kill = 0; } /*** Begin Critical Section ***/ @@ -223,14 +180,18 @@ thread_entry *create_thread_entry(char *ee_id, int sock) return (NULL); } /*** End Critical Section ***/ + + TRACE_LEAVE(); return (new_entry); } /* returns 0 on success, 1 on error */ -int delete_thread_entry(char *ee_id) +static int delete_thread_entry(const char *ee_id) { thread_entry *entry_to_delete; + TRACE_ENTER2("ee_id: %s", ee_id); + /* Make sure we have this entry in the list. 
*/ entry_to_delete = find_thread_entry(ee_id); if (entry_to_delete == NULL) { @@ -278,25 +239,26 @@ int delete_thread_entry(char *ee_id) pthread_mutex_destroy(&(entry_to_delete->thread_d.td_lock)); free(entry_to_delete); + + TRACE_LEAVE(); + return (0); } /* delete all thread entries */ -void delete_all_thread_entries() +void delete_all_thread_entries(void) { thread_entry *curr_entry, *next_entry = NULL; int sockfd; char result[PLMC_MAX_TCP_DATA_LEN]; -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "delete_all_thread_entries was called\n"); - fflush(plmc_lib_debug); -#endif + TRACE_ENTER(); /*** Begin Critical Section ***/ if (pthread_mutex_lock(&td_list_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: delete_thread_entry encountered an " + LOG_ER("plmc_lib: delete_thread_entry encountered an " "error locking the list"); + TRACE_LEAVE(); return; } @@ -305,22 +267,11 @@ void delete_all_thread_entries() sockfd = curr_entry->thread_d.socketfd; next_entry = curr_entry->next; pthread_mutex_destroy(&(curr_entry->thread_d.td_lock)); - if (curr_entry->thread_d.td_id != 0) { - pthread_cancel(curr_entry->thread_d.td_id); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "delete_all_thread_entries " - "killed a client_mgr thread\n"); - fflush(plmc_lib_debug); -#endif - } + if (sockfd != 0 && recv(sockfd, result, PLMC_MAX_TCP_DATA_LEN, MSG_PEEK | MSG_DONTWAIT) != 0) { close(sockfd); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "delete_all_thread_entries " - "closed socket\n"); - fflush(plmc_lib_debug); -#endif + TRACE("closed socket"); } free(curr_entry); @@ -330,11 +281,13 @@ void delete_all_thread_entries() HEAD = NULL; if (pthread_mutex_unlock(&td_list_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: delete_thread_entry encountered an " + LOG_ER("plmc_lib: delete_thread_entry encountered an " "error unlocking the list"); + TRACE_LEAVE(); return; } /*** End Critical Section ***/ + TRACE_LEAVE(); } static char *PLMC_action_msg[] = {"Action not defined", "Closing client socket", @@ 
-494,105 +447,6 @@ int parse_udp(udp_msg *parsed, char *incoming) return 0; } -/*************************************************************** - * This is a thread cleanup function for the listerner - * This guy will close the main socket and then call - * pthread_cancel on the plmc_connection_mgr thread - * - * Parameters: - * arg - a pointer to the socket file descriptor - * - * Returns: - * This function doesn't return anything - ***************************************************************/ - -void tcp_listener_cleaner(void *arg) -{ - int socket; - socket = *((int *)arg); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "tcp_listener_cleaner was called\n"); - fflush(plmc_lib_debug); -#endif - - /* Close socket so no more connection can happen */ - close_socket(socket); - if (pthread_cancel(plmc_connection_mgr_id) != 0) { - syslog(LOG_ERR, "plmc_lib: tcp_listener_cleaner " - "encountered an error canceling a thread"); - } -} - -/*************************************************************** - * This is a thread cleanup function for the connection - * mgr. 
This guy will just call the - * delete_all_thread_entries, - * which in turn will call pthreat_cancel on all threads - * - * Parameters: - * arg - NULL - * - * Returns: - * This function doesn't return anything - ***************************************************************/ - -void connection_mgr_cleaner(void *arg) -{ -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "connection_mgr_cleaner was called\n"); - fflush(plmc_lib_debug); -#endif - - /* Delete all entries */ - delete_all_thread_entries(); -} - -/************************************************************* - * This is a thread cleanup function for the client_mgrs - * This guy will just close the socket - * - * Parameters: - * arg - a pointer to the socket file descriptor - * - * Returns: - * This function doesn't return anything - **************************************************************/ - -void client_mgr_cleaner(void *arg) -{ - int socket; - socket = *((int *)arg); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "client_mgr_cleaner was called\n"); - fflush(plmc_lib_debug); -#endif - /* Close socket so no more connections can happen */ - close_socket(socket); -} - -/************************************************************* - * This is a thread cleanup function for the udp_listener - * This guy will just close the socket - * - * Parameters: - * arg - a pointer to the socket file descriptor - * - * Returns: - * This function doesn't return anything - **************************************************************/ - -void udp_listener_cleaner(void *arg) -{ - int socket; - socket = *((int *)arg); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "udp_listener_cleaner was called\n"); - fflush(plmc_lib_debug); -#endif - /* Close socket so no more connection can happen */ - close(socket); -} - /********************************************************************* * This function takes care of extracting the essential from * the incoming TCP Message @@ -608,11 +462,11 @@ void 
udp_listener_cleaner(void *arg) * This function return 0 if successful and a 1 if unsuccessful *********************************************************************/ -int parse_tcp(tcp_msg *parsed, char *incoming) +static int parse_tcp(tcp_msg *parsed, const char *incoming) { - char *tmpstring; + const char *tmpstring = incoming; int i; - tmpstring = incoming; + #ifdef PLMC_LIB_DEBUG fprintf(plmc_lib_debug, "parse_tcp_msg was called with: %s\n", incoming); @@ -679,170 +533,75 @@ int parse_tcp(tcp_msg *parsed, char *incoming) return 0; } -/********************************************************************** - * plmc_connection_mgr - This is the thread that is responsible for - * monitoring all the connections - * This guy will periodically wakeup, walk through the list - * and ping all the plmc clients. If they are not anymore available - * plmc_connection_mgr will delete the entry and kill off any thread - * that is talking to them - * - * Parameters: - * No parameters - * - * Returns: - * This function returns a *void - ***********************************************************************/ -void *plmc_connection_mgr(void *arguments) +static int handle_plmc_response(int fd, const char *recv_mesg) { - thread_entry *tentry; - thread_data *tdata; - char result[PLMC_MAX_TCP_DATA_LEN]; - pthread_t plmc_client_mgr_id; - PLMC_cmd_idx cmd_enum; - int sockfd, o_state, kill; + int result = 0; - pthread_cleanup_push(connection_mgr_cleaner, NULL); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_connection_mgr started up\n"); - fflush(plmc_lib_debug); -#endif - while (1) { - /* Start by sleeping 5 seconds */ - sleep(5); /* Perhaps a definition in a header file */ - tentry = HEAD; - - while (tentry != NULL) { - /* Do not allow to be canceled */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, - &o_state); - - /* Lock to get the socket */ - if (pthread_mutex_lock(&tentry->thread_d.td_lock) != - 0) { - syslog(LOG_ERR, - "plmc_lib: " - "plmc_connection_mgr encountered an 
" - "error getting a lock for a client"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_IGNORING, NULL, - PLMC_NOOP_CMD); - break; - } - tdata = &(tentry->thread_d); - sockfd = tdata->socketfd; - plmc_client_mgr_id = tdata->td_id; - kill = tdata->kill; - if (tdata->done == 0) { - cmd_enum = - plmc_cmd_string_to_enum(tdata->command); - if (cmd_enum < 0) - cmd_enum = PLMC_NOOP_CMD; - } else { - cmd_enum = PLMC_NOOP_CMD; - } + do { + tcp_msg tcpmsg; + thread_entry *tentry = 0; - if (sockfd != 0 && - recv(sockfd, result, PLMC_MAX_TCP_DATA_LEN, - MSG_PEEK | MSG_DONTWAIT) == 0) { - kill = 1; #ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_connection_mgr: " - "Discovered a client that didn't " - "answer. Setting kill = 1. EE_ID is " - "%s\n", - tdata->ee_id); - fflush(plmc_lib_debug); + fprintf(plmc_lib_debug, + "plmc_tcp_listener got response back: " + "%s\n", + recv_mesg); + fflush(plmc_lib_debug); #endif - } - /* Check if kill is 1. If it is it means we potentially - * need to kill the client and close the socket - */ - if (kill == 1) { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_connection_mgr: " - "Kill is set to 1. Cleaning up " - "for EE_ID %s\n", - tdata->ee_id); - fflush(plmc_lib_debug); -#endif - /* - * If the client_mgr thread is running, - * kill him - */ - if (plmc_client_mgr_id != 0) { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_connection_mgr: Client " - "mgr is running. We will need " - "to kill him. EE_ID is %s\n", - tdata->ee_id); - fflush(plmc_lib_debug); -#endif - if (pthread_cancel( - plmc_client_mgr_id) != 0) { - syslog(LOG_ERR, - "plmc_lib: " - "encountered an error " - "trying to cancel a " - "client mgr thread"); - } - } - if (sockfd != 0) { - send_error(PLMC_LIBERR_LOST_CONNECTION, - PLMC_LIBACT_CLOSE_SOCKET, - tdata->ee_id, cmd_enum); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_connection_mgr: We are " - "closing the socket. 
EE_ID is " - "%s\n", - tdata->ee_id); - fflush(plmc_lib_debug); -#endif - /* Now, close the socket */ - close_socket(sockfd); - if ((callbacks.connect_cb( - tdata->ee_id, - PLMC_DISCONN_CB_MSG)) != 0) { - syslog(LOG_ERR, - "plmc_lib: " - "encountered an error " - "during the disconnect " - "callback"); - } - } - tdata->socketfd = 0; - tdata->td_id = 0; - tdata->kill = 0; - tdata->done = 1; + if (parse_tcp(&tcpmsg, recv_mesg) != 0) { + syslog(LOG_ERR, "plmc_lib: Invalid TCP message " + "during GET_EE_ID"); + send_error(PLMC_LIBERR_MSG_INVALID, + PLMC_LIBACT_CLOSE_SOCKET, NULL, + PLMC_GET_ID_CMD); + result = -1; + break; + } + + tentry = find_thread_entry(tcpmsg.ee_id); + + if (!tentry) { + tentry = create_thread_entry(tcpmsg.ee_id, fd); + + if (!tentry) { + syslog(LOG_ERR, + "unable to create thread entry for EE: " + "%s", + tcpmsg.ee_id); + assert(false); } + } - /* - * Ok, time to unlock - */ - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { + /* Send callback */ + if (tentry->thread_d.callback) { + if (tentry->thread_d.callback(&tcpmsg) != 0) { + PLMC_cmd_idx cmd_enum = + plmc_cmd_string_to_enum( + tentry->thread_d.command); syslog(LOG_ERR, - "plmc_lib: encountered an " - "error unlocking a client mutex"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_UNDEFINED, tdata->ee_id, - cmd_enum); + "plmc_lib: encountered an error during " + "the result callback call to PLMSv"); + send_error(PLMC_LIBERR_ERR_CB, + PLMC_LIBACT_IGNORING, + tentry->thread_d.ee_id, + cmd_enum); } - /* Allow to be canceled again */ - pthread_setcancelstate(o_state, &o_state); - tentry = tentry->next; } - } -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_connection_mgr: I am exiting\n"); - fflush(plmc_lib_debug); -#endif - pthread_cleanup_pop(0); - pthread_exit((void *)NULL); + + if (tcpmsg.cmd_enum == PLMC_GET_ID_CMD) { + if ((callbacks.connect_cb(tcpmsg.ee_id, + PLMC_CONN_CB_MSG)) != 0) { + send_error(PLMC_LIBERR_ERR_CB, + PLMC_LIBACT_IGNORING, + tcpmsg.ee_id, + 
PLMC_GET_ID_CMD); + result = -1; + } + } + } while (false); + + return result; } /********************************************************************** @@ -861,62 +620,20 @@ void *plmc_connection_mgr(void *arguments) * This function returns a *void ***********************************************************************/ -void *plmc_client_mgr(void *arguments) +void plmc_client_mgr(thread_entry *tentry) { - thread_entry *tentry; - thread_data *tdata; - char command[PLMC_CMD_NAME_MAX_LENGTH]; - char result[PLMC_MAX_TCP_DATA_LEN]; - char ee_id[PLMC_EE_ID_MAX_LENGTH]; - PLMC_cmd_idx cmd_enum; - int sockfd, retval, bytes_received; - struct timeval tv; - int (*callback)(tcp_msg *); - tentry = (thread_entry *)arguments; - tdata = &(tentry->thread_d); - command[0] = '\0'; - - /* Adding 10 seconds to the plmc_cmd_timeout_secs in the - * plmcd.conf file - */ - tv.tv_sec = config.plmc_cmd_timeout_secs + 10; - tv.tv_usec = 0; - tcp_msg tcpmsg; + char command[PLMC_CMD_NAME_MAX_LENGTH] = { 0 }; + char ee_id[PLMC_EE_ID_MAX_LENGTH] = { 0 }; + int sockfd, retval; + thread_data *tdata = &(tentry->thread_d); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_client_mgr started up\n"); - fflush(plmc_lib_debug); -#endif + do { + sockfd = tentry->thread_d.socketfd; + strncpy(ee_id, tdata->ee_id, PLMC_EE_ID_MAX_LENGTH); - /* Lock to get the socket */ - if (pthread_mutex_lock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: plmc_client_mgr encountered an " - "error getting a lock for a client"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_EXIT_THREAD, NULL, PLMC_NOOP_CMD); - pthread_exit((void *)NULL); - } - sockfd = tentry->thread_d.socketfd; - strncpy(ee_id, tdata->ee_id, PLMC_EE_ID_MAX_LENGTH); - ee_id[PLMC_EE_ID_MAX_LENGTH - 1] = '\0'; - - /* - * There is a new command, copy it locally and return the - * lock - */ - strncpy(command, tdata->command, PLMC_CMD_NAME_MAX_LENGTH); - command[PLMC_CMD_NAME_MAX_LENGTH - 1] = '\0'; - cmd_enum = 
plmc_cmd_string_to_enum(tdata->command); - /* Get the callback as well */ - callback = tdata->callback; - - /* Unlock */ - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an " - "error unlocking a client mutex"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_UNDEFINED, - ee_id, cmd_enum); - } + /* There is a new command, copy it locally */ + strncpy(command, tdata->command, PLMC_CMD_NAME_MAX_LENGTH); + command[PLMC_CMD_NAME_MAX_LENGTH - 1] = '\0'; #ifdef PLMC_LIB_DEBUG fprintf(plmc_lib_debug, @@ -926,140 +643,26 @@ void *plmc_client_mgr(void *arguments) fflush(plmc_lib_debug); #endif - /* - * Change the socket option so that it waits for tv.tv_sec seconds - * for a respons - */ - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, - sizeof tv) < 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error during " - "a call to setsockopt() from client_mgr"); - syslog(LOG_ERR, "plmc_lib: errnor is %d", errno); - syslog(LOG_ERR, "plmc_lib: Closing socket and exiting " - "client_mgr"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_EXIT_THREAD, ee_id, cmd_enum); - if (close_socket(sockfd) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error " - "during a call to close_socket"); - } - pthread_exit((void *)NULL); - } - #ifdef PLMC_LIB_DEBUG fprintf(plmc_lib_debug, "plmc_client_mgr: Sending command to client\n"); fflush(plmc_lib_debug); #endif - /* Send the command to PLMc */ - retval = send(sockfd, command, strlen(command), 0); - if (retval <= 0) { - syslog(LOG_ERR, "plmc_lib: Client Mgr encountered an error " + /* Send the command to PLMc */ + retval = send(sockfd, command, strlen(command), 0); + if (retval <= 0) { + syslog(LOG_ERR, + "plmc_lib: Client Mgr encountered an error " "during a call to send(). 
Leaving cleanup for " "connection mgr"); - syslog(LOG_ERR, "plmc_lib: errnor is %d", errno); - /* Connection MGR will send error to plmc_lib */ - pthread_exit((void *)NULL); - } - - /* Get the response */ - bytes_received = recv(sockfd, result, PLMC_MAX_TCP_DATA_LEN, 0); - if (bytes_received <= 0) { - /* - * The client didn't come back in time. Can't - * hang around waiting for this guy. Close this - * socket. He'll just have to reconnect - */ - syslog(LOG_ERR, "plmc_lib: Client Mgr encountered an " - "error during a call to recv(). Leaving " - "cleanup for connection mgr"); - /* Send a timeout error */ - send_error(PLMC_LIBERR_TIMEOUT, PLMC_LIBACT_CLOSE_SOCKET, ee_id, - cmd_enum); - /* Lock to change the socket */ - if (pthread_mutex_lock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: client_mgr encountered an " - "error getting a lock for a client"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_EXIT_THREAD, ee_id, cmd_enum); - pthread_exit((void *)NULL); - } - if (close_socket(sockfd) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error " - "during a call to close_socket"); - } - /* - * Set socketfd to zero to indicate to connection_mgr - * that we are gone - */ - tentry->thread_d.socketfd = 0; - tdata->td_id = 0; - tdata->kill = 1; - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: plmc_client_mgr encountered " - "an error unlocking a client mutex"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_UNDEFINED, ee_id, cmd_enum); + syslog(LOG_ERR, "plmc_lib: errno is %d", errno); } - pthread_exit((void *)NULL); - } - result[bytes_received] = '\0'; -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_client_mgr: Got command back %s\n", - result); - fflush(plmc_lib_debug); -#endif - - if (parse_tcp(&tcpmsg, result) != 0) { - syslog(LOG_ERR, - "plmc_lib: TCP message " - "invalid %s", - result); - send_error(PLMC_LIBERR_MSG_INVALID, PLMC_LIBACT_IGNORING, ee_id, - cmd_enum); - pthread_exit((void *)NULL); - 
} - - /* Get lock */ - if (pthread_mutex_lock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: client_mgr encountered an error " - "getting a lock for a client"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_UNDEFINED, - ee_id, cmd_enum); - if (close_socket(sockfd) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error during " - "a call to close_socket"); - } - pthread_exit((void *)NULL); - } - - /* Tell the world we are done with this guy */ - tdata->done = 1; - tdata->td_id = 0; /* We are exiting */ - - /* Unlock */ - if (pthread_mutex_unlock(&tdata->td_lock) != 0) { - syslog(LOG_ERR, "plmc_lib: plmc_client_mgr encountered an " - "error unlocking a client mutex"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_UNDEFINED, - ee_id, cmd_enum); - } - - /* Send callback */ - if ((callback(&tcpmsg)) != 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error during the " - "result callback call to PLMSv"); - send_error(PLMC_LIBERR_ERR_CB, PLMC_LIBACT_IGNORING, ee_id, - cmd_enum); - pthread_exit((void *)NULL); - } + } while (false); #ifdef PLMC_LIB_DEBUG fprintf(plmc_lib_debug, "plmc_client_mgr: Done with command, " "exiting\n"); fflush(plmc_lib_debug); #endif - - pthread_exit((void *)NULL); } /********************************************************************** @@ -1076,18 +679,15 @@ void *plmc_client_mgr(void *arguments) void *plmc_udp_listener(void *arguments) { - int sockfd, msg_length; + int sockfd, true_value = true; struct sockaddr_in servaddr, cliaddr; struct in_addr inp; - socklen_t addr_length; - char mesg[PLMC_MAX_UDP_DATA_LEN]; - udp_msg udpmsg; char *match_ip; + bool done = false; + struct pollfd fds[2]; + + TRACE_ENTER(); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_udp_listener started\n"); - fflush(plmc_lib_debug); -#endif /* Get the socket */ if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { syslog(LOG_ERR, "plmc_lib: encountered an error opening a " @@ -1103,12 +703,27 @@ void *plmc_udp_listener(void 
*arguments) pthread_exit((void *)NULL); } - /* - * Here we need to register a cleanup function that will be called in - * case the UDP listener gets canceled by the library. This is to close - * the socket. - */ - pthread_cleanup_push(udp_listener_cleaner, (void *)&sockfd); + if (setsockopt(sockfd, + SOL_SOCKET, + SO_REUSEADDR, + &true_value, + sizeof(int)) < 0) { + syslog(LOG_ERR, "plmc_lib: encountered an error during a " + "call to setsockopt()"); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_IGNORING, + NULL, PLMC_NOOP_CMD); + } + + if (setsockopt(sockfd, + SOL_SOCKET, + SO_REUSEPORT, + &true_value, + sizeof(int)) < 0) { + syslog(LOG_ERR, "plmc_lib: encountered an error during a " + "call to setsockopt() REUSEPORT"); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_IGNORING, + NULL, PLMC_NOOP_CMD); + } match_ip = plmc_get_listening_ip_addr(plmc_config_file); if (strlen(match_ip) == 0) { @@ -1141,46 +756,197 @@ void *plmc_udp_listener(void *arguments) pthread_exit((void *)NULL); } - /* - We are now entering the while loop. 
We will hang out here till somebody - cancels us - */ - while (1) { - addr_length = sizeof(cliaddr); - msg_length = - recvfrom(sockfd, mesg, PLMC_MAX_UDP_DATA_LEN, 0, - (struct sockaddr *)&cliaddr, &addr_length); - mesg[msg_length] = '\0'; + fds[0].fd = sockfd; + fds[0].events = POLLIN; + fds[0].revents = 0; + + fds[1].fd = udp_listener_stop_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; + + while (!done) { + int res = poll(fds, sizeof(fds) / sizeof(struct pollfd), -1); + + if (res < 0) { + if (errno == EINTR) + continue; + else + LOG_ER("poll failed: %i", errno); + } + + if (fds[0].revents & POLLIN) { + char mesg[PLMC_MAX_UDP_DATA_LEN]; + udp_msg udpmsg; + socklen_t addr_length = sizeof(cliaddr); + int msg_length = recvfrom(sockfd, + mesg, + PLMC_MAX_UDP_DATA_LEN, + 0, + (struct sockaddr *)&cliaddr, + &addr_length); + mesg[msg_length] = '\0'; #ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_udp_listener got a new message: " - "%s\n", - mesg); - fflush(plmc_lib_debug); + fprintf(plmc_lib_debug, + "plmc_udp_listener got a new message: " + "%s\n", + mesg); + fflush(plmc_lib_debug); #endif - /* Get the struct setup */ - if (parse_udp(&udpmsg, mesg) != 0) { - syslog(LOG_ERR, "plmc_lib: UDP message invalid %s", - mesg); - send_error(PLMC_LIBERR_MSG_INVALID, - PLMC_LIBACT_IGNORING, NULL, PLMC_NOOP_CMD); - continue; + /* Get the struct setup */ + if (parse_udp(&udpmsg, mesg) != 0) { + syslog(LOG_ERR, + "plmc_lib: UDP message invalid %s", + mesg); + send_error(PLMC_LIBERR_MSG_INVALID, + PLMC_LIBACT_IGNORING, + NULL, + PLMC_NOOP_CMD); + continue; + } + if ((callbacks.udp_cb(&udpmsg)) != 0) { + syslog(LOG_ERR, + "plmc_lib: encountered an error during " + "the UDP callback call to PLMSv"); + send_error(PLMC_LIBERR_ERR_CB, + PLMC_LIBACT_IGNORING, + udpmsg.ee_id, + PLMC_NOOP_CMD); + continue; + } } - if ((callbacks.udp_cb(&udpmsg)) != 0) { - syslog(LOG_ERR, - "plmc_lib: encountered an error " - "during the UDP callback call to PLMSv"); - 
send_error(PLMC_LIBERR_ERR_CB, PLMC_LIBACT_IGNORING, - udpmsg.ee_id, PLMC_NOOP_CMD); - continue; + + if (fds[1].revents & POLLIN) { + TRACE("udp_listener thread received term request"); + + if (close(sockfd) < 0) { + LOG_ER("closing sockfd in udp_listener thread " + "failed: %i", errno); + } + + if (close(udp_listener_stop_fd) < 0) { + LOG_ER("closing stop fd in udp_listener thread " + "failed: %i", errno); + } + + done = true; } } - /* - Remove the cleanup for the connected socket - Normally the code will never get here. - */ - pthread_cleanup_pop(0); + + TRACE_LEAVE(); + return 0; +} + +static void accept_new_connection(int listen_fd) +{ + do { + struct epoll_event ev = { 0 }; + struct sockaddr_in cliaddr; + unsigned int sin_size = sizeof(struct sockaddr_in); + int optlen, retval; + + int conn_fd = accept4(listen_fd, + (struct sockaddr *)&cliaddr, + &sin_size, + SOCK_NONBLOCK); + + if (conn_fd < 0) { + syslog(LOG_ERR, "plmc_lib: encoutered a problem in " + "the accept() call"); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_IGNORING, NULL, PLMC_NOOP_CMD); + break; + } + + if (config.so_keepalive) { + /* Set SO_KEEPALIVE */ + optlen = sizeof(config.so_keepalive); + if (setsockopt(conn_fd, SOL_SOCKET, SO_KEEPALIVE, + &config.so_keepalive, optlen) < 0) { + syslog(LOG_ERR, + "setsockopt SO_KEEPALIVE failed error:%s", + strerror(errno)); + break; + } + + /* Set TCP_KEEPIDLE */ + optlen = sizeof(config.tcp_keepidle_time); + if (setsockopt(conn_fd, SOL_TCP, TCP_KEEPIDLE, + &config.tcp_keepidle_time, optlen) < 0) { + syslog(LOG_ERR, + "setsockopt TCP_KEEPIDLE " + "failed " + "error:%s", + strerror(errno)); + break; + } + + /* Set TCP_KEEPINTVL */ + optlen = sizeof(config.tcp_keepalive_intvl); + if (setsockopt(conn_fd, SOL_TCP, TCP_KEEPINTVL, + &config.tcp_keepalive_intvl, optlen) < 0) { + syslog(LOG_ERR, + "setsockopt TCP_KEEPINTVL failed" + " error:%s", + strerror(errno)); + break; + } + + /* Set TCP_KEEPCNT */ + optlen = sizeof(config.tcp_keepalive_probes); + 
if (setsockopt(conn_fd, SOL_TCP, TCP_KEEPCNT, + &config.tcp_keepalive_probes, optlen) < 0) { + syslog(LOG_ERR, + "setsockopt TCP_KEEPCNT failed" + " error:%s", + strerror(errno)); + break; + } + + optlen = sizeof(config.tcp_user_timeout); + if (setsockopt(conn_fd, + IPPROTO_TCP, + TCP_USER_TIMEOUT, + &config.tcp_user_timeout, + optlen) < 0) { + syslog(LOG_ERR, + "setting tcp user timeout " + "failed: %s", strerror(errno)); + break; + } + } + + /* Send a request for the ee_id to the client */ + retval = send(conn_fd, PLMC_cmd_name[PLMC_GET_ID_CMD], + strlen(PLMC_cmd_name[PLMC_GET_ID_CMD]), 0); + if (retval < 0) { + syslog(LOG_ERR, "plmc_lib: encountered an error " + "during a call to send(): %d", errno); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_CLOSE_SOCKET, NULL, + PLMC_GET_ID_CMD); + + if (close_socket(conn_fd) != 0) { + syslog(LOG_ERR, + "plmc_lib: encountered an " + "error during a call to close_socket"); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_CLOSE_SOCKET, NULL, + PLMC_GET_ID_CMD); + } + + break; + } + + /* add the socket descriptor to the polling set */ + ev.events = EPOLLIN | EPOLLRDHUP; + ev.data.fd = conn_fd; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) < 0) { + syslog(LOG_ERR, "epoll_ctl failed: %d", errno); + assert(false); + } + } while (false); } /********************************************************************** @@ -1199,28 +965,16 @@ void *plmc_udp_listener(void *arguments) void *plmc_tcp_listener(void *arguments) { - int sockfd, connected, true = 1, bytes_received; - unsigned int sin_size; - struct sockaddr_in servaddr, cliaddr; - struct timeval tv; + int sockfd, true_value = 1; + struct sockaddr_in servaddr; struct in_addr inp; - char recv_mesg[PLMC_MAX_TCP_DATA_LEN], *match_ip; + struct epoll_event ev = { 0 }; + char *match_ip; + bool done = false; pthread_attr_t udp_attr; - thread_entry *tentry; - int retval = 0; - tcp_msg tcpmsg; -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_tcp_listener 
started\n"); - fflush(plmc_lib_debug); -#endif - - /* - Set the timeout value for recieve messages from clients - */ - tv.tv_sec = 2; - tv.tv_usec = 0; + TRACE_ENTER(); /* Get the socket */ if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -1231,15 +985,8 @@ void *plmc_tcp_listener(void *arguments) pthread_exit((void *)NULL); } - /* - Here we need to register a cleanup function that will be called in case - the listener gets canceled by the library. This is to close the socket - and to cancel all children - */ - pthread_cleanup_push(tcp_listener_cleaner, (void *)&sockfd); - /* Set some socket options, not sure we need this */ - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &true, sizeof(int)) < + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &true_value, sizeof(int)) < 0) { syslog(LOG_ERR, "plmc_lib: encountered an error during a " "call to setsockopt()"); @@ -1247,6 +994,14 @@ void *plmc_tcp_listener(void *arguments) NULL, PLMC_NOOP_CMD); } + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &true_value, sizeof(int)) < + 0) { + syslog(LOG_ERR, "plmc_lib: encountered an error during a " + "call to setsockopt() REUSEPORT"); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_IGNORING, + NULL, PLMC_NOOP_CMD); + } + /* Set up the IP stuff */ match_ip = plmc_get_listening_ip_addr(plmc_config_file); if (strlen(match_ip) == 0) { @@ -1277,7 +1032,7 @@ void *plmc_tcp_listener(void *arguments) } /* Spec max # queued up plmc connections */ - if (listen(sockfd, 5) == -1) { + if (listen(sockfd, SOMAXCONN) < 0) { syslog(LOG_ERR, "plmc_lib: encountered an error during a call " "to listen()"); syslog(LOG_ERR, "plmc_lib: errnor is %d", errno); @@ -1287,192 +1042,194 @@ void *plmc_tcp_listener(void *arguments) pthread_exit((void *)NULL); } - sin_size = sizeof(struct sockaddr_in); - - /* Set threads detached as we don't want to join them */ - pthread_attr_init(&udp_attr); - pthread_attr_setdetachstate(&udp_attr, PTHREAD_CREATE_DETACHED); - #ifdef PLMC_LIB_DEBUG 
fprintf(plmc_lib_debug, "plmc_tcp_listener starting the UDP thread\n"); fflush(plmc_lib_debug); #endif + udp_listener_stop_fd = eventfd(0, EFD_NONBLOCK); + + if (udp_listener_stop_fd < 0) { + LOG_ER("eventfd failed: %i for udp_listener thread", errno); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_DESTROY_LIBRARY, NULL, PLMC_NOOP_CMD); + close(sockfd); + pthread_exit(0); + } + /* Create the UDP thread */ + pthread_attr_init(&udp_attr); + if (pthread_create(&(udp_listener_id), &udp_attr, plmc_udp_listener, NULL) != 0) { syslog(LOG_ERR, "plmc_lib: Could not create the UDP thread"); send_error(PLMC_LIBERR_SYSTEM_RESOURCES, PLMC_LIBACT_DESTROY_LIBRARY, NULL, PLMC_NOOP_CMD); close(sockfd); + close(udp_listener_stop_fd); + pthread_attr_destroy(&udp_attr); pthread_exit((void *)NULL); } - /* - * This is the while loop that will take in new connections and - * start client_mgr threads that deal with those connections. - */ - while (1) { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_tcp_listener entering the while " - "loop\n"); - fflush(plmc_lib_debug); -#endif - connected = - accept(sockfd, (struct sockaddr *)&cliaddr, &sin_size); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_tcp_listener got a new " - "connection\n"); - fflush(plmc_lib_debug); -#endif - if (connected < 0) { - syslog(LOG_ERR, "plmc_lib: encoutered a problem in " - "the accept() call"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_IGNORING, NULL, PLMC_NOOP_CMD); - continue; - } + pthread_attr_destroy(&udp_attr); - /* - * Push another cleanup function as we have opened a new - * socket (connected) and have not yet handed it over to a child - * thread (client_mgr). If we get canceled here we need to close - * that socket. 
- */ - pthread_cleanup_push(client_mgr_cleaner, (void *)&connected); - - /* - Set the timeout SO_RCVTIMEO for the new socket - This so a client can't hang the listener for too long - */ - if (setsockopt(connected, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, - sizeof tv) < 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error during " - "a call to setsockopt()"); - syslog(LOG_ERR, "plmc_lib: errnor is %d", errno); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_NOOP_CMD); - if (close_socket(connected) != 0) { - syslog(LOG_ERR, - "plmc_lib: encountered an " - "error during a call to close_socket"); - } - continue; - } + epoll_fd = epoll_create1(EPOLL_CLOEXEC); -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, "plmc_tcp_listener sending a request " - "for the ee_id\n"); - fflush(plmc_lib_debug); -#endif - /* Send a request for the ee_id to the client */ - retval = send(connected, PLMC_cmd_name[PLMC_GET_ID_CMD], - strlen(PLMC_cmd_name[PLMC_GET_ID_CMD]), 0); - if (retval < 0) { - syslog(LOG_ERR, "plmc_lib: encountered an error " - "during a call to send()"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_GET_ID_CMD); - if (close_socket(connected) != 0) { - syslog(LOG_ERR, - "plmc_lib: encountered an " - "error during a call to close_socket"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_GET_ID_CMD); - } - continue; - } + if (epoll_fd < 0) { + syslog(LOG_ERR, "epoll_create1 failed: %d", errno); + send_error(PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_DESTROY_LIBRARY, NULL, PLMC_NOOP_CMD); + close(sockfd); + close(udp_listener_stop_fd); + pthread_exit(0); + } - /* Now wait for the response for tv time */ - bytes_received = - recv(connected, recv_mesg, PLMC_MAX_TCP_DATA_LEN, 0); - if (bytes_received < 0) { - /* The client didn't come back in time. Can't hang - around waiting for this guy. Close this socket. 
- He'll just have to reconnect */ - syslog(LOG_ERR, "plmc_lib: The client is taking too " - "long to respond, closing socket"); - send_error(PLMC_LIBERR_TIMEOUT, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_GET_ID_CMD); - if (close_socket(connected) != 0) { - syslog(LOG_ERR, - "plmc_lib: encountered an " - "error during a call to close_socket"); - send_error(PLMC_LIBERR_SYSTEM_RESOURCES, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_GET_ID_CMD); - } - continue; - } + /* add the listen fd */ + ev.events = EPOLLIN; + ev.data.fd = sockfd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) < 0) { + syslog(LOG_ERR, "epoll_ctl failed: %d", errno); + assert(false); + } - recv_mesg[bytes_received] = '\0'; + /* add the term fd */ + ev.events = EPOLLIN; + ev.data.fd = tcp_listener_stop_fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, tcp_listener_stop_fd, &ev) < 0) { + syslog(LOG_ERR, "epoll_ctl failed: %d", errno); + assert(false); + } -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_tcp_listener got response back: " - "%s\n", - recv_mesg); - fflush(plmc_lib_debug); -#endif - if (parse_tcp(&tcpmsg, recv_mesg) != 0) { - syslog(LOG_ERR, "plmc_lib: Invalid TCP message " - "during GET_EE_ID"); - send_error(PLMC_LIBERR_MSG_INVALID, - PLMC_LIBACT_CLOSE_SOCKET, NULL, - PLMC_GET_ID_CMD); - if (close_socket(connected) != 0) { - syslog(LOG_ERR, - "plmc_lib: encountered an " - "error during a call to close_socket"); - } - continue; + while (!done) { + struct epoll_event events[128]; + int poll_ret; + do { + poll_ret = epoll_wait(epoll_fd, + &events[0], + sizeof(events) / + sizeof(events[0]), + -1); + } while (poll_ret < 0 && errno == EINTR); + + /* Check to see if the poll call failed. 
*/ + if (poll_ret < 0) { + syslog(LOG_ERR, "epoll_wait() failed: %d", errno); + break; } -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_tcp_listener creating thread " - "entry with ee_id %s and socket %d\n", - tcpmsg.ee_id, connected); - fflush(plmc_lib_debug); -#endif + for (int i = 0; i < poll_ret; ++i) { + int close_conn = 0; + + if (events[i].data.fd == sockfd) { + accept_new_connection(sockfd); + } else if (events[i].data.fd == tcp_listener_stop_fd) { + TRACE("tcp_listener thread received term " + "request"); + delete_all_thread_entries(); + if (close(sockfd) < 0) { + LOG_ER("error closing " + "tcp_listener_thread socket: " + "%i", errno); + } - /* Create the thread_entry - This is thread safe */ - tentry = create_thread_entry(tcpmsg.ee_id, connected); - if (tentry == NULL) { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_tcp_listener found an " - "entry that has to be closed, " - "closing socket on current client\n"); - fflush(plmc_lib_debug); -#endif - close_socket(connected); - } else { -#ifdef PLMC_LIB_DEBUG - fprintf(plmc_lib_debug, - "plmc_tcp_listener didn't " - "find an entry that has to be closed. 
" - "We can reuse this entry\n"); - fflush(plmc_lib_debug); -#endif - if ((callbacks.connect_cb(tcpmsg.ee_id, - PLMC_CONN_CB_MSG)) != 0) { - send_error(PLMC_LIBERR_ERR_CB, - PLMC_LIBACT_IGNORING, tcpmsg.ee_id, - PLMC_GET_ID_CMD); + if (close(epoll_fd) < 0) { + LOG_ER("error closing epoll socket: %i", + errno); + } + + if (close(tcp_listener_stop_fd) < 0) { + LOG_ER("error closing " + "tcp_listener_stop_fd socket: " + "%i", + errno); + } + + done = true; + break; + } else { + if (events[i].events & EPOLLIN) { + char recv_mesg[PLMC_MAX_TCP_DATA_LEN] = + { 0 }; + + int bytes_received = + recv(events[i].data.fd, + recv_mesg, + PLMC_MAX_TCP_DATA_LEN, + 0); + + if (bytes_received < 0) { + TRACE("recv failed: %d for fd: " + "%i", + errno, + events[i].data.fd); + send_error(PLMC_LIBERR_TIMEOUT, + PLMC_LIBACT_CLOSE_SOCKET, + NULL, + PLMC_GET_ID_CMD); + close_conn = 1; + } else if (bytes_received == 0) { + TRACE("client closed " + "connection fd: %i", + events[i].data.fd); + close_conn = 1; + } + + if (!close_conn) { + handle_plmc_response( + events[i].data.fd, + recv_mesg); + } + } + + if (events[i].events & (EPOLLHUP | + EPOLLRDHUP | + EPOLLERR)) { + TRACE("HUP for fd: %i", + events[i].data.fd); + close_conn = 1; + } + + if (close_conn) { + thread_entry *entry = + find_thread_entry_by_socket( + events[i].data.fd); + + if (!entry) { + /* if client never responded to + * GET_ID we could get here */ + TRACE("unable to find thread " + "entry for socket: %d", + events[i].data.fd); + } + + close_conn = 0; + + if (close_socket(events[i].data.fd) != 0) { + syslog(LOG_ERR, + "closing socket failed: " + "%d", + errno); + } + + if (entry) { + if (callbacks.connect_cb( + entry->thread_d.ee_id, + PLMC_DISCONN_CB_MSG) != + 0) { + send_error( + PLMC_LIBERR_SYSTEM_RESOURCES, + PLMC_LIBACT_CLOSE_SOCKET, + entry->thread_d.ee_id, + PLMC_GET_ID_CMD); + } + + delete_thread_entry( + entry->thread_d.ee_id); + } + } } } - - /* - * Remove the cleanup for the connected socket as we don't care - * 
about that anymore. This socket is now handled by the
-		 * client_mgr and this guy will close it if it is canceled
-		 */
-		pthread_cleanup_pop(0);
 	}
-	pthread_cleanup_pop(1);
-	pthread_exit((void *)NULL);
+
+	TRACE_LEAVE();
+	return 0;
 }
diff --git a/src/plm/plmcd/plmc_lib_internal.h b/src/plm/plmcd/plmc_lib_internal.h
index f798513..fd5c778 100644
--- a/src/plm/plmcd/plmc_lib_internal.h
+++ b/src/plm/plmcd/plmc_lib_internal.h
@@ -33,7 +33,8 @@ typedef struct cb_functions_struct {
 
 extern char *plmc_config_file;
 extern PLMC_config_data config;
-extern pthread_t tcp_listener_id, udp_listener_id, plmc_connection_mgr_id;
+extern pthread_t tcp_listener_id, udp_listener_id;
+extern int tcp_listener_stop_fd, udp_listener_stop_fd;
 
 /********************************************************************
  * This struct is used for the data entry that a client_mgr thread
@@ -42,13 +43,10 @@ extern pthread_t tcp_listener_id, udp_listener_id, plmc_connection_mgr_id;
  ********************************************************************/
 typedef struct thread_data_struct {
 	pthread_mutex_t td_lock; /* single thread lock */
-	pthread_t td_id;
 	char ee_id[PLMC_EE_ID_MAX_LENGTH];
 	char command[PLMC_CMD_NAME_MAX_LENGTH];
 	int (*callback)(tcp_msg *);
 	int socketfd;
-	int done;
-	int kill;
 } thread_data;
 
 typedef struct thread_entry_struct thread_entry;
@@ -63,12 +61,9 @@ struct thread_entry_struct {
 	thread_entry *previous;
 };
 
-thread_entry *find_thread_entry(char *ee_id);
-thread_entry *create_thread_entry(char *ee_id, int sock);
-int delete_thread_entry(char *ee_id);
+thread_entry *find_thread_entry(const char *ee_id);
 void delete_all_thread_entries();
 void *plmc_tcp_listener(void *arguments);
-void *plmc_connection_mgr(void *arguments);
-void *plmc_client_mgr(void *arguments);
+void plmc_client_mgr(thread_entry *);
 int send_error(int error, int action, char *ee_id, PLMC_cmd_idx cmd_enum);
 #endif // PLM_PLMCD_PLMC_LIB_INTERNAL_H_
diff --git a/src/plm/plmcd/plmc_read_config.c b/src/plm/plmcd/plmc_read_config.c
index 5641b6f..8476676 100644
--- a/src/plm/plmcd/plmc_read_config.c
+++ b/src/plm/plmcd/plmc_read_config.c
@@ -85,6 +85,7 @@ int plmc_read_config(char *plmc_config_file, PLMC_config_data *config)
 	config->tcp_keepidle_time = KEEPIDLE_TIME;
 	config->tcp_keepalive_intvl = KEEPALIVE_INTVL;
 	config->tcp_keepalive_probes = KEEPALIVE_PROBES;
+	config->tcp_user_timeout = USER_TIMEOUT;
 
 	/*
 	 * Set timeout to a negative value so we can detect if the value
@@ -161,6 +162,8 @@ int plmc_read_config(char *plmc_config_file, PLMC_config_data *config)
 				tag = TCP_KEEPALIVE_INTVL;
 			if (strcmp(line, "[tcp_keepalive_probes]") == 0)
 				tag = TCP_KEEPALIVE_PROBES;
+			if (strcmp(line, "[tcp_user_timeout]") == 0)
+				tag = TCP_USER_TIMEOUT_VALUE;
 		} else {
 			/* Set the value of the tag in config data structure. */
 			switch (tag) {
@@ -322,6 +325,16 @@ int plmc_read_config(char *plmc_config_file, PLMC_config_data *config)
 				}
 				tag = 0;
 				break;
+			case TCP_USER_TIMEOUT_VALUE:
+				config->tcp_user_timeout = atoi(line);
+				if (config->tcp_user_timeout < 1) {
+					syslog(LOG_ERR,
+					       "tcp_user_timeout "
+					       "must be a positive integer");
+					return -1;
+				}
+				tag = 0;
+				break;
 			default:
 				break;
 			}
@@ -450,5 +463,8 @@ void plmc_print_config(PLMC_config_data *config)
 	printf(" [tcp_keepalive_probes]\n");
 	printf(" %d\n", config->tcp_keepalive_probes);
 
+	printf(" [tcp_user_timeout]\n");
+	printf(" %d\n", config->tcp_user_timeout);
+
 	printf("\n");
 }
diff --git a/src/plm/plmcd/plmcd.conf b/src/plm/plmcd/plmcd.conf
index a09394f..01dd668 100644
--- a/src/plm/plmcd/plmcd.conf
+++ b/src/plm/plmcd/plmcd.conf
@@ -165,3 +165,11 @@
 # Optional
 [tcp_keepalive_probes]
 2
+
+#
+# tcp_user_timeout: The maximum amount of time in milliseconds that transmitted
+# data may remain unacknowledged before TCP will forcibly close the
+# corresponding connection and return ETIMEDOUT to the application
+# Optional
+[tcp_user_timeout]
+5000
diff --git a/src/plm/plmd/plms_plmc.c b/src/plm/plmd/plms_plmc.c
index 0667099..06c8d4b 100644
--- a/src/plm/plmd/plms_plmc.c
+++ b/src/plm/plmd/plms_plmc.c
@@ -293,6 +293,10 @@ SaUint32T plms_plmc_terminated_process(PLMS_ENTITY *ent)
 	if (plms_rdness_flag_is_set(ent, SA_PLM_RF_MANAGEMENT_LOST))
 		plms_ee_terminated_mngt_flag_clear(ent);
 
+	plms_readiness_state_set(ent, SA_PLM_READINESS_OUT_OF_SERVICE,
+				 NULL, SA_NTF_OBJECT_OPERATION,
+				 SA_PLM_NTFID_STATE_CHANGE_ROOT);
+
 	TRACE_LEAVE2("Return Val: %d", ret_err);
 	return ret_err;
 }
-- 
2.9.5

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel