Move consumerd ownership to thread_manage_consumer to scope the lifetime
of the consumerd to its manager thread.

"thread_manage_consumer" is responsible for signaling its consumerd to
terminate and for waiting for its termination.

All thread_manage_consumer threads now wait on a dedicated quit pipe,
distinct from the global thread quit pipe. This allows control over
their lifetime.

The termination notification is sent during sessiond_cleanup, after the
destroy session command, to ensure that no sessions are still active
when the consumerds are terminated.

Signed-off-by: Jonathan Rajotte <[email protected]>
---
 src/bin/lttng-sessiond/main.c | 174 +++++++++++++++++++++++++++---------------
 1 file changed, 112 insertions(+), 62 deletions(-)

diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
index a840e8de..fb58ab4b 100644
--- a/src/bin/lttng-sessiond/main.c
+++ b/src/bin/lttng-sessiond/main.c
@@ -206,6 +206,7 @@ static int kernel_poll_pipe[2] = { -1, -1 };
 static int thread_quit_pipe[2] = { -1, -1 };
 static int thread_health_teardown_trigger_pipe[2] = { -1, -1 };
 static int thread_apps_teardown_trigger_pipe[2] = { -1, -1 };
+static int thread_consumers_teardown_trigger_pipe[2] = { -1, -1 };
 int thread_apps_notify_teardown_trigger_pipe[2] = { -1, -1 };
 
 /*
@@ -495,6 +496,14 @@ static int init_thread_apps_notify_teardown_trigger_pipe(void)
        return __init_thread_quit_pipe(thread_apps_notify_teardown_trigger_pipe);
 }
 
+/*
+ * Initialize the quit pipe polled by the thread_manage_consumer threads.
+ */
+static int init_thread_consumers_teardown_trigger_pipe(void)
+{
+       return __init_thread_quit_pipe(thread_consumers_teardown_trigger_pipe);
+}
+
 /*
  * Stop first wave threads by closing the thread quit pipe.
  *  - kernel thread
@@ -601,14 +607,15 @@ static int generate_lock_file_path(char *path, size_t len)
 /*
  * Wait on consumer process termination.
  *
- * Need to be called with the consumer data lock held or from a context
- * ensuring no concurrent access to data (e.g: cleanup).
+ * Need to be called with the consumer data lock held.
  */
 static void wait_consumer(struct consumer_data *consumer_data)
 {
        pid_t ret;
        int status;
 
+       assert(consumer_data);
+
        if (consumer_data->pid <= 0) {
                return;
        }
@@ -626,6 +633,120 @@ static void wait_consumer(struct consumer_data *consumer_data)
 }
 
 /*
+ * Signal to the consumer process to terminate.
+ *
+ * Need to be called with the consumer data lock held.
+ */
+static void kill_consumer(struct consumer_data *consumer_data)
+{
+       int ret;
+
+       assert(consumer_data);
+
+       /* Consumer pid must be a real one. */
+       if (consumer_data->pid <= 0) {
+               goto end;
+       }
+
+       ret = kill(consumer_data->pid, SIGTERM);
+       if (ret) {
+               PERROR("Error killing consumer daemon");
+               goto end;
+       }
+end:
+       return;
+}
+
+/*
+ * Return the consumer daemon pid, read under the consumer data lock.
+ */
+static pid_t get_consumerd_pid(struct consumer_data *consumer_data)
+{
+       pid_t pid;
+
+       assert(consumer_data);
+
+       pthread_mutex_lock(&consumer_data->lock);
+       pid = consumer_data->pid;
+       pthread_mutex_unlock(&consumer_data->lock);
+
+       return pid;
+}
+
+/*
+ * Join the management thread of a consumer daemon.
+ *
+ * consumerd_pid must be sampled with get_consumerd_pid() before the thread
+ * is asked to quit: on exit, the thread waits for its consumerd through
+ * wait_consumer(), which may reset consumer_data->pid (NOTE(review):
+ * confirm against wait_consumer()). A pid read after the notification
+ * therefore cannot tell whether there is a thread to join.
+ *
+ * Return 0 on success or when consumerd_pid is not a real pid, else the
+ * pthread_join() error code.
+ */
+static int join_thread_consumer(struct consumer_data *consumer_data,
+               pid_t consumerd_pid)
+{
+       int ret;
+       void *status;
+
+       assert(consumer_data);
+
+       /* Consumer pid must be a real one. */
+       if (consumerd_pid <= 0) {
+               ret = 0;
+               goto end;
+       }
+
+       ret = pthread_join(consumer_data->thread, &status);
+       if (ret) {
+               ERR("Joining consumer thread pid %d", consumerd_pid);
+       }
+end:
+       return ret;
+}
+
+/*
+ * Ask every thread_manage_consumer thread to quit, then join them.
+ *
+ * Each thread kills and waits for its own consumerd on its exit path.
+ */
+static void teardown_consumer_threads(void)
+{
+       int ret;
+       pid_t kconsumerd_pid, ustconsumerd32_pid, ustconsumerd64_pid;
+
+       /* Sample before notifying the threads: see join_thread_consumer(). */
+       kconsumerd_pid = get_consumerd_pid(&kconsumer_data);
+       ustconsumerd32_pid = get_consumerd_pid(&ustconsumer32_data);
+       ustconsumerd64_pid = get_consumerd_pid(&ustconsumer64_data);
+
+       ret = notify_thread_pipe(thread_consumers_teardown_trigger_pipe[1]);
+       if (ret < 0) {
+               ERR("write error on thread consumer quit pipe");
+       }
+
+       ret = join_thread_consumer(&kconsumer_data, kconsumerd_pid);
+       if (ret) {
+               errno = ret;
+               PERROR("join_consumer kernel");
+       }
+
+       ret = join_thread_consumer(&ustconsumer32_data, ustconsumerd32_pid);
+       if (ret) {
+               errno = ret;
+               PERROR("join_consumer ust32");
+       }
+
+       ret = join_thread_consumer(&ustconsumer64_data, ustconsumerd64_pid);
+       if (ret) {
+               errno = ret;
+               PERROR("join_consumer ust64");
+       }
+}
+
+/*
  * Cleanup the session daemon's data structures.
  */
 static void sessiond_cleanup(void)
