Move consumerd ownership to thread_manage_consumer to scope the lifetime of the consumerd to its manager thread.
"thread_manage_consumer" is responsible for signaling and waiting for the termination of its consumerd. All thread_manage_consumer threads now wait on a unique quit pipe, different from the global thread quit pipe. This allows control over their lifetime. The termination notification is sent during sessiond_cleanup, after the destroy session command, to ensure that no sessions are still active at the moment the consumerds are terminated. Signed-off-by: Jonathan Rajotte <[email protected]> --- src/bin/lttng-sessiond/main.c | 174 +++++++++++++++++++++++++++--------------- 1 file changed, 112 insertions(+), 62 deletions(-) diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index a840e8de..fb58ab4b 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -206,6 +206,7 @@ static int kernel_poll_pipe[2] = { -1, -1 }; static int thread_quit_pipe[2] = { -1, -1 }; static int thread_health_teardown_trigger_pipe[2] = { -1, -1 }; static int thread_apps_teardown_trigger_pipe[2] = { -1, -1 }; +static int thread_consumers_teardown_trigger_pipe[2] = { -1, -1 }; int thread_apps_notify_teardown_trigger_pipe[2] = { -1, -1 }; /* @@ -495,6 +496,11 @@ static int init_thread_apps_notify_teardown_trigger_pipe(void) return __init_thread_quit_pipe(thread_apps_notify_teardown_trigger_pipe); } +static int init_thread_consumers_teardown_trigger_pipe(void) +{ + return __init_thread_quit_pipe(thread_consumers_teardown_trigger_pipe); +} + /* * Stop first wave threads by closing the thread quit pipe. * - kernel thread @@ -601,14 +607,15 @@ static int generate_lock_file_path(char *path, size_t len) /* * Wait on consumer process termination. * - * Need to be called with the consumer data lock held or from a context - * ensuring no concurrent access to data (e.g: cleanup). + * Need to be called with the consumer data lock held. 
*/ static void wait_consumer(struct consumer_data *consumer_data) { pid_t ret; int status; + assert(consumer_data); + if (consumer_data->pid <= 0) { return; } @@ -626,6 +633,52 @@ static void wait_consumer(struct consumer_data *consumer_data) } /* + * Signal to the consumer process to terminate. + * + * Need to be called with the consumer data lock held. + */ +static void kill_consumer(struct consumer_data *consumer_data) +{ + int ret; + + assert(consumer_data); + + /* Consumer pid must be a real one. */ + if (consumer_data->pid <= 0) { + goto end; + } + + ret = kill(consumer_data->pid, SIGTERM); + if (ret) { + PERROR("Error killing consumer daemon"); + goto end; + } +end: + return; +} + +static int join_thread_consumer(struct consumer_data *consumer_data) +{ + int ret; + void *status; + + assert(consumer_data); + + /* Consumer pid must be a real one. */ + if (consumer_data->pid <= 0) { + ret = 0; + goto end; + } + + ret = pthread_join(consumer_data->thread, &status); + if (ret) { + ERR("Joining consumer thread pid %d", consumer_data->pid); + } +end: + return ret; +} + +/* * Cleanup the session daemon's data structures. */ static void sessiond_cleanup(void) @@ -707,7 +760,6 @@ static void sessiond_cleanup(void) (void) rmdir(path); DBG("Cleaning up all sessions"); - /* Destroy session list mutex */ if (session_list_ptr != NULL) { pthread_mutex_destroy(&session_list_ptr->lock); @@ -719,9 +771,35 @@ static void sessiond_cleanup(void) } } - wait_consumer(&kconsumer_data); - wait_consumer(&ustconsumer64_data); - wait_consumer(&ustconsumer32_data); + /* + * Delay the termination of manage_consumer_thread threads to allow + * proper metadata flushing, following the session destroy. Use a + * barrier to ensure that all call_rcu are executed at this point. 
+ */ + DBG("Teardown consurmer thread"); + rcu_barrier(); + ret = notify_thread_pipe(thread_consumers_teardown_trigger_pipe[1]); + if (ret < 0) { + ERR("write error on thread consumer quit pipe"); + } + + ret = join_thread_consumer(&kconsumer_data); + if (ret) { + errno = ret; + PERROR("join_consumer kernel"); + } + + ret = join_thread_consumer(&ustconsumer32_data); + if (ret) { + errno = ret; + PERROR("join_consumer ust32"); + } + + ret = join_thread_consumer(&ustconsumer64_data); + if (ret) { + errno = ret; + PERROR("join_consumer ust64"); + } DBG("Cleaning up all agent apps"); agent_app_ht_clean(); @@ -1289,14 +1367,20 @@ static void *thread_manage_consumer(void *data) health_code_update(); /* - * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * Pass 3 as size here for the thread consumer quit pipe, consumerd_err_sock and the * metadata_sock. Nothing more will be added to this poll set. */ - ret = sessiond_set_thread_pollset(&events, 3); + ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC); if (ret < 0) { goto error_poll; } + /* Add quit pipe */ + ret = lttng_poll_add(&events, thread_consumers_teardown_trigger_pipe[0], LPOLLIN | LPOLLERR); + if (ret < 0) { + goto error; + } + /* * The error socket here is already in a listening state which was done * just before spawning this thread to avoid a race between the consumer @@ -1344,7 +1428,7 @@ restart: } /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); + ret = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0; if (ret) { err = 0; goto exit; @@ -1509,7 +1593,7 @@ restart_poll: * but continue the current loop to handle potential data from * consumer. */ - should_quit = sessiond_check_thread_quit_pipe(pollfd, revents); + should_quit = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 
1 : 0; if (pollfd == sock) { /* Event on the consumerd socket */ @@ -1552,11 +1636,6 @@ restart_poll: exit: error: - /* - * We lock here because we are about to close the sockets and some other - * thread might be using them so get exclusive access which will abort all - * other consumer command by other threads. - */ pthread_mutex_lock(&consumer_data->lock); /* Immediately set the consumerd state to stopped */ @@ -1570,6 +1649,13 @@ error: assert(0); } + /* + * This thread is responsible for its consumerd. Make sure the + * consumerd teardown is complete before proceding. + */ + kill_consumer(consumer_data); + wait_consumer(consumer_data); + if (consumer_data->err_sock >= 0) { ret = close(consumer_data->err_sock); if (ret) { @@ -1600,13 +1686,15 @@ error: unlink(consumer_data->err_unix_sock_path); unlink(consumer_data->cmd_unix_sock_path); - pthread_mutex_unlock(&consumer_data->lock); /* Cleanup metadata socket mutex. */ if (consumer_data->metadata_sock.lock) { pthread_mutex_destroy(consumer_data->metadata_sock.lock); free(consumer_data->metadata_sock.lock); } + + pthread_mutex_unlock(&consumer_data->lock); + lttng_poll_clean(&events); if (cmd_socket_wrapper) { @@ -2560,27 +2648,6 @@ error: } /* - * Join consumer thread - */ -static int join_consumer_thread(struct consumer_data *consumer_data) -{ - void *status; - - /* Consumer pid must be a real one. */ - if (consumer_data->pid > 0) { - int ret; - ret = kill(consumer_data->pid, SIGTERM); - if (ret) { - PERROR("Error killing consumer daemon"); - return ret; - } - return pthread_join(consumer_data->thread, &status); - } else { - return 0; - } -} - -/* * Fork and exec a consumer daemon (consumerd). * * Return pid if successful else -1. @@ -4741,27 +4808,6 @@ error_create_poll: rcu_unregister_thread(); - /* - * Since we are creating the consumer threads, we own them, so we need - * to join them before our thread exits. 
- */ - ret = join_consumer_thread(&kconsumer_data); - if (ret) { - errno = ret; - PERROR("join_consumer"); - } - - ret = join_consumer_thread(&ustconsumer32_data); - if (ret) { - errno = ret; - PERROR("join_consumer ust32"); - } - - ret = join_consumer_thread(&ustconsumer64_data); - if (ret) { - errno = ret; - PERROR("join_consumer ust64"); - } return NULL; } @@ -5785,6 +5831,11 @@ int main(int argc, char **argv) goto exit_init_data; } + if (init_thread_consumers_teardown_trigger_pipe()) { + retval = -1; + goto exit_init_data; + } + /* Check if daemon is UID = 0 */ is_root = !getuid(); @@ -6406,11 +6457,8 @@ exit_init_data: * perform lookups in those structures. */ rcu_barrier(); - /* - * sessiond_cleanup() is called when no other thread is running, except - * the ht_cleanup thread, which is needed to destroy the hash tables. - */ rcu_thread_online(); + sessiond_cleanup(); /* @@ -6461,6 +6509,8 @@ exit_init_data: retval = -1; } + /* Consumers thread teardown pipe cleanup */ + utils_close_pipe(thread_consumers_teardown_trigger_pipe); /* Health thread teardown pipe cleanup */ utils_close_pipe(thread_health_teardown_trigger_pipe); /* Apps thread teardown pipe cleanup */ -- 2.11.0 _______________________________________________ lttng-dev mailing list [email protected] https://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
