Mathieu Desnoyers:
> * David Goulet ([email protected]) wrote:
>>
>>
>> Mathieu Desnoyers:
>>> * David Goulet ([email protected]) wrote:
>>>>
>>>>
>>>> Mathieu Desnoyers:
>>>>> * David Goulet ([email protected]) wrote:
>>>>>> As a second step of refactoring, upon receiving a data stream, we send
>>>>>> it to the data thread that is now in charge of handling it.
>>>>>>
>>>>>> Furthermore, in order for this to behave correctly, we have to make the
>>>>>> ustctl actions on the stream upon before passing it to the right thread
>>>>>> (the kernel does not need special actions.). This way, once the sessiond
>>>>>> thread reply back to the session daemon, the stream is sure to be open
>>>>>> and ready for data to be recorded on the application side so we avoid a
>>>>>> race between the application thinking the stream is ready and the stream
>>>>>> thread still scheduled out.
>>>>>
>>>>> Normally, as long as we have a reference on the SHM file descriptor, and
>>>>> we have the wakeup FD, we should be good to fetch the data of buffers
>>>>> belonging to an application that has already exited, even if it did so
>>>>> before the ustctl calls are done.
>>>>>
>>>>> So I'm wondering why you do the ustctl calls in the sessiond thread ? It
>>>>> seems to complexify the implementation needlessly: we could still do the
>>>>> ustctl calls and output file open at the same location, the
>>>>> data/metadata threads.
>>>>
>>>> Hmmm, it was my understanding that does
>>>
>>> does -> those
>>>
>>>> ustctl_* calls were needed
>>>> before the trace could be recording thus making them quickly. Wrong?
>>>
>>> Can you rephrase your question ? I don't understand.
>>>
>>
>> My understanding was that _those_ ustctl calls need to be done before
>> the tracer could start recording data. This is why they were moved to
>> the session daemon thread.
>>
>> Am I wrong here? When receiving an UST stream on the consumer side, is
>> the SHM reference already acquired?
>
> yes, the reference to shm is already acquired: it's the FD that _has_
> the reference.
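The following example makes the "the FD has the reference" point concrete outside of LTTng. It is a minimal sketch that rests on two assumptions. First, an unlinked temporary file stands in for the UST shared-memory buffer. Second, make_anon_shm() and read_after_writer_exit() are hypothetical helpers, not ustctl or consumer APIs. A forked child plays the traced application: it maps the buffer, writes a record and exits. Only after the child has exited does the parent, which kept the descriptor, map the buffer and read the record back.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for a UST buffer: an unlinked temporary file.
 * Once the name is removed, the returned fd is the only reference.
 */
static int make_anon_shm(size_t len)
{
	char path[] = "/tmp/shm-demo-XXXXXX";
	int fd = mkstemp(path);

	if (fd < 0)
		return -1;
	unlink(path);	/* No name left: the fd keeps the object alive. */
	if (ftruncate(fd, (off_t) len) < 0) {
		close(fd);
		return -1;
	}
	return fd;
}

/*
 * Fork a short-lived "application" that writes one record and exits,
 * then read the record through the fd the "consumer" kept.
 * Returns 0 on success and copies the record into out.
 */
static int read_after_writer_exit(char *out, size_t outlen)
{
	const size_t len = 4096;
	int fd = make_anon_shm(len);
	int status;
	char *buf;
	pid_t pid;

	if (fd < 0)
		return -1;

	pid = fork();
	if (pid < 0) {
		close(fd);
		return -1;
	}
	if (pid == 0) {
		/* Child: the traced application. */
		char *p = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		if (p == MAP_FAILED)
			_exit(1);
		strcpy(p, "event recorded before exit");
		_exit(0);	/* The writer is gone after this point. */
	}

	/* Parent: wait until the writer has fully exited. */
	if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status) ||
			WEXITSTATUS(status) != 0) {
		close(fd);
		return -1;
	}

	/* Map only now: the data is still reachable through fd. */
	buf = mmap(NULL, len, PROT_READ, MAP_SHARED, fd, 0);
	close(fd);	/* The mapping holds its own reference. */
	if (buf == MAP_FAILED)
		return -1;
	snprintf(out, outlen, "%s", buf);
	munmap(buf, len);
	return 0;
}
```

The sketch unlinks the name right away, so the descriptor, and later the mapping, are the only things keeping the object alive. Once the last fd and mapping are released, the kernel frees it. This is a simplified illustration of the argument above, not a model of the real ring-buffer layout.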
Ok good. So just to be crystal clear here, the ustctl* calls can be
delayed and done in the right thread? (data/metadata).

David

>
> Thanks,
>
> Mathieu
>
>>
>> David
>>
>>> Thanks,
>>>
>>> Mathieu
>>>
>>>>
>>>> David
>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mathieu
>>>>>
>>>>>>
>>>>>> This commit should speed up the add stream process for the session
>>>>>> daemon. There is still some actions to move out of the session daemon
>>>>>> poll thread to gain speed significantly, especially for network
>>>>>> streaming.
>>>>>>
>>>>>> Signed-off-by: David Goulet <[email protected]>
>>>>>> ---
>>>>>>  src/common/consumer.c                        | 123 +++++++++++---------------
>>>>>>  src/common/consumer.h                        |   1 +
>>>>>>  src/common/kernel-consumer/kernel-consumer.c |  24 ++---
>>>>>>  src/common/ust-consumer/ust-consumer.c       |  40 ++++-----
>>>>>>  4 files changed, 78 insertions(+), 110 deletions(-)
>>>>>>
>>>>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>>>>>> index 055de1b..1d2b1f7 100644
>>>>>> --- a/src/common/consumer.c
>>>>>> +++ b/src/common/consumer.c
>>>>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
>>>>>>  	return stream;
>>>>>>  }
>>>>>>
>>>>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>>>>  {
>>>>>>  	struct lttng_consumer_stream *stream;
>>>>>>
>>>>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>>>>>  	lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>>>>>  	lttng_ht_node_init_ulong(&stream->node, stream->key);
>>>>>>
>>>>>> +	/*
>>>>>> +	 * The cpu number is needed before using any ustctl_* actions. Ignored for
>>>>>> +	 * the kernel so the value does not matter.
>>>>>> +	 */
>>>>>> +	pthread_mutex_lock(&consumer_data.lock);
>>>>>> +	stream->cpu = stream->chan->cpucount++;
>>>>>> +	pthread_mutex_unlock(&consumer_data.lock);
>>>>>> +
>>>>>>  	DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>>>>>>  			" out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>>>>>>  			stream->shm_fd, stream->wait_fd,
>>>>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>>>  	rcu_read_lock();
>>>>>>
>>>>>> -	switch (consumer_data.type) {
>>>>>> -	case LTTNG_CONSUMER_KERNEL:
>>>>>> -		break;
>>>>>> -	case LTTNG_CONSUMER32_UST:
>>>>>> -	case LTTNG_CONSUMER64_UST:
>>>>>> -		stream->cpu = stream->chan->cpucount++;
>>>>>> -		ret = lttng_ustconsumer_add_stream(stream);
>>>>>> -		if (ret) {
>>>>>> -			ret = -EINVAL;
>>>>>> -			goto error;
>>>>>> -		}
>>>>>> -
>>>>>> -		/* Steal stream identifier only for UST */
>>>>>> -		consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>>>>>> -		break;
>>>>>> -	default:
>>>>>> -		ERR("Unknown consumer_data type");
>>>>>> -		assert(0);
>>>>>> -		ret = -ENOSYS;
>>>>>> -		goto error;
>>>>>> -	}
>>>>>> -
>>>>>>  	lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>>>>>
>>>>>>  	/* Check and cleanup relayd */
>>>>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>>>  	consumer_data.stream_count++;
>>>>>>  	consumer_data.need_update = 1;
>>>>>>
>>>>>> -error:
>>>>>>  	rcu_read_unlock();
>>>>>>  	pthread_mutex_unlock(&consumer_data.lock);
>>>>>>
>>>>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>
>>>>>>  	DBG3("Consumer delete metadata stream %d", stream->wait_fd);
>>>>>>
>>>>>> -	if (ht == NULL) {
>>>>>> -		/* Means the stream was allocated but not successfully added */
>>>>>> -		goto free_stream;
>>>>>> -	}
>>>>>> -
>>>>>> -	rcu_read_lock();
>>>>>> -	iter.iter.node = &stream->waitfd_node.node;
>>>>>> -	ret = lttng_ht_del(ht, &iter);
>>>>>> -	assert(!ret);
>>>>>> -	rcu_read_unlock();
>>>>>> -
>>>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>>>  	switch (consumer_data.type) {
>>>>>>  	case LTTNG_CONSUMER_KERNEL:
>>>>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>  		goto end;
>>>>>>  	}
>>>>>>
>>>>>> +	if (ht == NULL) {
>>>>>> +		pthread_mutex_unlock(&consumer_data.lock);
>>>>>> +		/* Means the stream was allocated but not successfully added */
>>>>>> +		goto free_stream;
>>>>>> +	}
>>>>>> +
>>>>>> +	rcu_read_lock();
>>>>>> +	iter.iter.node = &stream->waitfd_node.node;
>>>>>> +	ret = lttng_ht_del(ht, &iter);
>>>>>> +	assert(!ret);
>>>>>> +	rcu_read_unlock();
>>>>>> +
>>>>>>  	if (stream->out_fd >= 0) {
>>>>>>  		ret = close(stream->out_fd);
>>>>>>  		if (ret) {
>>>>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>
>>>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>>>
>>>>>> -	switch (consumer_data.type) {
>>>>>> -	case LTTNG_CONSUMER_KERNEL:
>>>>>> -		break;
>>>>>> -	case LTTNG_CONSUMER32_UST:
>>>>>> -	case LTTNG_CONSUMER64_UST:
>>>>>> -		ret = lttng_ustconsumer_add_stream(stream);
>>>>>> -		if (ret) {
>>>>>> -			ret = -EINVAL;
>>>>>> -			goto error;
>>>>>> -		}
>>>>>> -
>>>>>> -		/* Steal stream identifier only for UST */
>>>>>> -		consumer_steal_stream_key(stream->wait_fd, ht);
>>>>>> -		break;
>>>>>> -	default:
>>>>>> -		ERR("Unknown consumer_data type");
>>>>>> -		assert(0);
>>>>>> -		ret = -ENOSYS;
>>>>>> -		goto error;
>>>>>> -	}
>>>>>> -
>>>>>>  	/*
>>>>>>  	 * From here, refcounts are updated so be _careful_ when returning an error
>>>>>>  	 * after this point.
>>>>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>  	lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>>>>>>  	rcu_read_unlock();
>>>>>>
>>>>>> -error:
>>>>>>  	pthread_mutex_unlock(&consumer_data.lock);
>>>>>>  	return ret;
>>>>>>  }
>>>>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
>>>>>>  	int num_rdy, num_hup, high_prio, ret, i;
>>>>>>  	struct pollfd *pollfd = NULL;
>>>>>>  	/* local view of the streams */
>>>>>> -	struct lttng_consumer_stream **local_stream = NULL;
>>>>>> +	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>>>>>>  	/* local view of consumer_data.fds_count */
>>>>>>  	int nb_fd = 0;
>>>>>>  	struct lttng_consumer_local_data *ctx = data;
>>>>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
>>>>>>  		 */
>>>>>>  		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
>>>>>>  			size_t pipe_readlen;
>>>>>> -			char tmp;
>>>>>>
>>>>>>  			DBG("consumer_poll_pipe wake up");
>>>>>>  			/* Consume 1 byte of pipe data */
>>>>>>  			do {
>>>>>> -				pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
>>>>>> +				pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
>>>>>> +						sizeof(new_stream));
>>>>>>  			} while (pipe_readlen == -1 && errno == EINTR);
>>>>>> +
>>>>>> +			/*
>>>>>> +			 * If the stream is NULL, just ignore it. It's also possible that
>>>>>> +			 * the sessiond poll thread changed the consumer_quit state and is
>>>>>> +			 * waking us up to test it.
>>>>>> +			 */
>>>>>> +			if (new_stream == NULL) {
>>>>>> +				continue;
>>>>>> +			}
>>>>>> +
>>>>>> +			ret = consumer_add_stream(new_stream);
>>>>>> +			if (ret) {
>>>>>> +				ERR("Consumer add stream %d failed. Continuing",
>>>>>> +						new_stream->key);
>>>>>> +				/*
>>>>>> +				 * At this point, if the add_stream fails, it is not in the
>>>>>> +				 * hash table thus passing the NULL value here.
>>>>>> +				 */
>>>>>> +				consumer_del_stream(new_stream, NULL);
>>>>>> +			}
>>>>>> +
>>>>>> +			/* Continue to update the local streams and handle prio ones */
>>>>>>  			continue;
>>>>>>  		}
>>>>>>
>>>>>> @@ -2260,19 +2246,16 @@ end:
>>>>>>  	consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
>>>>>>
>>>>>>  	/*
>>>>>> -	 * Wake-up the other end by writing a null byte in the pipe
>>>>>> -	 * (non-blocking). Important note: Because writing into the
>>>>>> -	 * pipe is non-blocking (and therefore we allow dropping wakeup
>>>>>> -	 * data, as long as there is wakeup data present in the pipe
>>>>>> -	 * buffer to wake up the other end), the other end should
>>>>>> -	 * perform the following sequence for waiting:
>>>>>> -	 * 1) empty the pipe (reads).
>>>>>> -	 * 2) perform update operation.
>>>>>> -	 * 3) wait on the pipe (poll).
>>>>>> +	 * Notify the data poll thread to poll back again and test the
>>>>>> +	 * consumer_quit state to quit gracefully.
>>>>>>  	 */
>>>>>>  	do {
>>>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> +		struct lttng_consumer_stream *null_stream = NULL;
>>>>>> +
>>>>>> +		ret = write(ctx->consumer_poll_pipe[1], &null_stream,
>>>>>> +				sizeof(null_stream));
>>>>>>  	} while (ret < 0 && errno == EINTR);
>>>>>> +
>>>>>>  	rcu_unregister_thread();
>>>>>>  	return NULL;
>>>>>>  }
>>>>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>>>>>> index 4b225e4..8e5891a 100644
>>>>>> --- a/src/common/consumer.h
>>>>>> +++ b/src/common/consumer.h
>>>>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
>>>>>>  struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
>>>>>>  int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
>>>>>>  		size_t data_size);
>>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
>>>>>>
>>>>>>  extern struct lttng_consumer_local_data *lttng_consumer_create(
>>>>>>  		enum lttng_consumer_type type,
>>>>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>>>>>> index 13cbe21..444f5e0 100644
>>>>>> --- a/src/common/kernel-consumer/kernel-consumer.c
>>>>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>>>>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>  				consumer_del_stream(new_stream, NULL);
>>>>>>  			}
>>>>>>  		} else {
>>>>>> -			ret = consumer_add_stream(new_stream);
>>>>>> -			if (ret) {
>>>>>> -				ERR("Consumer add stream %d failed. Continuing",
>>>>>> -						new_stream->key);
>>>>>> +			do {
>>>>>> +				ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>>>> +						sizeof(new_stream));
>>>>>> +			} while (ret < 0 && errno == EINTR);
>>>>>> +			if (ret < 0) {
>>>>>> +				PERROR("write data pipe");
>>>>>>  				consumer_del_stream(new_stream, NULL);
>>>>>>  				goto end_nosignal;
>>>>>>  			}
>>>>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>  		goto end_nosignal;
>>>>>>  	}
>>>>>>
>>>>>> -	/*
>>>>>> -	 * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>>>> -	 * Important note: Because writing into the pipe is non-blocking (and
>>>>>> -	 * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>>>> -	 * present in the pipe buffer to wake up the other end), the other end
>>>>>> -	 * should perform the following sequence for waiting:
>>>>>> -	 *
>>>>>> -	 * 1) empty the pipe (reads).
>>>>>> -	 * 2) perform update operation.
>>>>>> -	 * 3) wait on the pipe (poll).
>>>>>> -	 */
>>>>>> -	do {
>>>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> -	} while (ret < 0 && errno == EINTR);
>>>>>>  end_nosignal:
>>>>>>  	rcu_read_unlock();
>>>>>>
>>>>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>>>>>> index 1170687..4ca4b84 100644
>>>>>> --- a/src/common/ust-consumer/ust-consumer.c
>>>>>> +++ b/src/common/ust-consumer/ust-consumer.c
>>>>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>  			goto end_nosignal;
>>>>>>  		}
>>>>>>
>>>>>> +		/*
>>>>>> +		 * This needs to be done as soon as we can so we don't block the
>>>>>> +		 * application too long.
>>>>>> +		 */
>>>>>> +		ret = lttng_ustconsumer_add_stream(new_stream);
>>>>>> +		if (ret) {
>>>>>> +			consumer_del_stream(new_stream, NULL);
>>>>>> +			goto end_nosignal;
>>>>>> +		}
>>>>>> +		/* Steal stream identifier to avoid having streams with the same key */
>>>>>> +		consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
>>>>>> +
>>>>>>  		/* The stream is not metadata. Get relayd reference if exists. */
>>>>>>  		relayd = consumer_find_relayd(msg.u.stream.net_index);
>>>>>>  		if (relayd != NULL) {
>>>>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>  				consumer_del_stream(new_stream, NULL);
>>>>>>  				goto end_nosignal;
>>>>>>  			}
>>>>>>  		} else {
>>>>>> -			ret = consumer_add_stream(new_stream);
>>>>>> -			if (ret) {
>>>>>> -				ERR("Consumer add stream %d failed. Continuing",
>>>>>> -						new_stream->key);
>>>>>> -				/*
>>>>>> -				 * At this point, if the add_stream fails, it is not in the
>>>>>> -				 * hash table thus passing the NULL value here.
>>>>>> -				 */
>>>>>> +			do {
>>>>>> +				ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>>>> +						sizeof(new_stream));
>>>>>> +			} while (ret < 0 && errno == EINTR);
>>>>>> +			if (ret < 0) {
>>>>>> +				PERROR("write data pipe");
>>>>>>  				consumer_del_stream(new_stream, NULL);
>>>>>>  				goto end_nosignal;
>>>>>>  			}
>>>>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>  		break;
>>>>>>  	}
>>>>>>
>>>>>> -	/*
>>>>>> -	 * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>>>> -	 * Important note: Because writing into the pipe is non-blocking (and
>>>>>> -	 * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>>>> -	 * present in the pipe buffer to wake up the other end), the other end
>>>>>> -	 * should perform the following sequence for waiting:
>>>>>> -	 *
>>>>>> -	 * 1) empty the pipe (reads).
>>>>>> -	 * 2) perform update operation.
>>>>>> -	 * 3) wait on the pipe (poll).
>>>>>> -	 */
>>>>>> -	do {
>>>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> -	} while (ret < 0 && errno == EINTR);
>>>>>>  end_nosignal:
>>>>>>  	rcu_read_unlock();
>>>>>>
>>>>>> -- 
>>>>>> 1.7.10.4
>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> lttng-dev mailing list
>>>>>> [email protected]
>>>>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>>>>>
>>>
>
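The core mechanism of this patch can be sketched on its own. Instead of a one-byte wakeup, the sender writes the stream pointer itself into consumer_poll_pipe, and a NULL pointer means "nothing to add, go re-check consumer_quit". This is a minimal sketch, not the consumer code. It assumes hypothetical names (struct demo_stream, send_stream(), recv_stream()) and a pipe left in blocking mode.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical stand-in for struct lttng_consumer_stream. */
struct demo_stream {
	int key;
};

/*
 * Sender side: hand a stream (or NULL as a plain wakeup) to the data
 * thread by writing the pointer value itself. Retries on EINTR.
 * Returns 0 on success, -1 on error.
 */
static int send_stream(int wfd, struct demo_stream *stream)
{
	ssize_t ret;

	do {
		ret = write(wfd, &stream, sizeof(stream));
	} while (ret < 0 && errno == EINTR);
	return ret == (ssize_t) sizeof(stream) ? 0 : -1;
}

/*
 * Data-thread side: read one pointer back. Returns 1 and sets *out for a
 * real stream, 0 (with *out == NULL) for a NULL wakeup, and -1 on error
 * or short read.
 */
static int recv_stream(int rfd, struct demo_stream **out)
{
	struct demo_stream *stream = NULL;
	ssize_t ret;

	do {
		ret = read(rfd, &stream, sizeof(stream));
	} while (ret < 0 && errno == EINTR);
	if (ret != (ssize_t) sizeof(stream))
		return -1;
	*out = stream;
	return stream != NULL;
}
```

Passing raw pointers only works because both ends are threads of the same process. A pointer is much smaller than PIPE_BUF, so each write() is atomic and the reader never sees half a pointer.

Non-blocking mode needs more care. The removed comments describe the old wakeup pipe as non-blocking, and there a full pipe makes write() fail with EAGAIN. Losing a wakeup byte was harmless, but losing a pointer means losing a stream, so that error has to be handled.

The sketch also stores the results of read() and write() in ssize_t, which is the type POSIX gives them.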