@@ -707,7 +828,6 @@ static void sessiond_cleanup(void)
        (void) rmdir(path);
 
        DBG("Cleaning up all sessions");
-
        /* Destroy session list mutex */
        if (session_list_ptr != NULL) {
                pthread_mutex_destroy(&session_list_ptr->lock);
@@ -719,9 +839,14 @@ static void sessiond_cleanup(void)
                }
        }
 
-       wait_consumer(&kconsumer_data);
-       wait_consumer(&ustconsumer64_data);
-       wait_consumer(&ustconsumer32_data);
+       /*
+        * Delay the termination of manage_consumer_thread threads to allow
+        * proper metadata flushing, following the session destroy. Use a
+        * barrier to ensure that all call_rcu are executed at this point.
+        */
+       DBG("Teardown consumer threads");
+       rcu_barrier();
+       teardown_consumer_threads();
 
        DBG("Cleaning up all agent apps");
        agent_app_ht_clean();
@@ -1289,14 +1367,20 @@ static void *thread_manage_consumer(void *data)
        health_code_update();
 
        /*
-        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and 
the
+        * Pass 3 as size here for the thread consumer quit pipe, 
consumerd_err_sock and the
         * metadata_sock. Nothing more will be added to this poll set.
         */
-       ret = sessiond_set_thread_pollset(&events, 3);
+       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
        if (ret < 0) {
                goto error_poll;
        }
 
+       /* Add quit pipe */
+       ret = lttng_poll_add(&events, 
thread_consumers_teardown_trigger_pipe[0], LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               goto error;
+       }
+
        /*
         * The error socket here is already in a listening state which was done
         * just before spawning this thread to avoid a race between the consumer
@@ -1344,7 +1428,7 @@ restart:
                }
 
                /* Thread quit pipe has been closed. Killing thread. */
