* David Goulet ([email protected]) wrote:
>
>
> Mathieu Desnoyers:
> > * David Goulet ([email protected]) wrote:
> >>
> >>
> >> Mathieu Desnoyers:
> >>> * David Goulet ([email protected]) wrote:
> >>>>
> >>>>
> >>>> Mathieu Desnoyers:
> >>>>> * David Goulet ([email protected]) wrote:
> >>>>>> As a second step of refactoring, upon receiving a data stream, we send
> >>>>>> it to the data thread that is now in charge of handling it.
> >>>>>>
> >>>>>> Furthermore, in order for this to behave correctly, we have to make the
> >>>>>> ustctl actions on the stream upon before passing it to the right thread
> >>>>>> (the kernel does not need special actions.). This way, once the
> >>>>>> sessiond
> >>>>>> thread reply back to the session daemon, the stream is sure to be open
> >>>>>> and ready for data to be recorded on the application side so we avoid a
> >>>>>> race between the application thinking the stream is ready and the
> >>>>>> stream
> >>>>>> thread still scheduled out.
> >>>>>
> >>>>> Normally, as long as we have a reference on the SHM file descriptor, and
> >>>>> we have the wakeup FD, we should be good to fetch the data of buffers
> >>>>> belonging to an application that has already exited, even if it did so
> >>>>> before the ustctl calls are done.
> >>>>>
> >>>>> So I'm wondering why you do the ustctl calls in the sessiond thread ? It
> >>>>> seems to complexify the implementation needlessly: we could still do the
> >>>>> ustctl calls and output file open at the same location, the
> >>>>> data/metadata threads.
> >>>>
> >>>> Hmmm, it was my understanding that does
> >>>
> >>> does -> those
> >>>
> >>>> ustctl_* calls were needed
> >>>> before the trace could be recording thus making them quickly. Wrong?
> >>>
> >>> Can you rephrase your question ? I don't understand.
> >>>
> >>
> >> My understanding was that _those_ ustctl calls need to be done before
> >> the tracer could start recording data. This is why they were moved to
> >> the session daemon thread.
> >>
> >> Am I wrong here? When receiving an UST stream on the consumer side, is
> >> the SHM reference already acquired?
> >
> > yes, the reference to shm is already acquired: it's the FD that _has_
> > the reference.
>
> Ok good. So just to be crystal clear here, the ustctl* calls can be
> delayed and done in the right thread? (data/metadata).

yes, I expect it should work.

Mathieu

