* David Goulet ([email protected]) wrote:
>
>
> Mathieu Desnoyers:
> > * David Goulet ([email protected]) wrote:
> >>
> >>
> >> Mathieu Desnoyers:
> >>> * David Goulet ([email protected]) wrote:
> >>>> As a second step of refactoring, upon receiving a data stream, we send
> >>>> it to the data thread that is now in charge of handling it.
> >>>>
> >>>> Furthermore, in order for this to behave correctly, we have to make the
> >>>> ustctl actions on the stream upon before passing it to the right thread
> >>>> (the kernel does not need special actions.). This way, once the sessiond
> >>>> thread reply back to the session daemon, the stream is sure to be open
> >>>> and ready for data to be recorded on the application side so we avoid a
> >>>> race between the application thinking the stream is ready and the stream
> >>>> thread still scheduled out.
> >>>
> >>> Normally, as long as we have a reference on the SHM file descriptor, and
> >>> we have the wakeup FD, we should be good to fetch the data of buffers
> >>> belonging to an application that has already exited, even if it did so
> >>> before the ustctl calls are done.
> >>>
> >>> So I'm wondering why you do the ustctl calls in the sessiond thread ? It
> >>> seems to complexify the implementation needlessly: we could still do the
> >>> ustctl calls and output file open at the same location, the
> >>> data/metadata threads.
> >>
> >> Hmmm, it was my understanding that does
> >
> > does -> those
> >
> >> ustctl_* calls were needed
> >> before the trace could be recording thus making them quickly. Wrong?
> >
> > Can you rephrase your question ? I don't understand.
> >
>
> My understanding was that _those_ ustctl calls need to be done before
> the tracer could start recording data. This is why they were moved to
> the session daemon thread.
>
> Am I wrong here? When receiving an UST stream on the consumer side, is
> the SHM reference already acquired?

Yes, the reference to the SHM is already acquired: it's the FD that _has_
the reference.

Thanks,

Mathieu

