Mathieu Desnoyers:
> * David Goulet ([email protected]) wrote:
>> As a second step of refactoring, upon receiving a data stream, we send
>> it to the data thread that is now in charge of handling it.
>>
>> Furthermore, in order for this to behave correctly, we have to perform the
>> ustctl actions on the stream before passing it to the right thread
>> (the kernel does not need special actions). This way, once the sessiond
>> thread replies back to the session daemon, the stream is sure to be open
>> and ready for data to be recorded on the application side, so we avoid a
>> race between the application thinking the stream is ready and the stream
>> thread still being scheduled out.
> 
> Normally, as long as we have a reference on the SHM file descriptor, and
> we have the wakeup FD, we should be good to fetch the data of buffers
> belonging to an application that has already exited, even if it did so
> before the ustctl calls are done.
> 
> So I'm wondering why you do the ustctl calls in the sessiond thread? It
> seems to complicate the implementation needlessly: we could still do the
> ustctl calls and output file open at the same location, the
> data/metadata threads.

Hmmm, it was my understanding that those ustctl_* calls were needed
before the trace could start recording, hence making them as early as
possible. Wrong?

David

> 
> Thanks,
> 
> Mathieu
> 
>>
>> This commit should speed up the add stream process for the session
>> daemon. There are still some actions to move out of the session daemon
>> poll thread to gain speed significantly, especially for network
>> streaming.
>>
>> Signed-off-by: David Goulet <[email protected]>
>> ---
>>  src/common/consumer.c                        |  123 
>> +++++++++++---------------
>>  src/common/consumer.h                        |    1 +
>>  src/common/kernel-consumer/kernel-consumer.c |   24 ++---
>>  src/common/ust-consumer/ust-consumer.c       |   40 ++++-----
>>  4 files changed, 78 insertions(+), 110 deletions(-)
>>
>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>> index 055de1b..1d2b1f7 100644
>> --- a/src/common/consumer.c
>> +++ b/src/common/consumer.c
>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream 
>> *consumer_find_stream(int key,
>>      return stream;
>>  }
>>  
>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>  {
>>      struct lttng_consumer_stream *stream;
>>  
>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>      lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>      lttng_ht_node_init_ulong(&stream->node, stream->key);
>>  
>> +    /*
>> +     * The cpu number is needed before using any ustctl_* actions. Ignored 
>> for
>> +     * the kernel so the value does not matter.
>> +     */
>> +    pthread_mutex_lock(&consumer_data.lock);
>> +    stream->cpu = stream->chan->cpucount++;
>> +    pthread_mutex_unlock(&consumer_data.lock);
>> +
>>      DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len 
>> %llu,"
>>                      " out_fd %d, net_seq_idx %d)", stream->path_name, 
>> stream->key,
>>                      stream->shm_fd, stream->wait_fd,
>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream 
>> *stream)
>>      pthread_mutex_lock(&consumer_data.lock);
>>      rcu_read_lock();
>>  
>> -    switch (consumer_data.type) {
>> -    case LTTNG_CONSUMER_KERNEL:
>> -            break;
>> -    case LTTNG_CONSUMER32_UST:
>> -    case LTTNG_CONSUMER64_UST:
>> -            stream->cpu = stream->chan->cpucount++;
>> -            ret = lttng_ustconsumer_add_stream(stream);
>> -            if (ret) {
>> -                    ret = -EINVAL;
>> -                    goto error;
>> -            }
>> -
>> -            /* Steal stream identifier only for UST */
>> -            consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>> -            break;
>> -    default:
>> -            ERR("Unknown consumer_data type");
>> -            assert(0);
>> -            ret = -ENOSYS;
>> -            goto error;
>> -    }
>> -
>>      lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>  
>>      /* Check and cleanup relayd */
>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream 
>> *stream)
>>      consumer_data.stream_count++;
>>      consumer_data.need_update = 1;
>>  
>> -error:
>>      rcu_read_unlock();
>>      pthread_mutex_unlock(&consumer_data.lock);
>>  
>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct 
>> lttng_consumer_stream *stream,
>>  
>>      DBG3("Consumer delete metadata stream %d", stream->wait_fd);
>>  
>> -    if (ht == NULL) {
>> -            /* Means the stream was allocated but not successfully added */
>> -            goto free_stream;
>> -    }
>> -
>> -    rcu_read_lock();
>> -    iter.iter.node = &stream->waitfd_node.node;
>> -    ret = lttng_ht_del(ht, &iter);
>> -    assert(!ret);
>> -    rcu_read_unlock();
>> -
>>      pthread_mutex_lock(&consumer_data.lock);
>>      switch (consumer_data.type) {
>>      case LTTNG_CONSUMER_KERNEL:
>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct 
>> lttng_consumer_stream *stream,
>>              goto end;
>>      }
>>  
>> +    if (ht == NULL) {
>> +            pthread_mutex_unlock(&consumer_data.lock);
>> +            /* Means the stream was allocated but not successfully added */
>> +            goto free_stream;
>> +    }
>> +
>> +    rcu_read_lock();
>> +    iter.iter.node = &stream->waitfd_node.node;
>> +    ret = lttng_ht_del(ht, &iter);
>> +    assert(!ret);
>> +    rcu_read_unlock();
>> +
>>      if (stream->out_fd >= 0) {
>>              ret = close(stream->out_fd);
>>              if (ret) {
>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct 
>> lttng_consumer_stream *stream,
>>  
>>      pthread_mutex_lock(&consumer_data.lock);
>>  
>> -    switch (consumer_data.type) {
>> -    case LTTNG_CONSUMER_KERNEL:
>> -            break;
>> -    case LTTNG_CONSUMER32_UST:
>> -    case LTTNG_CONSUMER64_UST:
>> -            ret = lttng_ustconsumer_add_stream(stream);
>> -            if (ret) {
>> -                    ret = -EINVAL;
>> -                    goto error;
>> -            }
>> -
>> -            /* Steal stream identifier only for UST */
>> -            consumer_steal_stream_key(stream->wait_fd, ht);
>> -            break;
>> -    default:
>> -            ERR("Unknown consumer_data type");
>> -            assert(0);
>> -            ret = -ENOSYS;
>> -            goto error;
>> -    }
>> -
>>      /*
>>       * From here, refcounts are updated so be _careful_ when returning an 
>> error
>>       * after this point.
>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct 
>> lttng_consumer_stream *stream,
>>      lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>>      rcu_read_unlock();
>>  
>> -error:
>>      pthread_mutex_unlock(&consumer_data.lock);
>>      return ret;
>>  }
>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
>>      int num_rdy, num_hup, high_prio, ret, i;
>>      struct pollfd *pollfd = NULL;
>>      /* local view of the streams */
>> -    struct lttng_consumer_stream **local_stream = NULL;
>> +    struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>>      /* local view of consumer_data.fds_count */
>>      int nb_fd = 0;
>>      struct lttng_consumer_local_data *ctx = data;
>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
>>               */
>>              if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
>>                      size_t pipe_readlen;
>> -                    char tmp;
>>  
>>                      DBG("consumer_poll_pipe wake up");
>>                      /* Consume 1 byte of pipe data */
>>                      do {
>> -                            pipe_readlen = read(ctx->consumer_poll_pipe[0], 
>> &tmp, 1);
>> +                            pipe_readlen = read(ctx->consumer_poll_pipe[0], 
>> &new_stream,
>> +                                            sizeof(new_stream));
>>                      } while (pipe_readlen == -1 && errno == EINTR);
>> +
>> +                    /*
>> +                     * If the stream is NULL, just ignore it. It's also 
>> possible that
>> +                     * the sessiond poll thread changed the consumer_quit 
>> state and is
>> +                     * waking us up to test it.
>> +                     */
>> +                    if (new_stream == NULL) {
>> +                            continue;
>> +                    }
>> +
>> +                    ret = consumer_add_stream(new_stream);
>> +                    if (ret) {
>> +                            ERR("Consumer add stream %d failed. Continuing",
>> +                                            new_stream->key);
>> +                            /*
>> +                             * At this point, if the add_stream fails, it 
>> is not in the
>> +                             * hash table thus passing the NULL value here.
>> +                             */
>> +                            consumer_del_stream(new_stream, NULL);
>> +                    }
>> +
>> +                    /* Continue to update the local streams and handle prio 
>> ones */
>>                      continue;
>>              }
>>  
>> @@ -2260,19 +2246,16 @@ end:
>>      consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
>>  
>>      /*
>> -     * Wake-up the other end by writing a null byte in the pipe
>> -     * (non-blocking). Important note: Because writing into the
>> -     * pipe is non-blocking (and therefore we allow dropping wakeup
>> -     * data, as long as there is wakeup data present in the pipe
>> -     * buffer to wake up the other end), the other end should
>> -     * perform the following sequence for waiting:
>> -     * 1) empty the pipe (reads).
>> -     * 2) perform update operation.
>> -     * 3) wait on the pipe (poll).
>> +     * Notify the data poll thread to poll back again and test the
>> +     * consumer_quit state to quit gracefully.
>>       */
>>      do {
>> -            ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> +            struct lttng_consumer_stream *null_stream = NULL;
>> +
>> +            ret = write(ctx->consumer_poll_pipe[1], &null_stream,
>> +                            sizeof(null_stream));
>>      } while (ret < 0 && errno == EINTR);
>> +
>>      rcu_unregister_thread();
>>      return NULL;
>>  }
>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>> index 4b225e4..8e5891a 100644
>> --- a/src/common/consumer.h
>> +++ b/src/common/consumer.h
>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair 
>> *consumer_allocate_relayd_sock_pair(
>>  struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
>>  int consumer_handle_stream_before_relayd(struct lttng_consumer_stream 
>> *stream,
>>              size_t data_size);
>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
>>  
>>  extern struct lttng_consumer_local_data *lttng_consumer_create(
>>              enum lttng_consumer_type type,
>> diff --git a/src/common/kernel-consumer/kernel-consumer.c 
>> b/src/common/kernel-consumer/kernel-consumer.c
>> index 13cbe21..444f5e0 100644
>> --- a/src/common/kernel-consumer/kernel-consumer.c
>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct 
>> lttng_consumer_local_data *ctx,
>>                              consumer_del_stream(new_stream, NULL);
>>                      }
>>              } else {
>> -                    ret = consumer_add_stream(new_stream);
>> -                    if (ret) {
>> -                            ERR("Consumer add stream %d failed. Continuing",
>> -                                            new_stream->key);
>> +                    do {
>> +                            ret = write(ctx->consumer_poll_pipe[1], 
>> &new_stream,
>> +                                            sizeof(new_stream));
>> +                    } while (ret < 0 && errno == EINTR);
>> +                    if (ret < 0) {
>> +                            PERROR("write data pipe");
>>                              consumer_del_stream(new_stream, NULL);
>>                              goto end_nosignal;
>>                      }
>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct 
>> lttng_consumer_local_data *ctx,
>>              goto end_nosignal;
>>      }
>>  
>> -    /*
>> -     * Wake-up the other end by writing a null byte in the pipe 
>> (non-blocking).
>> -     * Important note: Because writing into the pipe is non-blocking (and
>> -     * therefore we allow dropping wakeup data, as long as there is wakeup 
>> data
>> -     * present in the pipe buffer to wake up the other end), the other end
>> -     * should perform the following sequence for waiting:
>> -     *
>> -     * 1) empty the pipe (reads).
>> -     * 2) perform update operation.
>> -     * 3) wait on the pipe (poll).
>> -     */
>> -    do {
>> -            ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> -    } while (ret < 0 && errno == EINTR);
>>  end_nosignal:
>>      rcu_read_unlock();
>>  
>> diff --git a/src/common/ust-consumer/ust-consumer.c 
>> b/src/common/ust-consumer/ust-consumer.c
>> index 1170687..4ca4b84 100644
>> --- a/src/common/ust-consumer/ust-consumer.c
>> +++ b/src/common/ust-consumer/ust-consumer.c
>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct 
>> lttng_consumer_local_data *ctx,
>>                      goto end_nosignal;
>>              }
>>  
>> +            /*
>> +             * This needs to be done as soon as we can so we don't block the
>> +             * application too long.
>> +             */
>> +            ret = lttng_ustconsumer_add_stream(new_stream);
>> +            if (ret) {
>> +                    consumer_del_stream(new_stream, NULL);
>> +                    goto end_nosignal;
>> +            }
>> +            /* Steal stream identifier to avoid having streams with the 
>> same key */
>> +            consumer_steal_stream_key(new_stream->key, 
>> consumer_data.stream_ht);
>> +
>>              /* The stream is not metadata. Get relayd reference if exists. 
>> */
>>              relayd = consumer_find_relayd(msg.u.stream.net_index);
>>              if (relayd != NULL) {
>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct 
>> lttng_consumer_local_data *ctx,
>>                              goto end_nosignal;
>>                      }
>>              } else {
>> -                    ret = consumer_add_stream(new_stream);
>> -                    if (ret) {
>> -                            ERR("Consumer add stream %d failed. Continuing",
>> -                                            new_stream->key);
>> -                            /*
>> -                             * At this point, if the add_stream fails, it 
>> is not in the
>> -                             * hash table thus passing the NULL value here.
>> -                             */
>> +                    do {
>> +                            ret = write(ctx->consumer_poll_pipe[1], 
>> &new_stream,
>> +                                            sizeof(new_stream));
>> +                    } while (ret < 0 && errno == EINTR);
>> +                    if (ret < 0) {
>> +                            PERROR("write data pipe");
>>                              consumer_del_stream(new_stream, NULL);
>>                              goto end_nosignal;
>>                      }
>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct 
>> lttng_consumer_local_data *ctx,
>>              break;
>>      }
>>  
>> -    /*
>> -     * Wake-up the other end by writing a null byte in the pipe 
>> (non-blocking).
>> -     * Important note: Because writing into the pipe is non-blocking (and
>> -     * therefore we allow dropping wakeup data, as long as there is wakeup 
>> data
>> -     * present in the pipe buffer to wake up the other end), the other end
>> -     * should perform the following sequence for waiting:
>> -     *
>> -     * 1) empty the pipe (reads).
>> -     * 2) perform update operation.
>> -     * 3) wait on the pipe (poll).
>> -     */
>> -    do {
>> -            ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> -    } while (ret < 0 && errno == EINTR);
>>  end_nosignal:
>>      rcu_read_unlock();
>>  
>> -- 
>> 1.7.10.4
>>
>>
>> _______________________________________________
>> lttng-dev mailing list
>> [email protected]
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> 

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to