Mathieu Desnoyers: > * David Goulet ([email protected]) wrote: >> As a second step of refactoring, upon receiving a data stream, we send >> it to the data thread, which is now in charge of handling it. >> >> Furthermore, in order for this to behave correctly, we have to perform the >> ustctl actions on the stream before passing it to the right thread >> (the kernel does not need special actions). This way, once the sessiond >> thread replies back to the session daemon, the stream is sure to be open >> and ready for data to be recorded on the application side, so we avoid a >> race between the application thinking the stream is ready and the stream >> thread still being scheduled out. > > Normally, as long as we have a reference on the SHM file descriptor and > we have the wakeup FD, we should be able to fetch the data of buffers > belonging to an application that has already exited, even if it did so > before the ustctl calls are done. > > So I'm wondering why you do the ustctl calls in the sessiond thread? It > seems to complicate the implementation needlessly: we could still do the > ustctl calls and open the output file at the same location, in the > data/metadata threads.
Hmmm, it was my understanding that does ustctl_* calls were needed before the trace could be recording thus making them quickly. Wrong? David > > Thanks, > > Mathieu > >> >> This commit should speed up the add stream process for the session >> daemon. There is still some actions to move out of the session daemon >> poll thread to gain speed significantly, especially for network >> streaming. >> >> Signed-off-by: David Goulet <[email protected]> >> --- >> src/common/consumer.c | 123 >> +++++++++++--------------- >> src/common/consumer.h | 1 + >> src/common/kernel-consumer/kernel-consumer.c | 24 ++--- >> src/common/ust-consumer/ust-consumer.c | 40 ++++----- >> 4 files changed, 78 insertions(+), 110 deletions(-) >> >> diff --git a/src/common/consumer.c b/src/common/consumer.c >> index 055de1b..1d2b1f7 100644 >> --- a/src/common/consumer.c >> +++ b/src/common/consumer.c >> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream >> *consumer_find_stream(int key, >> return stream; >> } >> >> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) >> +void consumer_steal_stream_key(int key, struct lttng_ht *ht) >> { >> struct lttng_consumer_stream *stream; >> >> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream( >> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); >> lttng_ht_node_init_ulong(&stream->node, stream->key); >> >> + /* >> + * The cpu number is needed before using any ustctl_* actions. Ignored >> for >> + * the kernel so the value does not matter. 
>> + */ >> + pthread_mutex_lock(&consumer_data.lock); >> + stream->cpu = stream->chan->cpucount++; >> + pthread_mutex_unlock(&consumer_data.lock); >> + >> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len >> %llu," >> " out_fd %d, net_seq_idx %d)", stream->path_name, >> stream->key, >> stream->shm_fd, stream->wait_fd, >> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream >> *stream) >> pthread_mutex_lock(&consumer_data.lock); >> rcu_read_lock(); >> >> - switch (consumer_data.type) { >> - case LTTNG_CONSUMER_KERNEL: >> - break; >> - case LTTNG_CONSUMER32_UST: >> - case LTTNG_CONSUMER64_UST: >> - stream->cpu = stream->chan->cpucount++; >> - ret = lttng_ustconsumer_add_stream(stream); >> - if (ret) { >> - ret = -EINVAL; >> - goto error; >> - } >> - >> - /* Steal stream identifier only for UST */ >> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); >> - break; >> - default: >> - ERR("Unknown consumer_data type"); >> - assert(0); >> - ret = -ENOSYS; >> - goto error; >> - } >> - >> lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); >> >> /* Check and cleanup relayd */ >> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream >> *stream) >> consumer_data.stream_count++; >> consumer_data.need_update = 1; >> >> -error: >> rcu_read_unlock(); >> pthread_mutex_unlock(&consumer_data.lock); >> >> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct >> lttng_consumer_stream *stream, >> >> DBG3("Consumer delete metadata stream %d", stream->wait_fd); >> >> - if (ht == NULL) { >> - /* Means the stream was allocated but not successfully added */ >> - goto free_stream; >> - } >> - >> - rcu_read_lock(); >> - iter.iter.node = &stream->waitfd_node.node; >> - ret = lttng_ht_del(ht, &iter); >> - assert(!ret); >> - rcu_read_unlock(); >> - >> pthread_mutex_lock(&consumer_data.lock); >> switch (consumer_data.type) { >> case LTTNG_CONSUMER_KERNEL: >> @@ -1613,6 +1587,18 @@ void 
consumer_del_metadata_stream(struct >> lttng_consumer_stream *stream, >> goto end; >> } >> >> + if (ht == NULL) { >> + pthread_mutex_unlock(&consumer_data.lock); >> + /* Means the stream was allocated but not successfully added */ >> + goto free_stream; >> + } >> + >> + rcu_read_lock(); >> + iter.iter.node = &stream->waitfd_node.node; >> + ret = lttng_ht_del(ht, &iter); >> + assert(!ret); >> + rcu_read_unlock(); >> + >> if (stream->out_fd >= 0) { >> ret = close(stream->out_fd); >> if (ret) { >> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct >> lttng_consumer_stream *stream, >> >> pthread_mutex_lock(&consumer_data.lock); >> >> - switch (consumer_data.type) { >> - case LTTNG_CONSUMER_KERNEL: >> - break; >> - case LTTNG_CONSUMER32_UST: >> - case LTTNG_CONSUMER64_UST: >> - ret = lttng_ustconsumer_add_stream(stream); >> - if (ret) { >> - ret = -EINVAL; >> - goto error; >> - } >> - >> - /* Steal stream identifier only for UST */ >> - consumer_steal_stream_key(stream->wait_fd, ht); >> - break; >> - default: >> - ERR("Unknown consumer_data type"); >> - assert(0); >> - ret = -ENOSYS; >> - goto error; >> - } >> - >> /* >> * From here, refcounts are updated so be _careful_ when returning an >> error >> * after this point. 
>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct >> lttng_consumer_stream *stream, >> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); >> rcu_read_unlock(); >> >> -error: >> pthread_mutex_unlock(&consumer_data.lock); >> return ret; >> } >> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data) >> int num_rdy, num_hup, high_prio, ret, i; >> struct pollfd *pollfd = NULL; >> /* local view of the streams */ >> - struct lttng_consumer_stream **local_stream = NULL; >> + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; >> /* local view of consumer_data.fds_count */ >> int nb_fd = 0; >> struct lttng_consumer_local_data *ctx = data; >> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data) >> */ >> if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { >> size_t pipe_readlen; >> - char tmp; >> >> DBG("consumer_poll_pipe wake up"); >> /* Consume 1 byte of pipe data */ >> do { >> - pipe_readlen = read(ctx->consumer_poll_pipe[0], >> &tmp, 1); >> + pipe_readlen = read(ctx->consumer_poll_pipe[0], >> &new_stream, >> + sizeof(new_stream)); >> } while (pipe_readlen == -1 && errno == EINTR); >> + >> + /* >> + * If the stream is NULL, just ignore it. It's also >> possible that >> + * the sessiond poll thread changed the consumer_quit >> state and is >> + * waking us up to test it. >> + */ >> + if (new_stream == NULL) { >> + continue; >> + } >> + >> + ret = consumer_add_stream(new_stream); >> + if (ret) { >> + ERR("Consumer add stream %d failed. Continuing", >> + new_stream->key); >> + /* >> + * At this point, if the add_stream fails, it >> is not in the >> + * hash table thus passing the NULL value here. 
>> + */ >> + consumer_del_stream(new_stream, NULL); >> + } >> + >> + /* Continue to update the local streams and handle prio >> ones */ >> continue; >> } >> >> @@ -2260,19 +2246,16 @@ end: >> consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; >> >> /* >> - * Wake-up the other end by writing a null byte in the pipe >> - * (non-blocking). Important note: Because writing into the >> - * pipe is non-blocking (and therefore we allow dropping wakeup >> - * data, as long as there is wakeup data present in the pipe >> - * buffer to wake up the other end), the other end should >> - * perform the following sequence for waiting: >> - * 1) empty the pipe (reads). >> - * 2) perform update operation. >> - * 3) wait on the pipe (poll). >> + * Notify the data poll thread to poll back again and test the >> + * consumer_quit state to quit gracefully. >> */ >> do { >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); >> + struct lttng_consumer_stream *null_stream = NULL; >> + >> + ret = write(ctx->consumer_poll_pipe[1], &null_stream, >> + sizeof(null_stream)); >> } while (ret < 0 && errno == EINTR); >> + >> rcu_unregister_thread(); >> return NULL; >> } >> diff --git a/src/common/consumer.h b/src/common/consumer.h >> index 4b225e4..8e5891a 100644 >> --- a/src/common/consumer.h >> +++ b/src/common/consumer.h >> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair >> *consumer_allocate_relayd_sock_pair( >> struct consumer_relayd_sock_pair *consumer_find_relayd(int key); >> int consumer_handle_stream_before_relayd(struct lttng_consumer_stream >> *stream, >> size_t data_size); >> +void consumer_steal_stream_key(int key, struct lttng_ht *ht); >> >> extern struct lttng_consumer_local_data *lttng_consumer_create( >> enum lttng_consumer_type type, >> diff --git a/src/common/kernel-consumer/kernel-consumer.c >> b/src/common/kernel-consumer/kernel-consumer.c >> index 13cbe21..444f5e0 100644 >> --- a/src/common/kernel-consumer/kernel-consumer.c >> +++ 
b/src/common/kernel-consumer/kernel-consumer.c >> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct >> lttng_consumer_local_data *ctx, >> consumer_del_stream(new_stream, NULL); >> } >> } else { >> - ret = consumer_add_stream(new_stream); >> - if (ret) { >> - ERR("Consumer add stream %d failed. Continuing", >> - new_stream->key); >> + do { >> + ret = write(ctx->consumer_poll_pipe[1], >> &new_stream, >> + sizeof(new_stream)); >> + } while (ret < 0 && errno == EINTR); >> + if (ret < 0) { >> + PERROR("write data pipe"); >> consumer_del_stream(new_stream, NULL); >> goto end_nosignal; >> } >> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct >> lttng_consumer_local_data *ctx, >> goto end_nosignal; >> } >> >> - /* >> - * Wake-up the other end by writing a null byte in the pipe >> (non-blocking). >> - * Important note: Because writing into the pipe is non-blocking (and >> - * therefore we allow dropping wakeup data, as long as there is wakeup >> data >> - * present in the pipe buffer to wake up the other end), the other end >> - * should perform the following sequence for waiting: >> - * >> - * 1) empty the pipe (reads). >> - * 2) perform update operation. >> - * 3) wait on the pipe (poll). >> - */ >> - do { >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); >> - } while (ret < 0 && errno == EINTR); >> end_nosignal: >> rcu_read_unlock(); >> >> diff --git a/src/common/ust-consumer/ust-consumer.c >> b/src/common/ust-consumer/ust-consumer.c >> index 1170687..4ca4b84 100644 >> --- a/src/common/ust-consumer/ust-consumer.c >> +++ b/src/common/ust-consumer/ust-consumer.c >> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct >> lttng_consumer_local_data *ctx, >> goto end_nosignal; >> } >> >> + /* >> + * This needs to be done as soon as we can so we don't block the >> + * application too long. 
>> + */ >> + ret = lttng_ustconsumer_add_stream(new_stream); >> + if (ret) { >> + consumer_del_stream(new_stream, NULL); >> + goto end_nosignal; >> + } >> + /* Steal stream identifier to avoid having streams with the >> same key */ >> + consumer_steal_stream_key(new_stream->key, >> consumer_data.stream_ht); >> + >> /* The stream is not metadata. Get relayd reference if exists. >> */ >> relayd = consumer_find_relayd(msg.u.stream.net_index); >> if (relayd != NULL) { >> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct >> lttng_consumer_local_data *ctx, >> goto end_nosignal; >> } >> } else { >> - ret = consumer_add_stream(new_stream); >> - if (ret) { >> - ERR("Consumer add stream %d failed. Continuing", >> - new_stream->key); >> - /* >> - * At this point, if the add_stream fails, it >> is not in the >> - * hash table thus passing the NULL value here. >> - */ >> + do { >> + ret = write(ctx->consumer_poll_pipe[1], >> &new_stream, >> + sizeof(new_stream)); >> + } while (ret < 0 && errno == EINTR); >> + if (ret < 0) { >> + PERROR("write data pipe"); >> consumer_del_stream(new_stream, NULL); >> goto end_nosignal; >> } >> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct >> lttng_consumer_local_data *ctx, >> break; >> } >> >> - /* >> - * Wake-up the other end by writing a null byte in the pipe >> (non-blocking). >> - * Important note: Because writing into the pipe is non-blocking (and >> - * therefore we allow dropping wakeup data, as long as there is wakeup >> data >> - * present in the pipe buffer to wake up the other end), the other end >> - * should perform the following sequence for waiting: >> - * >> - * 1) empty the pipe (reads). >> - * 2) perform update operation. >> - * 3) wait on the pipe (poll). 
>> - */ >> - do { >> - ret = write(ctx->consumer_poll_pipe[1], "", 1); >> - } while (ret < 0 && errno == EINTR); >> end_nosignal: >> rcu_read_unlock(); >> >> -- >> 1.7.10.4 >> >> >> _______________________________________________ >> lttng-dev mailing list >> [email protected] >> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev > _______________________________________________ lttng-dev mailing list [email protected] http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