>
> David
>
> > Thanks,
> >
> > Mathieu
> >
> >>
> >> David
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Mathieu
> >>>
> >>>>
> >>>> This commit should speed up the add stream process for the session
> >>>> daemon. There is still some actions to move out of the session daemon
> >>>> poll thread to gain speed significantly, especially for network
> >>>> streaming.
> >>>>
> >>>> Signed-off-by: David Goulet <[email protected]>
> >>>> ---
> >>>> src/common/consumer.c | 123 +++++++++++---------------
> >>>> src/common/consumer.h | 1 +
> >>>> src/common/kernel-consumer/kernel-consumer.c | 24 ++---
> >>>> src/common/ust-consumer/ust-consumer.c | 40 ++++-----
> >>>> 4 files changed, 78 insertions(+), 110 deletions(-)
> >>>>
> >>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
> >>>> index 055de1b..1d2b1f7 100644
> >>>> --- a/src/common/consumer.c
> >>>> +++ b/src/common/consumer.c
> >>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
> >>>> return stream;
> >>>> }
> >>>>
> >>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>> {
> >>>> struct lttng_consumer_stream *stream;
> >>>>
> >>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
> >>>> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
> >>>> lttng_ht_node_init_ulong(&stream->node, stream->key);
> >>>>
> >>>> + /*
> >>>> + * The cpu number is needed before using any ustctl_* actions. Ignored for
> >>>> + * the kernel so the value does not matter.
> >>>> + */
> >>>> + pthread_mutex_lock(&consumer_data.lock);
> >>>> + stream->cpu = stream->chan->cpucount++;
> >>>> + pthread_mutex_unlock(&consumer_data.lock);
> >>>> +
> >>>> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
> >>>> " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
> >>>> stream->shm_fd, stream->wait_fd,
> >>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
> >>>> pthread_mutex_lock(&consumer_data.lock);
> >>>> rcu_read_lock();
> >>>>
> >>>> - switch (consumer_data.type) {
> >>>> - case LTTNG_CONSUMER_KERNEL:
> >>>> - break;
> >>>> - case LTTNG_CONSUMER32_UST:
> >>>> - case LTTNG_CONSUMER64_UST:
> >>>> - stream->cpu = stream->chan->cpucount++;
> >>>> - ret = lttng_ustconsumer_add_stream(stream);
> >>>> - if (ret) {
> >>>> - ret = -EINVAL;
> >>>> - goto error;
> >>>> - }
> >>>> -
> >>>> - /* Steal stream identifier only for UST */
> >>>> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
> >>>> - break;
> >>>> - default:
> >>>> - ERR("Unknown consumer_data type");
> >>>> - assert(0);
> >>>> - ret = -ENOSYS;
> >>>> - goto error;
> >>>> - }
> >>>> -
> >>>> lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
> >>>>
> >>>> /* Check and cleanup relayd */
> >>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
> >>>> consumer_data.stream_count++;
> >>>> consumer_data.need_update = 1;
> >>>>
> >>>> -error:
> >>>> rcu_read_unlock();
> >>>> pthread_mutex_unlock(&consumer_data.lock);
> >>>>
> >>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>
> >>>> DBG3("Consumer delete metadata stream %d", stream->wait_fd);
> >>>>
> >>>> - if (ht == NULL) {
> >>>> - /* Means the stream was allocated but not successfully added */
> >>>> - goto free_stream;
> >>>> - }
> >>>> -
> >>>> - rcu_read_lock();
> >>>> - iter.iter.node = &stream->waitfd_node.node;
> >>>> - ret = lttng_ht_del(ht, &iter);
> >>>> - assert(!ret);
> >>>> - rcu_read_unlock();
> >>>> -
> >>>> pthread_mutex_lock(&consumer_data.lock);
> >>>> switch (consumer_data.type) {
> >>>> case LTTNG_CONSUMER_KERNEL:
> >>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> >>>> goto end;
> >>>> }
> >>>>
> >>>> + if (ht == NULL) {
> >>>> + pthread_mutex_unlock(&consumer_data.lock);
> >>>> + /* Means the stream was allocated but not successfully added */
> >>>> + goto free_stream;
> >>>> + }
> >>>> +
> >>>> + rcu_read_lock();
> >>>> + iter.iter.node = &stream->waitfd_node.node;
> >>>> + ret = lttng_ht_del(ht, &iter);
> >>>> + assert(!ret);
> >>>> + rcu_read_unlock();
> >>>> +
> >>>> if (stream->out_fd >= 0) {
> >>>> ret = close(stream->out_fd);
> >>>> if (ret) {
> >>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>
> >>>> pthread_mutex_lock(&consumer_data.lock);
> >>>>
> >>>> - switch (consumer_data.type) {
> >>>> - case LTTNG_CONSUMER_KERNEL:
> >>>> - break;
> >>>> - case LTTNG_CONSUMER32_UST:
> >>>> - case LTTNG_CONSUMER64_UST:
> >>>> - ret = lttng_ustconsumer_add_stream(stream);
> >>>> - if (ret) {
> >>>> - ret = -EINVAL;
> >>>> - goto error;
> >>>> - }
> >>>> -
> >>>> - /* Steal stream identifier only for UST */
> >>>> - consumer_steal_stream_key(stream->wait_fd, ht);
> >>>> - break;
> >>>> - default:
> >>>> - ERR("Unknown consumer_data type");
> >>>> - assert(0);
> >>>> - ret = -ENOSYS;
> >>>> - goto error;
> >>>> - }
> >>>> -
> >>>> /*
> >>>> * From here, refcounts are updated so be _careful_ when returning an error
> >>>> * after this point.
> >>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> >>>> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
> >>>> rcu_read_unlock();
> >>>>
> >>>> -error:
> >>>> pthread_mutex_unlock(&consumer_data.lock);
> >>>> return ret;
> >>>> }
> >>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
> >>>> int num_rdy, num_hup, high_prio, ret, i;
> >>>> struct pollfd *pollfd = NULL;
> >>>> /* local view of the streams */
> >>>> - struct lttng_consumer_stream **local_stream = NULL;
> >>>> + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
> >>>> /* local view of consumer_data.fds_count */
> >>>> int nb_fd = 0;
> >>>> struct lttng_consumer_local_data *ctx = data;
> >>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
> >>>> */
> >>>> if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
> >>>> size_t pipe_readlen;
> >>>> - char tmp;
> >>>>
> >>>> DBG("consumer_poll_pipe wake up");
> >>>> /* Consume 1 byte of pipe data */
> >>>> do {
> >>>> - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
> >>>> + pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
> >>>> + sizeof(new_stream));
> >>>> } while (pipe_readlen == -1 && errno == EINTR);
> >>>> +
> >>>> + /*
> >>>> + * If the stream is NULL, just ignore it. It's also possible that
> >>>> + * the sessiond poll thread changed the consumer_quit state and is
> >>>> + * waking us up to test it.
> >>>> + */
> >>>> + if (new_stream == NULL) {
> >>>> + continue;
> >>>> + }
> >>>> +
> >>>> + ret = consumer_add_stream(new_stream);
> >>>> + if (ret) {
> >>>> + ERR("Consumer add stream %d failed. Continuing",
> >>>> + new_stream->key);
> >>>> + /*
> >>>> + * At this point, if the add_stream fails, it is not in the
> >>>> + * hash table thus passing the NULL value here.
> >>>> + */
> >>>> + consumer_del_stream(new_stream, NULL);
> >>>> + }
> >>>> +
> >>>> + /* Continue to update the local streams and handle prio ones */
> >>>> continue;
> >>>> }
> >>>>
> >>>> @@ -2260,19 +2246,16 @@ end:
> >>>> consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
> >>>>
> >>>> /*
> >>>> - * Wake-up the other end by writing a null byte in the pipe
> >>>> - * (non-blocking). Important note: Because writing into the
> >>>> - * pipe is non-blocking (and therefore we allow dropping wakeup
> >>>> - * data, as long as there is wakeup data present in the pipe
> >>>> - * buffer to wake up the other end), the other end should
> >>>> - * perform the following sequence for waiting:
> >>>> - * 1) empty the pipe (reads).
> >>>> - * 2) perform update operation.
> >>>> - * 3) wait on the pipe (poll).
> >>>> + * Notify the data poll thread to poll back again and test the
> >>>> + * consumer_quit state to quit gracefully.
> >>>> */
> >>>> do {
> >>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>> + struct lttng_consumer_stream *null_stream = NULL;
> >>>> +
> >>>> + ret = write(ctx->consumer_poll_pipe[1], &null_stream,
> >>>> + sizeof(null_stream));
> >>>> } while (ret < 0 && errno == EINTR);
> >>>> +
> >>>> rcu_unregister_thread();
> >>>> return NULL;
> >>>> }
> >>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
> >>>> index 4b225e4..8e5891a 100644
> >>>> --- a/src/common/consumer.h
> >>>> +++ b/src/common/consumer.h
> >>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
> >>>> struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
> >>>> int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
> >>>> size_t data_size);
> >>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
> >>>>
> >>>> extern struct lttng_consumer_local_data *lttng_consumer_create(
> >>>> enum lttng_consumer_type type,
> >>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> >>>> index 13cbe21..444f5e0 100644
> >>>> --- a/src/common/kernel-consumer/kernel-consumer.c
> >>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
> >>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>> consumer_del_stream(new_stream, NULL);
> >>>> }
> >>>> } else {
> >>>> - ret = consumer_add_stream(new_stream);
> >>>> - if (ret) {
> >>>> - ERR("Consumer add stream %d failed. Continuing",
> >>>> - new_stream->key);
> >>>> + do {
> >>>> + ret = write(ctx->consumer_poll_pipe[1], &new_stream,
> >>>> + sizeof(new_stream));
> >>>> + } while (ret < 0 && errno == EINTR);
> >>>> + if (ret < 0) {
> >>>> + PERROR("write data pipe");
> >>>> consumer_del_stream(new_stream, NULL);
> >>>> goto end_nosignal;
> >>>> }
> >>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>> goto end_nosignal;
> >>>> }
> >>>>
> >>>> - /*
> >>>> - * Wake-up the other end by writing a null byte in the pipe (non-blocking).
> >>>> - * Important note: Because writing into the pipe is non-blocking (and
> >>>> - * therefore we allow dropping wakeup data, as long as there is wakeup data
> >>>> - * present in the pipe buffer to wake up the other end), the other end
> >>>> - * should perform the following sequence for waiting:
> >>>> - *
> >>>> - * 1) empty the pipe (reads).
> >>>> - * 2) perform update operation.
> >>>> - * 3) wait on the pipe (poll).
> >>>> - */
> >>>> - do {
> >>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>> - } while (ret < 0 && errno == EINTR);
> >>>> end_nosignal:
> >>>> rcu_read_unlock();
> >>>>
> >>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> >>>> index 1170687..4ca4b84 100644
> >>>> --- a/src/common/ust-consumer/ust-consumer.c
> >>>> +++ b/src/common/ust-consumer/ust-consumer.c
> >>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>> goto end_nosignal;
> >>>> }
> >>>>
> >>>> + /*
> >>>> + * This needs to be done as soon as we can so we don't block the
> >>>> + * application too long.
> >>>> + */
> >>>> + ret = lttng_ustconsumer_add_stream(new_stream);
> >>>> + if (ret) {
> >>>> + consumer_del_stream(new_stream, NULL);
> >>>> + goto end_nosignal;
> >>>> + }
> >>>> + /* Steal stream identifier to avoid having streams with the same key */
> >>>> + consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
> >>>> +
> >>>> /* The stream is not metadata. Get relayd reference if exists. */
> >>>> relayd = consumer_find_relayd(msg.u.stream.net_index);
> >>>> if (relayd != NULL) {
> >>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>> goto end_nosignal;
> >>>> }
> >>>> } else {
> >>>> - ret = consumer_add_stream(new_stream);
> >>>> - if (ret) {
> >>>> - ERR("Consumer add stream %d failed. Continuing",
> >>>> - new_stream->key);
> >>>> - /*
> >>>> - * At this point, if the add_stream fails, it is not in the
> >>>> - * hash table thus passing the NULL value here.
> >>>> - */
> >>>> + do {
> >>>> + ret = write(ctx->consumer_poll_pipe[1], &new_stream,
> >>>> + sizeof(new_stream));
> >>>> + } while (ret < 0 && errno == EINTR);
> >>>> + if (ret < 0) {
> >>>> + PERROR("write data pipe");
> >>>> consumer_del_stream(new_stream, NULL);
> >>>> goto end_nosignal;
> >>>> }
> >>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>> break;
> >>>> }
> >>>>
> >>>> - /*
> >>>> - * Wake-up the other end by writing a null byte in the pipe (non-blocking).
> >>>> - * Important note: Because writing into the pipe is non-blocking (and
> >>>> - * therefore we allow dropping wakeup data, as long as there is wakeup data
> >>>> - * present in the pipe buffer to wake up the other end), the other end
> >>>> - * should perform the following sequence for waiting:
> >>>> - *
> >>>> - * 1) empty the pipe (reads).
> >>>> - * 2) perform update operation.
> >>>> - * 3) wait on the pipe (poll).
> >>>> - */
> >>>> - do {
> >>>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>> - } while (ret < 0 && errno == EINTR);
> >>>> end_nosignal:
> >>>> rcu_read_unlock();
> >>>>
> >>>> --
> >>>> 1.7.10.4
> >>>>
> >>>>
> >>>> _______________________________________________
> >>>> lttng-dev mailing list
> >>>> [email protected]
> >>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> >>>
> >

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