-               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
+               ret = (pollfd == thread_consumers_teardown_trigger_pipe[0] && 
(revents & LPOLLIN)) ? 1 : 0;
                if (ret) {
                        err = 0;
                        goto exit;
@@ -1509,7 +1593,7 @@ restart_poll:
                         * but continue the current loop to handle potential 
data from
                         * consumer.
                         */
-                       should_quit = sessiond_check_thread_quit_pipe(pollfd, 
revents);
+                       should_quit = (pollfd == 
thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;
 
                        if (pollfd == sock) {
                                /* Event on the consumerd socket */
@@ -1552,11 +1636,6 @@ restart_poll:
 
 exit:
 error:
-       /*
-        * We lock here because we are about to close the sockets and some other
-        * thread might be using them so get exclusive access which will abort 
all
-        * other consumer command by other threads.
-        */
        pthread_mutex_lock(&consumer_data->lock);
 
        /* Immediately set the consumerd state to stopped */
@@ -1570,6 +1649,13 @@ error:
                assert(0);
        }
 
+       /*
+        * This thread is responsible for its consumerd. Make sure the
+        * consumerd teardown is complete before proceeding.
+        */
+       kill_consumer(consumer_data);
+       wait_consumer(consumer_data);
+
        if (consumer_data->err_sock >= 0) {
                ret = close(consumer_data->err_sock);
                if (ret) {
@@ -1600,13 +1686,15 @@ error:
 
        unlink(consumer_data->err_unix_sock_path);
        unlink(consumer_data->cmd_unix_sock_path);
-       pthread_mutex_unlock(&consumer_data->lock);
 
        /* Cleanup metadata socket mutex. */
        if (consumer_data->metadata_sock.lock) {
                pthread_mutex_destroy(consumer_data->metadata_sock.lock);
                free(consumer_data->metadata_sock.lock);
        }
+
+       pthread_mutex_unlock(&consumer_data->lock);
+
        lttng_poll_clean(&events);
 
        if (cmd_socket_wrapper) {
@@ -2560,27 +2648,6 @@ error:
 }
 
 /*
- * Join consumer thread
- */
-static int join_consumer_thread(struct consumer_data *consumer_data)
-{
-       void *status;
-
-       /* Consumer pid must be a real one. */
-       if (consumer_data->pid > 0) {
-               int ret;
-               ret = kill(consumer_data->pid, SIGTERM);
-               if (ret) {
-                       PERROR("Error killing consumer daemon");
-                       return ret;
-               }
-               return pthread_join(consumer_data->thread, &status);
-       } else {
-               return 0;
-       }
-}
-
-/*
  * Fork and exec a consumer daemon (consumerd).
  *
  * Return pid if successful else -1.
@@ -4741,27 +4808,6 @@ error_create_poll:
 
        rcu_unregister_thread();
 
-       /*
-        * Since we are creating the consumer threads, we own them, so we need
-        * to join them before our thread exits.
-        */
-       ret = join_consumer_thread(&kconsumer_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer");
-       }
-
-       ret = join_consumer_thread(&ustconsumer32_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer ust32");
-       }
-
-       ret = join_consumer_thread(&ustconsumer64_data);
-       if (ret) {
-               errno = ret;
-               PERROR("join_consumer ust64");
-       }
        return NULL;
 }
 
@@ -5785,6 +5831,11 @@ int main(int argc, char **argv)
                goto exit_init_data;
        }
 
+       if (init_thread_consumers_teardown_trigger_pipe()) {
+               retval = -1;
+               goto exit_init_data;
+       }
+
        /* Check if daemon is UID = 0 */
        is_root = !getuid();
 
@@ -6406,11 +6457,8 @@ exit_init_data:
         * perform lookups in those structures.
         */
        rcu_barrier();
-       /*
-        * sessiond_cleanup() is called when no other thread is running, except
-        * the ht_cleanup thread, which is needed to destroy the hash tables.
-        */
        rcu_thread_online();
+
        sessiond_cleanup();
 
        /*
@@ -6461,6 +6509,8 @@ exit_init_data:
                retval = -1;
        }
 
+       /* Consumers thread teardown pipe cleanup */
+       utils_close_pipe(thread_consumers_teardown_trigger_pipe);
        /* Health thread teardown pipe cleanup */
        utils_close_pipe(thread_health_teardown_trigger_pipe);
        /* Apps thread teardown pipe cleanup */
-- 
2.11.0

_______________________________________________
lttng-dev mailing list
[email protected]
https://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to