>
> David
>
> >
> > Thanks,
> >
> > Mathieu
> >
> >>
> >> David
> >>
> >>> Thanks,
> >>>
> >>> Mathieu
> >>>
> >>>>
> >>>> David
> >>>>
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>>>
> >>>>>> This commit should speed up the add stream process for the session
> >>>>>> daemon. There is still some actions to move out of the session daemon
> >>>>>> poll thread to gain speed significantly, especially for network
> >>>>>> streaming.
> >>>>>>
> >>>>>> Signed-off-by: David Goulet <[email protected]>
> >>>>>> ---
> >>>>>> src/common/consumer.c | 123
> >>>>>> +++++++++++--------------
> >>>>>> src/common/consumer.h | 1 +
> >>>>>> src/common/kernel-consumer/kernel-consumer.c | 24 ++---
> >>>>>> src/common/ust-consumer/ust-consumer.c | 40 ++++-----
> >>>>>> 4 files changed, 78 insertions(+), 110 deletions(-)
> >>>>>>
> >>>>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
> >>>>>> index 055de1b..1d2b1f7 100644
> >>>>>> --- a/src/common/consumer.c
> >>>>>> +++ b/src/common/consumer.c
> >>>>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream
> >>>>>> *consumer_find_stream(int key,
> >>>>>> return stream;
> >>>>>> }
> >>>>>>
> >>>>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>>>> {
> >>>>>> struct lttng_consumer_stream *stream;
> >>>>>>
> >>>>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream
> >>>>>> *consumer_allocate_stream(
> >>>>>> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
> >>>>>> lttng_ht_node_init_ulong(&stream->node, stream->key);
> >>>>>>
> >>>>>> + /*
> >>>>>> + * The cpu number is needed before using any ustctl_* actions.
> >>>>>> Ignored for
> >>>>>> + * the kernel so the value does not matter.
> >>>>>> + */
> >>>>>> + pthread_mutex_lock(&consumer_data.lock);
> >>>>>> + stream->cpu = stream->chan->cpucount++;
> >>>>>> + pthread_mutex_unlock(&consumer_data.lock);
> >>>>>> +
> >>>>>> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d,
> >>>>>> mmap_len %llu,"
> >>>>>> " out_fd %d, net_seq_idx %d)",
> >>>>>> stream->path_name, stream->key,
> >>>>>> stream->shm_fd, stream->wait_fd,
> >>>>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct
> >>>>>> lttng_consumer_stream *stream)
> >>>>>> pthread_mutex_lock(&consumer_data.lock);
> >>>>>> rcu_read_lock();
> >>>>>>
> >>>>>> - switch (consumer_data.type) {
> >>>>>> - case LTTNG_CONSUMER_KERNEL:
> >>>>>> - break;
> >>>>>> - case LTTNG_CONSUMER32_UST:
> >>>>>> - case LTTNG_CONSUMER64_UST:
> >>>>>> - stream->cpu = stream->chan->cpucount++;
> >>>>>> - ret = lttng_ustconsumer_add_stream(stream);
> >>>>>> - if (ret) {
> >>>>>> - ret = -EINVAL;
> >>>>>> - goto error;
> >>>>>> - }
> >>>>>> -
> >>>>>> - /* Steal stream identifier only for UST */
> >>>>>> - consumer_steal_stream_key(stream->key,
> >>>>>> consumer_data.stream_ht);
> >>>>>> - break;
> >>>>>> - default:
> >>>>>> - ERR("Unknown consumer_data type");
> >>>>>> - assert(0);
> >>>>>> - ret = -ENOSYS;
> >>>>>> - goto error;
> >>>>>> - }
> >>>>>> -
> >>>>>> lttng_ht_add_unique_ulong(consumer_data.stream_ht,
> >>>>>> &stream->node);
> >>>>>>
> >>>>>> /* Check and cleanup relayd */
> >>>>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct
> >>>>>> lttng_consumer_stream *stream)
> >>>>>> consumer_data.stream_count++;
> >>>>>> consumer_data.need_update = 1;
> >>>>>>
> >>>>>> -error:
> >>>>>> rcu_read_unlock();
> >>>>>> pthread_mutex_unlock(&consumer_data.lock);
> >>>>>>
> >>>>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct
> >>>>>> lttng_consumer_stream *stream,
> >>>>>>
> >>>>>> DBG3("Consumer delete metadata stream %d", stream->wait_fd);
> >>>>>>
> >>>>>> - if (ht == NULL) {
> >>>>>> - /* Means the stream was allocated but not successfully
> >>>>>> added */
> >>>>>> - goto free_stream;
> >>>>>> - }
> >>>>>> -
> >>>>>> - rcu_read_lock();
> >>>>>> - iter.iter.node = &stream->waitfd_node.node;
> >>>>>> - ret = lttng_ht_del(ht, &iter);
> >>>>>> - assert(!ret);
> >>>>>> - rcu_read_unlock();
> >>>>>> -
> >>>>>> pthread_mutex_lock(&consumer_data.lock);
> >>>>>> switch (consumer_data.type) {
> >>>>>> case LTTNG_CONSUMER_KERNEL:
> >>>>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct
> >>>>>> lttng_consumer_stream *stream,
> >>>>>> goto end;
> >>>>>> }
> >>>>>>
> >>>>>> + if (ht == NULL) {
> >>>>>> + pthread_mutex_unlock(&consumer_data.lock);
> >>>>>> + /* Means the stream was allocated but not successfully
> >>>>>> added */
> >>>>>> + goto free_stream;
> >>>>>> + }
> >>>>>> +
> >>>>>> + rcu_read_lock();
> >>>>>> + iter.iter.node = &stream->waitfd_node.node;
> >>>>>> + ret = lttng_ht_del(ht, &iter);
> >>>>>> + assert(!ret);
> >>>>>> + rcu_read_unlock();
> >>>>>> +
> >>>>>> if (stream->out_fd >= 0) {
> >>>>>> ret = close(stream->out_fd);
> >>>>>> if (ret) {
> >>>>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct
> >>>>>> lttng_consumer_stream *stream,
> >>>>>>
> >>>>>> pthread_mutex_lock(&consumer_data.lock);
> >>>>>>
> >>>>>> - switch (consumer_data.type) {
> >>>>>> - case LTTNG_CONSUMER_KERNEL:
> >>>>>> - break;
> >>>>>> - case LTTNG_CONSUMER32_UST:
> >>>>>> - case LTTNG_CONSUMER64_UST:
> >>>>>> - ret = lttng_ustconsumer_add_stream(stream);
> >>>>>> - if (ret) {
> >>>>>> - ret = -EINVAL;
> >>>>>> - goto error;
> >>>>>> - }
> >>>>>> -
> >>>>>> - /* Steal stream identifier only for UST */
> >>>>>> - consumer_steal_stream_key(stream->wait_fd, ht);
> >>>>>> - break;
> >>>>>> - default:
> >>>>>> - ERR("Unknown consumer_data type");
> >>>>>> - assert(0);
> >>>>>> - ret = -ENOSYS;
> >>>>>> - goto error;
> >>>>>> - }
> >>>>>> -
> >>>>>> /*
> >>>>>> * From here, refcounts are updated so be _careful_ when
> >>>>>> returning an error
> >>>>>> * after this point.
> >>>>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct
> >>>>>> lttng_consumer_stream *stream,
> >>>>>> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
> >>>>>> rcu_read_unlock();
> >>>>>>
> >>>>>> -error:
> >>>>>> pthread_mutex_unlock(&consumer_data.lock);
> >>>>>> return ret;
> >>>>>> }
> >>>>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
> >>>>>> int num_rdy, num_hup, high_prio, ret, i;
> >>>>>> struct pollfd *pollfd = NULL;
> >>>>>> /* local view of the streams */
> >>>>>> - struct lttng_consumer_stream **local_stream = NULL;
> >>>>>> + struct lttng_consumer_stream **local_stream = NULL, *new_stream
> >>>>>> = NULL;
> >>>>>> /* local view of consumer_data.fds_count */
> >>>>>> int nb_fd = 0;
> >>>>>> struct lttng_consumer_local_data *ctx = data;
> >>>>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
> >>>>>> */
> >>>>>> if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
> >>>>>> size_t pipe_readlen;
> >>>>>> - char tmp;
> >>>>>>
> >>>>>> DBG("consumer_poll_pipe wake up");
> >>>>>> /* Consume 1 byte of pipe data */
> >>>>>> do {
> >>>>>> - pipe_readlen =
> >>>>>> read(ctx->consumer_poll_pipe[0], &tmp, 1);
> >>>>>> + pipe_readlen =
> >>>>>> read(ctx->consumer_poll_pipe[0], &new_stream,
> >>>>>> + sizeof(new_stream));
> >>>>>> } while (pipe_readlen == -1 && errno == EINTR);
> >>>>>> +
> >>>>>> + /*
> >>>>>> + * If the stream is NULL, just ignore it. It's
> >>>>>> also possible that
> >>>>>> + * the sessiond poll thread changed the
> >>>>>> consumer_quit state and is
> >>>>>> + * waking us up to test it.
> >>>>>> + */
> >>>>>> + if (new_stream == NULL) {
> >>>>>> + continue;
> >>>>>> + }
> >>>>>> +
> >>>>>> + ret = consumer_add_stream(new_stream);
> >>>>>> + if (ret) {
> >>>>>> + ERR("Consumer add stream %d failed.
> >>>>>> Continuing",
> >>>>>> + new_stream->key);
> >>>>>> + /*
> >>>>>> + * At this point, if the add_stream
> >>>>>> fails, it is not in the
> >>>>>> + * hash table thus passing the NULL
> >>>>>> value here.
> >>>>>> + */
> >>>>>> + consumer_del_stream(new_stream, NULL);
> >>>>>> + }
> >>>>>> +
> >>>>>> + /* Continue to update the local streams and
> >>>>>> handle prio ones */
> >>>>>> continue;
> >>>>>> }
> >>>>>>
> >>>>>> @@ -2260,19 +2246,16 @@ end:
> >>>>>> consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
> >>>>>>
> >>>>>> /*
> >>>>>> - * Wake-up the other end by writing a null byte in the pipe
> >>>>>> - * (non-blocking). Important note: Because writing into the
> >>>>>> - * pipe is non-blocking (and therefore we allow dropping wakeup
> >>>>>> - * data, as long as there is wakeup data present in the pipe
> >>>>>> - * buffer to wake up the other end), the other end should
> >>>>>> - * perform the following sequence for waiting:
> >>>>>> - * 1) empty the pipe (reads).
> >>>>>> - * 2) perform update operation.
> >>>>>> - * 3) wait on the pipe (poll).
> >>>>>> + * Notify the data poll thread to poll back again and test the
> >>>>>> + * consumer_quit state to quit gracefully.
> >>>>>> */
> >>>>>> do {
> >>>>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> + struct lttng_consumer_stream *null_stream = NULL;
> >>>>>> +
> >>>>>> + ret = write(ctx->consumer_poll_pipe[1], &null_stream,
> >>>>>> + sizeof(null_stream));
> >>>>>> } while (ret < 0 && errno == EINTR);
> >>>>>> +
> >>>>>> rcu_unregister_thread();
> >>>>>> return NULL;
> >>>>>> }
> >>>>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
> >>>>>> index 4b225e4..8e5891a 100644
> >>>>>> --- a/src/common/consumer.h
> >>>>>> +++ b/src/common/consumer.h
> >>>>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair
> >>>>>> *consumer_allocate_relayd_sock_pair(
> >>>>>> struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
> >>>>>> int consumer_handle_stream_before_relayd(struct lttng_consumer_stream
> >>>>>> *stream,
> >>>>>> size_t data_size);
> >>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
> >>>>>>
> >>>>>> extern struct lttng_consumer_local_data *lttng_consumer_create(
> >>>>>> enum lttng_consumer_type type,
> >>>>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> b/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> index 13cbe21..444f5e0 100644
> >>>>>> --- a/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct
> >>>>>> lttng_consumer_local_data *ctx,
> >>>>>> consumer_del_stream(new_stream, NULL);
> >>>>>> }
> >>>>>> } else {
> >>>>>> - ret = consumer_add_stream(new_stream);
> >>>>>> - if (ret) {
> >>>>>> - ERR("Consumer add stream %d failed.
> >>>>>> Continuing",
> >>>>>> - new_stream->key);
> >>>>>> + do {
> >>>>>> + ret = write(ctx->consumer_poll_pipe[1],
> >>>>>> &new_stream,
> >>>>>> + sizeof(new_stream));
> >>>>>> + } while (ret < 0 && errno == EINTR);
> >>>>>> + if (ret < 0) {
> >>>>>> + PERROR("write data pipe");
> >>>>>> consumer_del_stream(new_stream, NULL);
> >>>>>> goto end_nosignal;
> >>>>>> }
> >>>>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct
> >>>>>> lttng_consumer_local_data *ctx,
> >>>>>> goto end_nosignal;
> >>>>>> }
> >>>>>>
> >>>>>> - /*
> >>>>>> - * Wake-up the other end by writing a null byte in the pipe
> >>>>>> (non-blocking).
> >>>>>> - * Important note: Because writing into the pipe is
> >>>>>> non-blocking (and
> >>>>>> - * therefore we allow dropping wakeup data, as long as there is
> >>>>>> wakeup data
> >>>>>> - * present in the pipe buffer to wake up the other end), the
> >>>>>> other end
> >>>>>> - * should perform the following sequence for waiting:
> >>>>>> - *
> >>>>>> - * 1) empty the pipe (reads).
> >>>>>> - * 2) perform update operation.
> >>>>>> - * 3) wait on the pipe (poll).
> >>>>>> - */
> >>>>>> - do {
> >>>>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> - } while (ret < 0 && errno == EINTR);
> >>>>>> end_nosignal:
> >>>>>> rcu_read_unlock();
> >>>>>>
> >>>>>> diff --git a/src/common/ust-consumer/ust-consumer.c
> >>>>>> b/src/common/ust-consumer/ust-consumer.c
> >>>>>> index 1170687..4ca4b84 100644
> >>>>>> --- a/src/common/ust-consumer/ust-consumer.c
> >>>>>> +++ b/src/common/ust-consumer/ust-consumer.c
> >>>>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct
> >>>>>> lttng_consumer_local_data *ctx,
> >>>>>> goto end_nosignal;
> >>>>>> }
> >>>>>>
> >>>>>> + /*
> >>>>>> + * This needs to be done as soon as we can so we don't
> >>>>>> block the
> >>>>>> + * application too long.
> >>>>>> + */
> >>>>>> + ret = lttng_ustconsumer_add_stream(new_stream);
> >>>>>> + if (ret) {
> >>>>>> + consumer_del_stream(new_stream, NULL);
> >>>>>> + goto end_nosignal;
> >>>>>> + }
> >>>>>> + /* Steal stream identifier to avoid having streams with
> >>>>>> the same key */
> >>>>>> + consumer_steal_stream_key(new_stream->key,
> >>>>>> consumer_data.stream_ht);
> >>>>>> +
> >>>>>> /* The stream is not metadata. Get relayd reference if
> >>>>>> exists. */
> >>>>>> relayd = consumer_find_relayd(msg.u.stream.net_index);
> >>>>>> if (relayd != NULL) {
> >>>>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct
> >>>>>> lttng_consumer_local_data *ctx,
> >>>>>> goto end_nosignal;
> >>>>>> }
> >>>>>> } else {
> >>>>>> - ret = consumer_add_stream(new_stream);
> >>>>>> - if (ret) {
> >>>>>> - ERR("Consumer add stream %d failed.
> >>>>>> Continuing",
> >>>>>> - new_stream->key);
> >>>>>> - /*
> >>>>>> - * At this point, if the add_stream
> >>>>>> fails, it is not in the
> >>>>>> - * hash table thus passing the NULL
> >>>>>> value here.
> >>>>>> - */
> >>>>>> + do {
> >>>>>> + ret = write(ctx->consumer_poll_pipe[1],
> >>>>>> &new_stream,
> >>>>>> + sizeof(new_stream));
> >>>>>> + } while (ret < 0 && errno == EINTR);
> >>>>>> + if (ret < 0) {
> >>>>>> + PERROR("write data pipe");
> >>>>>> consumer_del_stream(new_stream, NULL);
> >>>>>> goto end_nosignal;
> >>>>>> }
> >>>>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct
> >>>>>> lttng_consumer_local_data *ctx,
> >>>>>> break;
> >>>>>> }
> >>>>>>
> >>>>>> - /*
> >>>>>> - * Wake-up the other end by writing a null byte in the pipe
> >>>>>> (non-blocking).
> >>>>>> - * Important note: Because writing into the pipe is
> >>>>>> non-blocking (and
> >>>>>> - * therefore we allow dropping wakeup data, as long as there is
> >>>>>> wakeup data
> >>>>>> - * present in the pipe buffer to wake up the other end), the
> >>>>>> other end
> >>>>>> - * should perform the following sequence for waiting:
> >>>>>> - *
> >>>>>> - * 1) empty the pipe (reads).
> >>>>>> - * 2) perform update operation.
> >>>>>> - * 3) wait on the pipe (poll).
> >>>>>> - */
> >>>>>> - do {
> >>>>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> - } while (ret < 0 && errno == EINTR);
> >>>>>> end_nosignal:
> >>>>>> rcu_read_unlock();
> >>>>>>
> >>>>>> --
> >>>>>> 1.7.10.4
> >>>>>>
> >>>>>>
> >>>>>> _______________________________________________
> >>>>>> lttng-dev mailing list
> >>>>>> [email protected]
> >>>>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> >>>>>
> >>>
> >

--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
