* David Goulet ([email protected]) wrote: > > > Mathieu Desnoyers: > > * David Goulet ([email protected]) wrote: > >> As a second step of refactoring, upon receiving a data stream, we send > >> it to the data thread that is now in charge of handling it. > >> > >> Furthermore, in order for this to behave correctly, we have to perform the > >> ustctl actions on the stream before passing it to the right thread > >> (the kernel does not need special actions). This way, once the sessiond > >> thread replies back to the session daemon, the stream is sure to be open > >> and ready for data to be recorded on the application side so we avoid a > >> race between the application thinking the stream is ready and the stream > >> thread still scheduled out. > > > > Normally, as long as we have a reference on the SHM file descriptor, and > > we have the wakeup FD, we should be good to fetch the data of buffers > > belonging to an application that has already exited, even if it did so > > before the ustctl calls are done. > > > > So I'm wondering why you do the ustctl calls in the sessiond thread? It > > seems to complicate the implementation needlessly: we could still do the > > ustctl calls and output file open at the same location, the > > data/metadata threads. > > Hmmm, it was my understanding that does
does -> those > ustctl_* calls were needed > before the trace could be recording thus making them quickly. Wrong? Can you rephrase your question ? I don't understand. Thanks, Mathieu > > David > > > > > Thanks, > > > > Mathieu > > > >> > >> This commit should speed up the add stream process for the session > >> daemon. There is still some actions to move out of the session daemon > >> poll thread to gain speed significantly, especially for network > >> streaming. > >> > >> Signed-off-by: David Goulet <[email protected]> > >> --- > >> src/common/consumer.c | 123 > >> +++++++++++--------------- > >> src/common/consumer.h | 1 + > >> src/common/kernel-consumer/kernel-consumer.c | 24 ++--- > >> src/common/ust-consumer/ust-consumer.c | 40 ++++----- > >> 4 files changed, 78 insertions(+), 110 deletions(-) > >> > >> diff --git a/src/common/consumer.c b/src/common/consumer.c > >> index 055de1b..1d2b1f7 100644 > >> --- a/src/common/consumer.c > >> +++ b/src/common/consumer.c > >> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream > >> *consumer_find_stream(int key, > >> return stream; > >> } > >> > >> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) > >> +void consumer_steal_stream_key(int key, struct lttng_ht *ht) > >> { > >> struct lttng_consumer_stream *stream; > >> > >> @@ -409,6 +409,14 @@ struct lttng_consumer_stream > >> *consumer_allocate_stream( > >> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); > >> lttng_ht_node_init_ulong(&stream->node, stream->key); > >> > >> + /* > >> + * The cpu number is needed before using any ustctl_* actions. Ignored > >> for > >> + * the kernel so the value does not matter. 
> >> + */ > >> + pthread_mutex_lock(&consumer_data.lock); > >> + stream->cpu = stream->chan->cpucount++; > >> + pthread_mutex_unlock(&consumer_data.lock); > >> + > >> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len > >> %llu," > >> " out_fd %d, net_seq_idx %d)", stream->path_name, > >> stream->key, > >> stream->shm_fd, stream->wait_fd, > >> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream > >> *stream) > >> pthread_mutex_lock(&consumer_data.lock); > >> rcu_read_lock(); > >> > >> - switch (consumer_data.type) { > >> - case LTTNG_CONSUMER_KERNEL: > >> - break; > >> - case LTTNG_CONSUMER32_UST: > >> - case LTTNG_CONSUMER64_UST: > >> - stream->cpu = stream->chan->cpucount++; > >> - ret = lttng_ustconsumer_add_stream(stream); > >> - if (ret) { > >> - ret = -EINVAL; > >> - goto error; > >> - } > >> - > >> - /* Steal stream identifier only for UST */ > >> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); > >> - break; > >> - default: > >> - ERR("Unknown consumer_data type"); > >> - assert(0); > >> - ret = -ENOSYS; > >> - goto error; > >> - } > >> - > >> lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); > >> > >> /* Check and cleanup relayd */ > >> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream > >> *stream) > >> consumer_data.stream_count++; > >> consumer_data.need_update = 1; > >> > >> -error: > >> rcu_read_unlock(); > >> pthread_mutex_unlock(&consumer_data.lock); > >> > >> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct > >> lttng_consumer_stream *stream, > >> > >> DBG3("Consumer delete metadata stream %d", stream->wait_fd); > >> > >> - if (ht == NULL) { > >> - /* Means the stream was allocated but not successfully added */ > >> - goto free_stream; > >> - } > >> - > >> - rcu_read_lock(); > >> - iter.iter.node = &stream->waitfd_node.node; > >> - ret = lttng_ht_del(ht, &iter); > >> - assert(!ret); > >> - rcu_read_unlock(); > >> - > >> 
pthread_mutex_lock(&consumer_data.lock); > >> switch (consumer_data.type) { > >> case LTTNG_CONSUMER_KERNEL: > >> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct > >> lttng_consumer_stream *stream, > >> goto end; > >> } > >> > >> + if (ht == NULL) { > >> + pthread_mutex_unlock(&consumer_data.lock); > >> + /* Means the stream was allocated but not successfully added */ > >> + goto free_stream; > >> + } > >> + > >> + rcu_read_lock(); > >> + iter.iter.node = &stream->waitfd_node.node; > >> + ret = lttng_ht_del(ht, &iter); > >> + assert(!ret); > >> + rcu_read_unlock(); > >> + > >> if (stream->out_fd >= 0) { > >> ret = close(stream->out_fd); > >> if (ret) { > >> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct > >> lttng_consumer_stream *stream, > >> > >> pthread_mutex_lock(&consumer_data.lock); > >> > >> - switch (consumer_data.type) { > >> - case LTTNG_CONSUMER_KERNEL: > >> - break; > >> - case LTTNG_CONSUMER32_UST: > >> - case LTTNG_CONSUMER64_UST: > >> - ret = lttng_ustconsumer_add_stream(stream); > >> - if (ret) { > >> - ret = -EINVAL; > >> - goto error; > >> - } > >> - > >> - /* Steal stream identifier only for UST */ > >> - consumer_steal_stream_key(stream->wait_fd, ht); > >> - break; > >> - default: > >> - ERR("Unknown consumer_data type"); > >> - assert(0); > >> - ret = -ENOSYS; > >> - goto error; > >> - } > >> - > >> /* > >> * From here, refcounts are updated so be _careful_ when returning an > >> error > >> * after this point. 
> >> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct > >> lttng_consumer_stream *stream, > >> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); > >> rcu_read_unlock(); > >> > >> -error: > >> pthread_mutex_unlock(&consumer_data.lock); > >> return ret; > >> } > >> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data) > >> int num_rdy, num_hup, high_prio, ret, i; > >> struct pollfd *pollfd = NULL; > >> /* local view of the streams */ > >> - struct lttng_consumer_stream **local_stream = NULL; > >> + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; > >> /* local view of consumer_data.fds_count */ > >> int nb_fd = 0; > >> struct lttng_consumer_local_data *ctx = data; > >> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data) > >> */ > >> if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { > >> size_t pipe_readlen; > >> - char tmp; > >> > >> DBG("consumer_poll_pipe wake up"); > >> /* Consume 1 byte of pipe data */ > >> do { > >> - pipe_readlen = read(ctx->consumer_poll_pipe[0], > >> &tmp, 1); > >> + pipe_readlen = read(ctx->consumer_poll_pipe[0], > >> &new_stream, > >> + sizeof(new_stream)); > >> } while (pipe_readlen == -1 && errno == EINTR); > >> + > >> + /* > >> + * If the stream is NULL, just ignore it. It's also > >> possible that > >> + * the sessiond poll thread changed the consumer_quit > >> state and is > >> + * waking us up to test it. > >> + */ > >> + if (new_stream == NULL) { > >> + continue; > >> + } > >> + > >> + ret = consumer_add_stream(new_stream); > >> + if (ret) { > >> + ERR("Consumer add stream %d failed. Continuing", > >> + new_stream->key); > >> + /* > >> + * At this point, if the add_stream fails, it > >> is not in the > >> + * hash table thus passing the NULL value here. 
> >> + */ > >> + consumer_del_stream(new_stream, NULL); > >> + } > >> + > >> + /* Continue to update the local streams and handle prio > >> ones */ > >> continue; > >> } > >> > >> @@ -2260,19 +2246,16 @@ end: > >> consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; > >> > >> /* > >> - * Wake-up the other end by writing a null byte in the pipe > >> - * (non-blocking). Important note: Because writing into the > >> - * pipe is non-blocking (and therefore we allow dropping wakeup > >> - * data, as long as there is wakeup data present in the pipe > >> - * buffer to wake up the other end), the other end should > >> - * perform the following sequence for waiting: > >> - * 1) empty the pipe (reads). > >> - * 2) perform update operation. > >> - * 3) wait on the pipe (poll). > >> + * Notify the data poll thread to poll back again and test the > >> + * consumer_quit state to quit gracefully. > >> */ > >> do { > >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); > >> + struct lttng_consumer_stream *null_stream = NULL; > >> + > >> + ret = write(ctx->consumer_poll_pipe[1], &null_stream, > >> + sizeof(null_stream)); > >> } while (ret < 0 && errno == EINTR); > >> + > >> rcu_unregister_thread(); > >> return NULL; > >> } > >> diff --git a/src/common/consumer.h b/src/common/consumer.h > >> index 4b225e4..8e5891a 100644 > >> --- a/src/common/consumer.h > >> +++ b/src/common/consumer.h > >> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair > >> *consumer_allocate_relayd_sock_pair( > >> struct consumer_relayd_sock_pair *consumer_find_relayd(int key); > >> int consumer_handle_stream_before_relayd(struct lttng_consumer_stream > >> *stream, > >> size_t data_size); > >> +void consumer_steal_stream_key(int key, struct lttng_ht *ht); > >> > >> extern struct lttng_consumer_local_data *lttng_consumer_create( > >> enum lttng_consumer_type type, > >> diff --git a/src/common/kernel-consumer/kernel-consumer.c > >> b/src/common/kernel-consumer/kernel-consumer.c > >> index 13cbe21..444f5e0 
100644 > >> --- a/src/common/kernel-consumer/kernel-consumer.c > >> +++ b/src/common/kernel-consumer/kernel-consumer.c > >> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct > >> lttng_consumer_local_data *ctx, > >> consumer_del_stream(new_stream, NULL); > >> } > >> } else { > >> - ret = consumer_add_stream(new_stream); > >> - if (ret) { > >> - ERR("Consumer add stream %d failed. Continuing", > >> - new_stream->key); > >> + do { > >> + ret = write(ctx->consumer_poll_pipe[1], > >> &new_stream, > >> + sizeof(new_stream)); > >> + } while (ret < 0 && errno == EINTR); > >> + if (ret < 0) { > >> + PERROR("write data pipe"); > >> consumer_del_stream(new_stream, NULL); > >> goto end_nosignal; > >> } > >> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct > >> lttng_consumer_local_data *ctx, > >> goto end_nosignal; > >> } > >> > >> - /* > >> - * Wake-up the other end by writing a null byte in the pipe > >> (non-blocking). > >> - * Important note: Because writing into the pipe is non-blocking (and > >> - * therefore we allow dropping wakeup data, as long as there is wakeup > >> data > >> - * present in the pipe buffer to wake up the other end), the other end > >> - * should perform the following sequence for waiting: > >> - * > >> - * 1) empty the pipe (reads). > >> - * 2) perform update operation. > >> - * 3) wait on the pipe (poll). 
> >> - */ > >> - do { > >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); > >> - } while (ret < 0 && errno == EINTR); > >> end_nosignal: > >> rcu_read_unlock(); > >> > >> diff --git a/src/common/ust-consumer/ust-consumer.c > >> b/src/common/ust-consumer/ust-consumer.c > >> index 1170687..4ca4b84 100644 > >> --- a/src/common/ust-consumer/ust-consumer.c > >> +++ b/src/common/ust-consumer/ust-consumer.c > >> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct > >> lttng_consumer_local_data *ctx, > >> goto end_nosignal; > >> } > >> > >> + /* > >> + * This needs to be done as soon as we can so we don't block the > >> + * application too long. > >> + */ > >> + ret = lttng_ustconsumer_add_stream(new_stream); > >> + if (ret) { > >> + consumer_del_stream(new_stream, NULL); > >> + goto end_nosignal; > >> + } > >> + /* Steal stream identifier to avoid having streams with the > >> same key */ > >> + consumer_steal_stream_key(new_stream->key, > >> consumer_data.stream_ht); > >> + > >> /* The stream is not metadata. Get relayd reference if exists. > >> */ > >> relayd = consumer_find_relayd(msg.u.stream.net_index); > >> if (relayd != NULL) { > >> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct > >> lttng_consumer_local_data *ctx, > >> goto end_nosignal; > >> } > >> } else { > >> - ret = consumer_add_stream(new_stream); > >> - if (ret) { > >> - ERR("Consumer add stream %d failed. Continuing", > >> - new_stream->key); > >> - /* > >> - * At this point, if the add_stream fails, it > >> is not in the > >> - * hash table thus passing the NULL value here. 
> >> - */ > >> + do { > >> + ret = write(ctx->consumer_poll_pipe[1], > >> &new_stream, > >> + sizeof(new_stream)); > >> + } while (ret < 0 && errno == EINTR); > >> + if (ret < 0) { > >> + PERROR("write data pipe"); > >> consumer_del_stream(new_stream, NULL); > >> goto end_nosignal; > >> } > >> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct > >> lttng_consumer_local_data *ctx, > >> break; > >> } > >> > >> - /* > >> - * Wake-up the other end by writing a null byte in the pipe > >> (non-blocking). > >> - * Important note: Because writing into the pipe is non-blocking (and > >> - * therefore we allow dropping wakeup data, as long as there is wakeup > >> data > >> - * present in the pipe buffer to wake up the other end), the other end > >> - * should perform the following sequence for waiting: > >> - * > >> - * 1) empty the pipe (reads). > >> - * 2) perform update operation. > >> - * 3) wait on the pipe (poll). > >> - */ > >> - do { > >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); > >> - } while (ret < 0 && errno == EINTR); > >> end_nosignal: > >> rcu_read_unlock(); > >> > >> -- > >> 1.7.10.4 > >> > >> > >> _______________________________________________ > >> lttng-dev mailing list > >> [email protected] > >> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev > > -- Mathieu Desnoyers Operating System Efficiency R&D Consultant EfficiOS Inc. http://www.efficios.com _______________________________________________ lttng-dev mailing list [email protected] http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
