Mathieu Desnoyers:
> * David Goulet ([email protected]) wrote:
>>
>>
>> Mathieu Desnoyers:
>>> * David Goulet ([email protected]) wrote:
>>>>
>>>>
>>>> Mathieu Desnoyers:
>>>>> * David Goulet ([email protected]) wrote:
>>>>>> As a second step of refactoring, upon receiving a data stream, we send
>>>>>> it to the data thread that is now in charge of handling it.
>>>>>>
>>>>>> Furthermore, for this to behave correctly, we have to perform the
>>>>>> ustctl actions on the stream before passing it to the right thread
>>>>>> (the kernel needs no special action). This way, once the sessiond
>>>>>> thread replies to the session daemon, the stream is sure to be open
>>>>>> and ready for data to be recorded on the application side, so we avoid
>>>>>> a race between the application thinking the stream is ready and the
>>>>>> data thread not having run yet.
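A minimal sketch of that hand-off. It assumes, as the patch does, that the pipe carries raw stream pointers and that a NULL pointer is a bare wake-up asking the data thread to re-check its quit flag. `struct stream`, `send_stream()` and `recv_stream()` are hypothetical names; in the consumer the pipe is `ctx->consumer_poll_pipe`. A pointer is much smaller than `PIPE_BUF`, so POSIX guarantees each write is atomic and messages from several writers cannot interleave.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical stand-in for struct lttng_consumer_stream. */
struct stream {
	int key;
};

/*
 * Send a stream pointer (or NULL as a plain wake-up) to the data thread.
 * Retries on EINTR. Returns 0 on success, -1 on error.
 */
static int send_stream(int wfd, struct stream *s)
{
	ssize_t ret;

	do {
		ret = write(wfd, &s, sizeof(s));
	} while (ret < 0 && errno == EINTR);
	return ret == (ssize_t) sizeof(s) ? 0 : -1;
}

/*
 * Receive one pointer-sized message into *out. A NULL result means
 * "wake up and re-check the quit flag". Returns 0 on success, -1 on
 * error or EOF.
 */
static int recv_stream(int rfd, struct stream **out)
{
	ssize_t ret;

	do {
		ret = read(rfd, out, sizeof(*out));
	} while (ret < 0 && errno == EINTR);
	return ret == (ssize_t) sizeof(*out) ? 0 : -1;
}
```

Passing a pointer only works because both ends share one address space. The receiving thread takes ownership of the stream and has to release it (the patch uses `consumer_del_stream(new_stream, NULL)`) if registration fails.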
>>>>>
>>>>> Normally, as long as we hold a reference on the SHM file descriptor and
>>>>> have the wakeup FD, we should be able to fetch the data from the buffers
>>>>> of an application that has already exited, even if it exited before the
>>>>> ustctl calls are done.
>>>>>
>>>>> So I'm wondering why you do the ustctl calls in the sessiond thread. It
>>>>> seems to complicate the implementation needlessly: we could do both the
>>>>> ustctl calls and the output file open in the same place, namely the
>>>>> data/metadata threads.
>>>>
>>>> Hmmm, it was my understanding that does
>>>
>>> does -> those
>>>
>>>> ustctl_* calls were needed
>>>> before the trace could be recording thus making them quickly. Wrong?
>>>
>>> Can you rephrase your question? I don't understand.
>>>
>>
>> My understanding was that _those_ ustctl calls needed to be done before
>> the tracer could start recording data. This is why they were moved to
>> the session daemon thread.
>>
>> Am I wrong here? When receiving a UST stream on the consumer side, is
>> the SHM reference already acquired?
> 
> Yes, the SHM reference is already acquired: it is the FD itself that
> _holds_ the reference.
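A sketch of why holding the FD is enough. It assumes a plain, already-unlinked temporary file stands in for the UST SHM buffer (the real buffers come from liblttng-ust, not `mkstemp()`): a forked producer writes into the shared mapping and exits, and only afterwards does the consumer map the same FD and read the data back. `make_buffer_from_exited_producer()` and `read_buffer()` are hypothetical helpers.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Returns an FD to an unlinked file that a child process filled with
 * `msg` before exiting, or -1 on error.
 */
static int make_buffer_from_exited_producer(const char *msg, size_t len)
{
	char path[] = "/tmp/shm-demo-XXXXXX";
	int fd = mkstemp(path);
	pid_t pid;
	int status;

	if (fd < 0)
		return -1;
	unlink(path);	/* from now on the FD is the only reference */
	if (ftruncate(fd, (off_t) len) < 0)
		goto error;

	pid = fork();
	if (pid < 0)
		goto error;
	if (pid == 0) {
		/* Producer: map, write, then exit with no handshake. */
		char *p = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
		if (p == MAP_FAILED)
			_exit(1);
		memcpy(p, msg, len);
		_exit(0);
	}
	if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status) ||
			WEXITSTATUS(status) != 0)
		goto error;
	return fd;
error:
	close(fd);
	return -1;
}

/* Consumer: map the buffer only now, after the producer is gone. */
static int read_buffer(int fd, char *out, size_t len)
{
	char *p = mmap(NULL, len, PROT_READ, MAP_SHARED, fd, 0);

	if (p == MAP_FAILED)
		return -1;
	memcpy(out, p, len);
	munmap(p, len);
	return 0;
}
```

Because the file is unlinked right after creation, the open FD is the only thing keeping the pages alive, which mirrors the point above: as long as the consumer keeps the FD, the data outlives the producing process.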

OK, good. So, just to be crystal clear: the ustctl_* calls can be
delayed and done in the right thread (data/metadata)?
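If the ustctl calls are deferred like this, the command thread only forwards the raw stream and the data thread finishes the setup itself. A minimal sketch, assuming `stream_setup()` is a hypothetical stand-in for the ustctl_* calls plus the output file open, and, as a simplification of the patch, that a NULL pointer means "quit" rather than "re-check the quit flag". Compile with `-pthread`.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical stream; setup_thread records who performed the setup. */
struct stream {
	int key;
	int ready;
	pthread_t setup_thread;
};

/* Hypothetical stand-in for the deferred ustctl_* calls + file open. */
static int stream_setup(struct stream *s)
{
	s->setup_thread = pthread_self();
	s->ready = 1;
	return 0;
}

/*
 * Data thread: receive raw streams from the pipe whose read end is
 * passed in `arg`, and finish their setup here rather than in the
 * command thread. A NULL pointer (or a read error) ends the loop.
 */
static void *data_thread(void *arg)
{
	int rfd = *(int *) arg;
	struct stream *s;
	ssize_t ret;

	for (;;) {
		do {
			ret = read(rfd, &s, sizeof(s));
		} while (ret < 0 && errno == EINTR);
		if (ret != (ssize_t) sizeof(s) || s == NULL)
			break;
		if (stream_setup(s) < 0) {
			free(s);	/* never registered, so just free it */
			continue;
		}
		/* Register s in the data thread's stream table here. */
	}
	return NULL;
}
```

With this ordering the sessiond reply no longer implies the stream is set up; per Mathieu's reply, that should be harmless as long as the consumer already holds the SHM and wakeup FDs.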

David

> 
> Thanks,
> 
> Mathieu
> 
>>
>> David
>>
>>> Thanks,
>>>
>>> Mathieu
>>>
>>>>
>>>> David
>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mathieu
>>>>>
>>>>>>
>>>>>> This commit should speed up the add stream process for the session
>>>>>> daemon. There are still some actions to move out of the session daemon
>>>>>> poll thread to gain significant speed, especially for network
>>>>>> streaming.
>>>>>>
>>>>>> Signed-off-by: David Goulet <[email protected]>
>>>>>> ---
>>>>>>  src/common/consumer.c                        |  123 +++++++++++---------------
>>>>>>  src/common/consumer.h                        |    1 +
>>>>>>  src/common/kernel-consumer/kernel-consumer.c |   24 ++---
>>>>>>  src/common/ust-consumer/ust-consumer.c       |   40 ++++-----
>>>>>>  4 files changed, 78 insertions(+), 110 deletions(-)
>>>>>>
>>>>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>>>>>> index 055de1b..1d2b1f7 100644
>>>>>> --- a/src/common/consumer.c
>>>>>> +++ b/src/common/consumer.c
>>>>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
>>>>>>          return stream;
>>>>>>  }
>>>>>>  
>>>>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>>>>  {
>>>>>>          struct lttng_consumer_stream *stream;
>>>>>>  
>>>>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>>>>>          lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>>>>>          lttng_ht_node_init_ulong(&stream->node, stream->key);
>>>>>>  
>>>>>> +        /*
>>>>>> +         * The cpu number is needed before using any ustctl_* actions. Ignored for
>>>>>> +         * the kernel so the value does not matter.
>>>>>> +         */
>>>>>> +        pthread_mutex_lock(&consumer_data.lock);
>>>>>> +        stream->cpu = stream->chan->cpucount++;
>>>>>> +        pthread_mutex_unlock(&consumer_data.lock);
>>>>>> +
>>>>>>          DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>>>>>>                          " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>>>>>>                          stream->shm_fd, stream->wait_fd,
>>>>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>>>          pthread_mutex_lock(&consumer_data.lock);
>>>>>>          rcu_read_lock();
>>>>>>  
>>>>>> -        switch (consumer_data.type) {
>>>>>> -        case LTTNG_CONSUMER_KERNEL:
>>>>>> -                break;
>>>>>> -        case LTTNG_CONSUMER32_UST:
>>>>>> -        case LTTNG_CONSUMER64_UST:
>>>>>> -                stream->cpu = stream->chan->cpucount++;
>>>>>> -                ret = lttng_ustconsumer_add_stream(stream);
>>>>>> -                if (ret) {
>>>>>> -                        ret = -EINVAL;
>>>>>> -                        goto error;
>>>>>> -                }
>>>>>> -
>>>>>> -                /* Steal stream identifier only for UST */
>>>>>> -                consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>>>>>> -                break;
>>>>>> -        default:
>>>>>> -                ERR("Unknown consumer_data type");
>>>>>> -                assert(0);
>>>>>> -                ret = -ENOSYS;
>>>>>> -                goto error;
>>>>>> -        }
>>>>>> -
>>>>>>          lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>>>>>  
>>>>>>          /* Check and cleanup relayd */
>>>>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>>>          consumer_data.stream_count++;
>>>>>>          consumer_data.need_update = 1;
>>>>>>  
>>>>>> -error:
>>>>>>          rcu_read_unlock();
>>>>>>          pthread_mutex_unlock(&consumer_data.lock);
>>>>>>  
>>>>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>  
>>>>>>          DBG3("Consumer delete metadata stream %d", stream->wait_fd);
>>>>>>  
>>>>>> -        if (ht == NULL) {
>>>>>> -                /* Means the stream was allocated but not successfully added */
>>>>>> -                goto free_stream;
>>>>>> -        }
>>>>>> -
>>>>>> -        rcu_read_lock();
>>>>>> -        iter.iter.node = &stream->waitfd_node.node;
>>>>>> -        ret = lttng_ht_del(ht, &iter);
>>>>>> -        assert(!ret);
>>>>>> -        rcu_read_unlock();
>>>>>> -
>>>>>>          pthread_mutex_lock(&consumer_data.lock);
>>>>>>          switch (consumer_data.type) {
>>>>>>          case LTTNG_CONSUMER_KERNEL:
>>>>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>                  goto end;
>>>>>>          }
>>>>>>  
>>>>>> +        if (ht == NULL) {
>>>>>> +                pthread_mutex_unlock(&consumer_data.lock);
>>>>>> +                /* Means the stream was allocated but not successfully added */
>>>>>> +                goto free_stream;
>>>>>> +        }
>>>>>> +
>>>>>> +        rcu_read_lock();
>>>>>> +        iter.iter.node = &stream->waitfd_node.node;
>>>>>> +        ret = lttng_ht_del(ht, &iter);
>>>>>> +        assert(!ret);
>>>>>> +        rcu_read_unlock();
>>>>>> +
>>>>>>          if (stream->out_fd >= 0) {
>>>>>>                  ret = close(stream->out_fd);
>>>>>>                  if (ret) {
>>>>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>  
>>>>>>          pthread_mutex_lock(&consumer_data.lock);
>>>>>>  
>>>>>> -        switch (consumer_data.type) {
>>>>>> -        case LTTNG_CONSUMER_KERNEL:
>>>>>> -                break;
>>>>>> -        case LTTNG_CONSUMER32_UST:
>>>>>> -        case LTTNG_CONSUMER64_UST:
>>>>>> -                ret = lttng_ustconsumer_add_stream(stream);
>>>>>> -                if (ret) {
>>>>>> -                        ret = -EINVAL;
>>>>>> -                        goto error;
>>>>>> -                }
>>>>>> -
>>>>>> -                /* Steal stream identifier only for UST */
>>>>>> -                consumer_steal_stream_key(stream->wait_fd, ht);
>>>>>> -                break;
>>>>>> -        default:
>>>>>> -                ERR("Unknown consumer_data type");
>>>>>> -                assert(0);
>>>>>> -                ret = -ENOSYS;
>>>>>> -                goto error;
>>>>>> -        }
>>>>>> -
>>>>>>          /*
>>>>>>           * From here, refcounts are updated so be _careful_ when returning an error
>>>>>>           * after this point.
>>>>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>>>          lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>>>>>>          rcu_read_unlock();
>>>>>>  
>>>>>> -error:
>>>>>>          pthread_mutex_unlock(&consumer_data.lock);
>>>>>>          return ret;
>>>>>>  }
>>>>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
>>>>>>          int num_rdy, num_hup, high_prio, ret, i;
>>>>>>          struct pollfd *pollfd = NULL;
>>>>>>          /* local view of the streams */
>>>>>> -        struct lttng_consumer_stream **local_stream = NULL;
>>>>>> +        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>>>>>>          /* local view of consumer_data.fds_count */
>>>>>>          int nb_fd = 0;
>>>>>>          struct lttng_consumer_local_data *ctx = data;
>>>>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
>>>>>>                   */
>>>>>>                  if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
>>>>>>                          size_t pipe_readlen;
>>>>>> -                        char tmp;
>>>>>>  
>>>>>>                          DBG("consumer_poll_pipe wake up");
>>>>>>                          /* Consume 1 byte of pipe data */
>>>>>>                          do {
>>>>>> -                                pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
>>>>>> +                                pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
>>>>>> +                                                sizeof(new_stream));
>>>>>>                          } while (pipe_readlen == -1 && errno == EINTR);
>>>>>> +
>>>>>> +                        /*
>>>>>> +                         * If the stream is NULL, just ignore it. It's also possible that
>>>>>> +                         * the sessiond poll thread changed the consumer_quit state and is
>>>>>> +                         * waking us up to test it.
>>>>>> +                         */
>>>>>> +                        if (new_stream == NULL) {
>>>>>> +                                continue;
>>>>>> +                        }
>>>>>> +
>>>>>> +                        ret = consumer_add_stream(new_stream);
>>>>>> +                        if (ret) {
>>>>>> +                                ERR("Consumer add stream %d failed. Continuing",
>>>>>> +                                                new_stream->key);
>>>>>> +                                /*
>>>>>> +                                 * At this point, if the add_stream fails, it is not in the
>>>>>> +                                 * hash table thus passing the NULL value here.
>>>>>> +                                 */
>>>>>> +                                consumer_del_stream(new_stream, NULL);
>>>>>> +                        }
>>>>>> +
>>>>>> +                        /* Continue to update the local streams and handle prio ones */
>>>>>>                          continue;
>>>>>>                  }
>>>>>>  
>>>>>> @@ -2260,19 +2246,16 @@ end:
>>>>>>          consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
>>>>>>  
>>>>>>          /*
>>>>>> -         * Wake-up the other end by writing a null byte in the pipe
>>>>>> -         * (non-blocking). Important note: Because writing into the
>>>>>> -         * pipe is non-blocking (and therefore we allow dropping wakeup
>>>>>> -         * data, as long as there is wakeup data present in the pipe
>>>>>> -         * buffer to wake up the other end), the other end should
>>>>>> -         * perform the following sequence for waiting:
>>>>>> -         * 1) empty the pipe (reads).
>>>>>> -         * 2) perform update operation.
>>>>>> -         * 3) wait on the pipe (poll).
>>>>>> +         * Notify the data poll thread to poll back again and test the
>>>>>> +         * consumer_quit state to quit gracefully.
>>>>>>           */
>>>>>>          do {
>>>>>> -                ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> +                struct lttng_consumer_stream *null_stream = NULL;
>>>>>> +
>>>>>> +                ret = write(ctx->consumer_poll_pipe[1], &null_stream,
>>>>>> +                                sizeof(null_stream));
>>>>>>          } while (ret < 0 && errno == EINTR);
>>>>>> +
>>>>>>          rcu_unregister_thread();
>>>>>>          return NULL;
>>>>>>  }
>>>>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>>>>>> index 4b225e4..8e5891a 100644
>>>>>> --- a/src/common/consumer.h
>>>>>> +++ b/src/common/consumer.h
>>>>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
>>>>>>  struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
>>>>>>  int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
>>>>>>                  size_t data_size);
>>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
>>>>>>  
>>>>>>  extern struct lttng_consumer_local_data *lttng_consumer_create(
>>>>>>                  enum lttng_consumer_type type,
>>>>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>>>>>> index 13cbe21..444f5e0 100644
>>>>>> --- a/src/common/kernel-consumer/kernel-consumer.c
>>>>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>>>>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>                                  consumer_del_stream(new_stream, NULL);
>>>>>>                          }
>>>>>>                  } else {
>>>>>> -                        ret = consumer_add_stream(new_stream);
>>>>>> -                        if (ret) {
>>>>>> -                                ERR("Consumer add stream %d failed. Continuing",
>>>>>> -                                                new_stream->key);
>>>>>> +                        do {
>>>>>> +                                ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>>>> +                                                sizeof(new_stream));
>>>>>> +                        } while (ret < 0 && errno == EINTR);
>>>>>> +                        if (ret < 0) {
>>>>>> +                                PERROR("write data pipe");
>>>>>>                                  consumer_del_stream(new_stream, NULL);
>>>>>>                                  goto end_nosignal;
>>>>>>                          }
>>>>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>                  goto end_nosignal;
>>>>>>          }
>>>>>>  
>>>>>> -        /*
>>>>>> -         * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>>>> -         * Important note: Because writing into the pipe is non-blocking (and
>>>>>> -         * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>>>> -         * present in the pipe buffer to wake up the other end), the other end
>>>>>> -         * should perform the following sequence for waiting:
>>>>>> -         *
>>>>>> -         * 1) empty the pipe (reads).
>>>>>> -         * 2) perform update operation.
>>>>>> -         * 3) wait on the pipe (poll).
>>>>>> -         */
>>>>>> -        do {
>>>>>> -                ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> -        } while (ret < 0 && errno == EINTR);
>>>>>>  end_nosignal:
>>>>>>          rcu_read_unlock();
>>>>>>  
>>>>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>>>>>> index 1170687..4ca4b84 100644
>>>>>> --- a/src/common/ust-consumer/ust-consumer.c
>>>>>> +++ b/src/common/ust-consumer/ust-consumer.c
>>>>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>                          goto end_nosignal;
>>>>>>                  }
>>>>>>  
>>>>>> +                /*
>>>>>> +                 * This needs to be done as soon as we can so we don't block the
>>>>>> +                 * application too long.
>>>>>> +                 */
>>>>>> +                ret = lttng_ustconsumer_add_stream(new_stream);
>>>>>> +                if (ret) {
>>>>>> +                        consumer_del_stream(new_stream, NULL);
>>>>>> +                        goto end_nosignal;
>>>>>> +                }
>>>>>> +                /* Steal stream identifier to avoid having streams with the same key */
>>>>>> +                consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
>>>>>> +
>>>>>>                  /* The stream is not metadata. Get relayd reference if exists. */
>>>>>>                  relayd = consumer_find_relayd(msg.u.stream.net_index);
>>>>>>                  if (relayd != NULL) {
>>>>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>                                  goto end_nosignal;
>>>>>>                          }
>>>>>>                  } else {
>>>>>> -                        ret = consumer_add_stream(new_stream);
>>>>>> -                        if (ret) {
>>>>>> -                                ERR("Consumer add stream %d failed. Continuing",
>>>>>> -                                                new_stream->key);
>>>>>> -                                /*
>>>>>> -                                 * At this point, if the add_stream fails, it is not in the
>>>>>> -                                 * hash table thus passing the NULL value here.
>>>>>> -                                 */
>>>>>> +                        do {
>>>>>> +                                ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>>>> +                                                sizeof(new_stream));
>>>>>> +                        } while (ret < 0 && errno == EINTR);
>>>>>> +                        if (ret < 0) {
>>>>>> +                                PERROR("write data pipe");
>>>>>>                                  consumer_del_stream(new_stream, NULL);
>>>>>>                                  goto end_nosignal;
>>>>>>                          }
>>>>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>>>                  break;
>>>>>>          }
>>>>>>  
>>>>>> -        /*
>>>>>> -         * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>>>> -         * Important note: Because writing into the pipe is non-blocking (and
>>>>>> -         * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>>>> -         * present in the pipe buffer to wake up the other end), the other end
>>>>>> -         * should perform the following sequence for waiting:
>>>>>> -         *
>>>>>> -         * 1) empty the pipe (reads).
>>>>>> -         * 2) perform update operation.
>>>>>> -         * 3) wait on the pipe (poll).
>>>>>> -         */
>>>>>> -        do {
>>>>>> -                ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>>>> -        } while (ret < 0 && errno == EINTR);
>>>>>>  end_nosignal:
>>>>>>          rcu_read_unlock();
>>>>>>  
>>>>>> -- 
>>>>>> 1.7.10.4
>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> lttng-dev mailing list
>>>>>> [email protected]
>>>>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>>>>>
>>>
> 
