* David Goulet ([email protected]) wrote:
> 
> 
> Mathieu Desnoyers:
> > * David Goulet ([email protected]) wrote:
> >>
> >>
> >> Mathieu Desnoyers:
> >>> * David Goulet ([email protected]) wrote:
> >>>>
> >>>>
> >>>> Mathieu Desnoyers:
> >>>>> * David Goulet ([email protected]) wrote:
> >>>>>> As a second step of refactoring, upon receiving a data stream, we send
> >>>>>> it to the data thread, which is now in charge of handling it.
> >>>>>>
> >>>>>> Furthermore, for this to behave correctly, we have to perform the
> >>>>>> ustctl actions on the stream before passing it to the right thread
> >>>>>> (the kernel does not need special actions). This way, once the sessiond
> >>>>>> thread replies to the session daemon, the stream is sure to be open and
> >>>>>> ready for data to be recorded on the application side, so we avoid a
> >>>>>> race between the application thinking the stream is ready and the stream
> >>>>>> thread still being scheduled out.
> >>>>>
> >>>>> Normally, as long as we have a reference on the SHM file descriptor, and
> >>>>> we have the wakeup FD, we should be good to fetch the data of buffers
> >>>>> belonging to an application that has already exited, even if it did so
> >>>>> before the ustctl calls are done.
> >>>>>
> >>>>> So I'm wondering why you do the ustctl calls in the sessiond thread? It
> >>>>> seems to complicate the implementation needlessly: we could still do the
> >>>>> ustctl calls and the output file open in the same place, the
> >>>>> data/metadata threads.
> >>>>
> >>>> Hmmm, it was my understanding that does
> >>>
> >>> does -> those
> >>>
> >>>> ustctl_* calls were needed
> >>>> before the trace could be recording thus making them quickly. Wrong?
> >>>
> >>> Can you rephrase your question? I don't understand.
> >>>
> >>
> >> My understanding was that _those_ ustctl calls need to be done before
> >> the tracer could start recording data. This is why they were moved to
> >> the session daemon thread.
> >>
> >> Am I wrong here? When receiving a UST stream on the consumer side, is
> >> the SHM reference already acquired?
> > 
> > yes, the reference to shm is already acquired: it's the FD that _has_
> > the reference.
> 
> Ok good. So just to be crystal clear here, the ustctl* calls can be
> delayed and done in the right thread? (data/metadata).

yes, I expect it should work.
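
A quick way to convince oneself that the descriptor is what holds the
reference is the minimal, self-contained sketch below. It does not touch
ustctl or any LTTng code: it assumes an unlinked temporary file is a fair
stand-in for the UST SHM buffer, a forked child plays the traced
application, and the descriptor travels over a UNIX socket with
SCM_RIGHTS. The child writes into its mapping, sends the fd and exits;
only after the child has been reaped does the parent (playing the
consumer) receive, map and read it. The helper names (fd_msg, send_fd,
recv_fd, run_demo) are made up for the example.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>

#define BUF_LEN 4096

/* One-byte message with room for a single SCM_RIGHTS descriptor. */
struct fd_msg {
	struct msghdr hdr;
	struct iovec iov;
	char byte;
	union { struct cmsghdr align; char buf[CMSG_SPACE(sizeof(int))]; } ctl;
};

static void fd_msg_init(struct fd_msg *m)
{
	memset(m, 0, sizeof(*m));
	m->iov.iov_base = &m->byte;
	m->iov.iov_len = 1;
	m->hdr.msg_iov = &m->iov;
	m->hdr.msg_iovlen = 1;
	m->hdr.msg_control = m->ctl.buf;
	m->hdr.msg_controllen = sizeof(m->ctl.buf);
}

static int send_fd(int sock, int fd)
{
	struct fd_msg m;
	struct cmsghdr *c;

	fd_msg_init(&m);
	c = CMSG_FIRSTHDR(&m.hdr);
	c->cmsg_level = SOL_SOCKET;
	c->cmsg_type = SCM_RIGHTS;
	c->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(c), &fd, sizeof(int));
	return sendmsg(sock, &m.hdr, 0) == 1 ? 0 : -1;
}

static int recv_fd(int sock)
{
	struct fd_msg m;
	struct cmsghdr *c;
	int fd;

	fd_msg_init(&m);
	if (recvmsg(sock, &m.hdr, 0) != 1)
		return -1;
	c = CMSG_FIRSTHDR(&m.hdr);
	if (c == NULL || c->cmsg_level != SOL_SOCKET || c->cmsg_type != SCM_RIGHTS)
		return -1;
	memcpy(&fd, CMSG_DATA(c), sizeof(int));
	return fd;
}

/* Returns 0 and copies into out what the already-exited child wrote. */
static int run_demo(char *out, size_t outlen)
{
	int sv[2], status, fd;
	char *map;
	pid_t pid;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0 || (pid = fork()) < 0)
		return -1;
	if (pid == 0) {
		/* Child ("application"): after unlink, the fd is the only reference. */
		char path[] = "/tmp/shm-demo-XXXXXX";
		int shm = mkstemp(path);

		if (shm < 0 || unlink(path) < 0 || ftruncate(shm, BUF_LEN) < 0)
			_exit(1);
		map = mmap(NULL, BUF_LEN, PROT_READ | PROT_WRITE, MAP_SHARED, shm, 0);
		if (map == MAP_FAILED)
			_exit(1);
		strcpy(map, "recorded before exit");
		_exit(send_fd(sv[1], shm) == 0 ? 0 : 1);
	}
	/* Parent ("consumer"): wait until the application is completely gone. */
	if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status) || WEXITSTATUS(status))
		return -1;
	fd = recv_fd(sv[0]);
	if (fd < 0)
		return -1;
	map = mmap(NULL, BUF_LEN, PROT_READ, MAP_SHARED, fd, 0);
	if (map == MAP_FAILED)
		return -1;
	strncpy(out, map, outlen - 1);
	out[outlen - 1] = '\0';
	munmap(map, BUF_LEN);
	close(fd);
	close(sv[0]);
	close(sv[1]);
	return 0;
}
```

In run_demo() the parent still reads the string back after waitpid() has
reaped the child, which is the property that would let the ustctl setup
be deferred to the data/metadata threads.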

Mathieu

> 
> David
> 
> > 
> > Thanks,
> > 
> > Mathieu
> > 
> >>
> >> David
> >>
> >>> Thanks,
> >>>
> >>> Mathieu
> >>>
> >>>>
> >>>> David
> >>>>
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>>>
> >>>>>> This commit should speed up the add stream process for the session
> >>>>>> daemon. There are still some actions to move out of the session daemon
> >>>>>> poll thread to gain significant speed, especially for network
> >>>>>> streaming.
> >>>>>>
> >>>>>> Signed-off-by: David Goulet <[email protected]>
> >>>>>> ---
> >>>>>>  src/common/consumer.c                        |  123 +++++++++++---------------
> >>>>>>  src/common/consumer.h                        |    1 +
> >>>>>>  src/common/kernel-consumer/kernel-consumer.c |   24 ++---
> >>>>>>  src/common/ust-consumer/ust-consumer.c       |   40 ++++-----
> >>>>>>  4 files changed, 78 insertions(+), 110 deletions(-)
> >>>>>>
> >>>>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
> >>>>>> index 055de1b..1d2b1f7 100644
> >>>>>> --- a/src/common/consumer.c
> >>>>>> +++ b/src/common/consumer.c
> >>>>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
> >>>>>>        return stream;
> >>>>>>  }
> >>>>>>  
> >>>>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
> >>>>>>  {
> >>>>>>        struct lttng_consumer_stream *stream;
> >>>>>>  
> >>>>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
> >>>>>>        lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
> >>>>>>        lttng_ht_node_init_ulong(&stream->node, stream->key);
> >>>>>>  
> >>>>>> +      /*
> >>>>>> +       * The cpu number is needed before using any ustctl_* actions. Ignored for
> >>>>>> +       * the kernel so the value does not matter.
> >>>>>> +       */
> >>>>>> +      pthread_mutex_lock(&consumer_data.lock);
> >>>>>> +      stream->cpu = stream->chan->cpucount++;
> >>>>>> +      pthread_mutex_unlock(&consumer_data.lock);
> >>>>>> +
> >>>>>>        DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
> >>>>>>                        " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
> >>>>>>                        stream->shm_fd, stream->wait_fd,
> >>>>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
> >>>>>>        pthread_mutex_lock(&consumer_data.lock);
> >>>>>>        rcu_read_lock();
> >>>>>>  
> >>>>>> -      switch (consumer_data.type) {
> >>>>>> -      case LTTNG_CONSUMER_KERNEL:
> >>>>>> -              break;
> >>>>>> -      case LTTNG_CONSUMER32_UST:
> >>>>>> -      case LTTNG_CONSUMER64_UST:
> >>>>>> -              stream->cpu = stream->chan->cpucount++;
> >>>>>> -              ret = lttng_ustconsumer_add_stream(stream);
> >>>>>> -              if (ret) {
> >>>>>> -                      ret = -EINVAL;
> >>>>>> -                      goto error;
> >>>>>> -              }
> >>>>>> -
> >>>>>> -              /* Steal stream identifier only for UST */
> >>>>>> -              consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
> >>>>>> -              break;
> >>>>>> -      default:
> >>>>>> -              ERR("Unknown consumer_data type");
> >>>>>> -              assert(0);
> >>>>>> -              ret = -ENOSYS;
> >>>>>> -              goto error;
> >>>>>> -      }
> >>>>>> -
> >>>>>>        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
> >>>>>>  
> >>>>>>        /* Check and cleanup relayd */
> >>>>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
> >>>>>>        consumer_data.stream_count++;
> >>>>>>        consumer_data.need_update = 1;
> >>>>>>  
> >>>>>> -error:
> >>>>>>        rcu_read_unlock();
> >>>>>>        pthread_mutex_unlock(&consumer_data.lock);
> >>>>>>  
> >>>>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>>>  
> >>>>>>        DBG3("Consumer delete metadata stream %d", stream->wait_fd);
> >>>>>>  
> >>>>>> -      if (ht == NULL) {
> >>>>>> -              /* Means the stream was allocated but not successfully added */
> >>>>>> -              goto free_stream;
> >>>>>> -      }
> >>>>>> -
> >>>>>> -      rcu_read_lock();
> >>>>>> -      iter.iter.node = &stream->waitfd_node.node;
> >>>>>> -      ret = lttng_ht_del(ht, &iter);
> >>>>>> -      assert(!ret);
> >>>>>> -      rcu_read_unlock();
> >>>>>> -
> >>>>>>        pthread_mutex_lock(&consumer_data.lock);
> >>>>>>        switch (consumer_data.type) {
> >>>>>>        case LTTNG_CONSUMER_KERNEL:
> >>>>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>>>                goto end;
> >>>>>>        }
> >>>>>>  
> >>>>>> +      if (ht == NULL) {
> >>>>>> +              pthread_mutex_unlock(&consumer_data.lock);
> >>>>>> +              /* Means the stream was allocated but not successfully added */
> >>>>>> +              goto free_stream;
> >>>>>> +      }
> >>>>>> +
> >>>>>> +      rcu_read_lock();
> >>>>>> +      iter.iter.node = &stream->waitfd_node.node;
> >>>>>> +      ret = lttng_ht_del(ht, &iter);
> >>>>>> +      assert(!ret);
> >>>>>> +      rcu_read_unlock();
> >>>>>> +
> >>>>>>        if (stream->out_fd >= 0) {
> >>>>>>                ret = close(stream->out_fd);
> >>>>>>                if (ret) {
> >>>>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>>>  
> >>>>>>        pthread_mutex_lock(&consumer_data.lock);
> >>>>>>  
> >>>>>> -      switch (consumer_data.type) {
> >>>>>> -      case LTTNG_CONSUMER_KERNEL:
> >>>>>> -              break;
> >>>>>> -      case LTTNG_CONSUMER32_UST:
> >>>>>> -      case LTTNG_CONSUMER64_UST:
> >>>>>> -              ret = lttng_ustconsumer_add_stream(stream);
> >>>>>> -              if (ret) {
> >>>>>> -                      ret = -EINVAL;
> >>>>>> -                      goto error;
> >>>>>> -              }
> >>>>>> -
> >>>>>> -              /* Steal stream identifier only for UST */
> >>>>>> -              consumer_steal_stream_key(stream->wait_fd, ht);
> >>>>>> -              break;
> >>>>>> -      default:
> >>>>>> -              ERR("Unknown consumer_data type");
> >>>>>> -              assert(0);
> >>>>>> -              ret = -ENOSYS;
> >>>>>> -              goto error;
> >>>>>> -      }
> >>>>>> -
> >>>>>>        /*
> >>>>>>         * From here, refcounts are updated so be _careful_ when returning an error
> >>>>>>         * after this point.
> >>>>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> >>>>>>        lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
> >>>>>>        rcu_read_unlock();
> >>>>>>  
> >>>>>> -error:
> >>>>>>        pthread_mutex_unlock(&consumer_data.lock);
> >>>>>>        return ret;
> >>>>>>  }
> >>>>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
> >>>>>>        int num_rdy, num_hup, high_prio, ret, i;
> >>>>>>        struct pollfd *pollfd = NULL;
> >>>>>>        /* local view of the streams */
> >>>>>> -      struct lttng_consumer_stream **local_stream = NULL;
> >>>>>> +      struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
> >>>>>>        /* local view of consumer_data.fds_count */
> >>>>>>        int nb_fd = 0;
> >>>>>>        struct lttng_consumer_local_data *ctx = data;
> >>>>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
> >>>>>>                 */
> >>>>>>                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
> >>>>>>                        size_t pipe_readlen;
> >>>>>> -                      char tmp;
> >>>>>>  
> >>>>>>                        DBG("consumer_poll_pipe wake up");
> >>>>>>                        /* Consume 1 byte of pipe data */
> >>>>>>                        do {
> >>>>>> -                              pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
> >>>>>> +                              pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
> >>>>>> +                                              sizeof(new_stream));
> >>>>>>                        } while (pipe_readlen == -1 && errno == EINTR);
> >>>>>> +
> >>>>>> +                      /*
> >>>>>> +                       * If the stream is NULL, just ignore it. It's also possible that
> >>>>>> +                       * the sessiond poll thread changed the consumer_quit state and is
> >>>>>> +                       * waking us up to test it.
> >>>>>> +                       */
> >>>>>> +                      if (new_stream == NULL) {
> >>>>>> +                              continue;
> >>>>>> +                      }
> >>>>>> +
> >>>>>> +                      ret = consumer_add_stream(new_stream);
> >>>>>> +                      if (ret) {
> >>>>>> +                              ERR("Consumer add stream %d failed. Continuing",
> >>>>>> +                                              new_stream->key);
> >>>>>> +                              /*
> >>>>>> +                               * At this point, if the add_stream fails, it is not in the
> >>>>>> +                               * hash table thus passing the NULL value here.
> >>>>>> +                               */
> >>>>>> +                              consumer_del_stream(new_stream, NULL);
> >>>>>> +                      }
> >>>>>> +
> >>>>>> +                      /* Continue to update the local streams and handle prio ones */
> >>>>>>                        continue;
> >>>>>>                }
> >>>>>>  
> >>>>>> @@ -2260,19 +2246,16 @@ end:
> >>>>>>        consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
> >>>>>>  
> >>>>>>        /*
> >>>>>> -       * Wake-up the other end by writing a null byte in the pipe
> >>>>>> -       * (non-blocking). Important note: Because writing into the
> >>>>>> -       * pipe is non-blocking (and therefore we allow dropping wakeup
> >>>>>> -       * data, as long as there is wakeup data present in the pipe
> >>>>>> -       * buffer to wake up the other end), the other end should
> >>>>>> -       * perform the following sequence for waiting:
> >>>>>> -       * 1) empty the pipe (reads).
> >>>>>> -       * 2) perform update operation.
> >>>>>> -       * 3) wait on the pipe (poll).
> >>>>>> +       * Notify the data poll thread to poll back again and test the
> >>>>>> +       * consumer_quit state to quit gracefully.
> >>>>>>         */
> >>>>>>        do {
> >>>>>> -              ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> +              struct lttng_consumer_stream *null_stream = NULL;
> >>>>>> +
> >>>>>> +              ret = write(ctx->consumer_poll_pipe[1], &null_stream,
> >>>>>> +                              sizeof(null_stream));
> >>>>>>        } while (ret < 0 && errno == EINTR);
> >>>>>> +
> >>>>>>        rcu_unregister_thread();
> >>>>>>        return NULL;
> >>>>>>  }
> >>>>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
> >>>>>> index 4b225e4..8e5891a 100644
> >>>>>> --- a/src/common/consumer.h
> >>>>>> +++ b/src/common/consumer.h
> >>>>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
> >>>>>>  struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
> >>>>>>  int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
> >>>>>>                size_t data_size);
> >>>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
> >>>>>>  
> >>>>>>  extern struct lttng_consumer_local_data *lttng_consumer_create(
> >>>>>>                enum lttng_consumer_type type,
> >>>>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> index 13cbe21..444f5e0 100644
> >>>>>> --- a/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
> >>>>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>>>>                                consumer_del_stream(new_stream, NULL);
> >>>>>>                        }
> >>>>>>                } else {
> >>>>>> -                      ret = consumer_add_stream(new_stream);
> >>>>>> -                      if (ret) {
> >>>>>> -                              ERR("Consumer add stream %d failed. Continuing",
> >>>>>> -                                              new_stream->key);
> >>>>>> +                      do {
> >>>>>> +                              ret = write(ctx->consumer_poll_pipe[1], &new_stream,
> >>>>>> +                                              sizeof(new_stream));
> >>>>>> +                      } while (ret < 0 && errno == EINTR);
> >>>>>> +                      if (ret < 0) {
> >>>>>> +                              PERROR("write data pipe");
> >>>>>>                                consumer_del_stream(new_stream, NULL);
> >>>>>>                                goto end_nosignal;
> >>>>>>                        }
> >>>>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>>>>                goto end_nosignal;
> >>>>>>        }
> >>>>>>  
> >>>>>> -      /*
> >>>>>> -       * Wake-up the other end by writing a null byte in the pipe (non-blocking).
> >>>>>> -       * Important note: Because writing into the pipe is non-blocking (and
> >>>>>> -       * therefore we allow dropping wakeup data, as long as there is wakeup data
> >>>>>> -       * present in the pipe buffer to wake up the other end), the other end
> >>>>>> -       * should perform the following sequence for waiting:
> >>>>>> -       *
> >>>>>> -       * 1) empty the pipe (reads).
> >>>>>> -       * 2) perform update operation.
> >>>>>> -       * 3) wait on the pipe (poll).
> >>>>>> -       */
> >>>>>> -      do {
> >>>>>> -              ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> -      } while (ret < 0 && errno == EINTR);
> >>>>>>  end_nosignal:
> >>>>>>        rcu_read_unlock();
> >>>>>>  
> >>>>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> >>>>>> index 1170687..4ca4b84 100644
> >>>>>> --- a/src/common/ust-consumer/ust-consumer.c
> >>>>>> +++ b/src/common/ust-consumer/ust-consumer.c
> >>>>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>>>>                        goto end_nosignal;
> >>>>>>                }
> >>>>>>  
> >>>>>> +              /*
> >>>>>> +               * This needs to be done as soon as we can so we don't block the
> >>>>>> +               * application too long.
> >>>>>> +               */
> >>>>>> +              ret = lttng_ustconsumer_add_stream(new_stream);
> >>>>>> +              if (ret) {
> >>>>>> +                      consumer_del_stream(new_stream, NULL);
> >>>>>> +                      goto end_nosignal;
> >>>>>> +              }
> >>>>>> +              /* Steal stream identifier to avoid having streams with the same key */
> >>>>>> +              consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
> >>>>>> +
> >>>>>>                /* The stream is not metadata. Get relayd reference if exists. */
> >>>>>>                relayd = consumer_find_relayd(msg.u.stream.net_index);
> >>>>>>                if (relayd != NULL) {
> >>>>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>>>>                                goto end_nosignal;
> >>>>>>                        }
> >>>>>>                } else {
> >>>>>> -                      ret = consumer_add_stream(new_stream);
> >>>>>> -                      if (ret) {
> >>>>>> -                              ERR("Consumer add stream %d failed. Continuing",
> >>>>>> -                                              new_stream->key);
> >>>>>> -                              /*
> >>>>>> -                               * At this point, if the add_stream fails, it is not in the
> >>>>>> -                               * hash table thus passing the NULL value here.
> >>>>>> -                               */
> >>>>>> +                      do {
> >>>>>> +                              ret = write(ctx->consumer_poll_pipe[1], &new_stream,
> >>>>>> +                                              sizeof(new_stream));
> >>>>>> +                      } while (ret < 0 && errno == EINTR);
> >>>>>> +                      if (ret < 0) {
> >>>>>> +                              PERROR("write data pipe");
> >>>>>>                                consumer_del_stream(new_stream, NULL);
> >>>>>>                                goto end_nosignal;
> >>>>>>                        }
> >>>>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> >>>>>>                break;
> >>>>>>        }
> >>>>>>  
> >>>>>> -      /*
> >>>>>> -       * Wake-up the other end by writing a null byte in the pipe (non-blocking).
> >>>>>> -       * Important note: Because writing into the pipe is non-blocking (and
> >>>>>> -       * therefore we allow dropping wakeup data, as long as there is wakeup data
> >>>>>> -       * present in the pipe buffer to wake up the other end), the other end
> >>>>>> -       * should perform the following sequence for waiting:
> >>>>>> -       *
> >>>>>> -       * 1) empty the pipe (reads).
> >>>>>> -       * 2) perform update operation.
> >>>>>> -       * 3) wait on the pipe (poll).
> >>>>>> -       */
> >>>>>> -      do {
> >>>>>> -              ret = write(ctx->consumer_poll_pipe[1], "", 1);
> >>>>>> -      } while (ret < 0 && errno == EINTR);
> >>>>>>  end_nosignal:
> >>>>>>        rcu_read_unlock();
> >>>>>>  
> >>>>>> -- 
> >>>>>> 1.7.10.4
> >>>>>>
> >>>>>>
> >>>>>> _______________________________________________
> >>>>>> lttng-dev mailing list
> >>>>>> [email protected]
> >>>>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> >>>>>
> >>>
> > 

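The patch also moves the CPU index assignment
(stream->cpu = stream->chan->cpucount++) into consumer_allocate_stream,
guarded by consumer_data.lock, so the value exists before the UST path
calls lttng_ustconsumer_add_stream. If streams of one channel can be
allocated concurrently, that lock is what keeps the post-increment from
handing out duplicate indexes. A small sketch of the pattern, using
hypothetical stand-in types (struct demo_channel, struct demo_stream,
demo_lock) rather than the real consumer structures: eight threads
allocate streams on one channel at once, and each ends up with a
distinct index.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>

#define NR_STREAMS 8

/* Hypothetical stand-ins for the consumer channel and stream structures. */
struct demo_channel {
	int cpucount; /* next CPU index to hand out */
};

struct demo_stream {
	struct demo_channel *chan;
	int cpu;
};

/* Plays the role of consumer_data.lock. */
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;

/* Same post-increment as the patch, serialized by the lock. */
static void demo_assign_cpu(struct demo_stream *stream)
{
	pthread_mutex_lock(&demo_lock);
	stream->cpu = stream->chan->cpucount++;
	pthread_mutex_unlock(&demo_lock);
}

static void *alloc_thread(void *arg)
{
	demo_assign_cpu(arg);
	return NULL;
}

/* Returns 1 if concurrent allocation handed out 0..NR_STREAMS-1 exactly once. */
static int demo_all_cpus_unique(void)
{
	struct demo_channel chan = { .cpucount = 0 };
	struct demo_stream streams[NR_STREAMS];
	pthread_t tids[NR_STREAMS];
	int seen[NR_STREAMS];
	int i, created = 0, ok = 1;

	memset(seen, 0, sizeof(seen));
	for (i = 0; i < NR_STREAMS; i++) {
		streams[i].chan = &chan;
		streams[i].cpu = -1;
		if (pthread_create(&tids[i], NULL, alloc_thread, &streams[i]) != 0)
			break;
		created++;
	}
	for (i = 0; i < created; i++)
		pthread_join(tids[i], NULL);
	if (created != NR_STREAMS)
		return 0;
	for (i = 0; i < NR_STREAMS; i++) {
		int cpu = streams[i].cpu;

		if (cpu < 0 || cpu >= NR_STREAMS || seen[cpu]++)
			ok = 0;
	}
	return ok && chan.cpucount == NR_STREAMS;
}
```
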
-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
