* David Goulet ([email protected]) wrote:
> The read() call in the metadata thread is also changed to use the lttng
> pipe read wrapper.


Acked-by: Mathieu Desnoyers <[email protected]>

> 
> Signed-off-by: David Goulet <[email protected]>
> ---
>  src/common/consumer.c                        |   58 +++++++++-----------------
>  src/common/consumer.h                        |    2 +-
>  src/common/kernel-consumer/kernel-consumer.c |    2 +-
>  src/common/ust-consumer/ust-consumer.c       |    2 +-
>  4 files changed, 22 insertions(+), 42 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index ca51ce0..c63e6e6 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -77,21 +77,6 @@ static struct lttng_ht *metadata_ht;
>  static struct lttng_ht *data_ht;
>  
>  /*
> - * Notify a thread pipe to poll back again. This usually means that some global
> - * state has changed so we just send back the thread in a poll wait call.
> - */
> -static void notify_thread_pipe(int wpipe)
> -{
> -     int ret;
> -
> -     do {
> -             struct lttng_consumer_stream *null_stream = NULL;
> -
> -             ret = write(wpipe, &null_stream, sizeof(null_stream));
> -     } while (ret < 0 && errno == EINTR);
> -}
> -
> -/*
>   * Notify a thread lttng pipe to poll back again. This usually means that some
>   * global state has changed so we just send back the thread in a poll wait
>   * call.
> @@ -423,7 +408,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
>        */
>       if (ctx) {
>               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
> -             notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
> +             notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
>       }
>  }
>  
> @@ -1206,8 +1191,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>               goto error_channel_pipe;
>       }
>  
> -     ret = utils_create_pipe(ctx->consumer_metadata_pipe);
> -     if (ret < 0) {
> +     ctx->consumer_metadata_pipe = lttng_pipe_open(0);
> +     if (!ctx->consumer_metadata_pipe) {
>               goto error_metadata_pipe;
>       }
>  
> @@ -1219,7 +1204,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>       return ctx;
>  
>  error_splice_pipe:
> -     utils_close_pipe(ctx->consumer_metadata_pipe);
> +     lttng_pipe_destroy(ctx->consumer_metadata_pipe);
>  error_metadata_pipe:
>       utils_close_pipe(ctx->consumer_channel_pipe);
>  error_channel_pipe:
> @@ -1254,6 +1239,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
>       utils_close_pipe(ctx->consumer_thread_pipe);
>       utils_close_pipe(ctx->consumer_channel_pipe);
>       lttng_pipe_destroy(ctx->consumer_data_pipe);
> +     lttng_pipe_destroy(ctx->consumer_metadata_pipe);
>       utils_close_pipe(ctx->consumer_should_quit);
>       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
>  
> @@ -2134,7 +2120,8 @@ void *consumer_thread_metadata_poll(void *data)
>               goto end_poll;
>       }
>  
> -     ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
> +     ret = lttng_poll_add(&events,
> +                     lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
>       if (ret < 0) {
>               goto end;
>       }
> @@ -2172,30 +2159,26 @@ restart:
>                               continue;
>                       }
>  
> -                     if (pollfd == ctx->consumer_metadata_pipe[0]) {
> +                     if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
>                               if (revents & (LPOLLERR | LPOLLHUP )) {
>                                       DBG("Metadata thread pipe hung up");
>                                       /*
>                                        * Remove the pipe from the poll set and continue the loop
>                                        * since their might be data to consume.
>                                        */
> -                                     lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
> -                                     ret = close(ctx->consumer_metadata_pipe[0]);
> -                                     if (ret < 0) {
> -                                             PERROR("close metadata pipe");
> -                                     }
> +                                     lttng_poll_del(&events,
> +                                                     lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
> +                                     lttng_pipe_read_close(ctx->consumer_metadata_pipe);
>                                       continue;
>                               } else if (revents & LPOLLIN) {
> -                                     do {
> -                                             /* Get the stream pointer received */
> -                                             ret = read(pollfd, &stream, sizeof(stream));
> -                                     } while (ret < 0 && errno == EINTR);
> -                                     if (ret < 0 ||
> -                                                     ret < sizeof(struct lttng_consumer_stream *)) {
> -                                             PERROR("read metadata stream");
> +                                     ssize_t pipe_len;
> +
> +                                     pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
> +                                                     &stream, sizeof(stream));
> +                                     if (pipe_len < 0) {
> +                                             ERR("read metadata stream, ret: %ld", pipe_len);
>                                               /*
> -                                              * Let's continue here and hope we can still work
> -                                              * without stopping the consumer. XXX: Should we?
> +                                              * Continue here to handle the rest of the streams.
>                                                */
>                                               continue;
>                                       }
> @@ -2543,10 +2526,7 @@ end:
>        * only tracked fd in the poll set. The thread will take care of closing
>        * the read side.
>        */
> -     ret = close(ctx->consumer_metadata_pipe[1]);
> -     if (ret < 0) {
> -             PERROR("close data pipe");
> -     }
> +     (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
>  
>       destroy_data_stream_ht(data_ht);
>  
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 91039e8..3726fd1 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -351,7 +351,7 @@ struct lttng_consumer_local_data {
>       /* to let the signal handler wake up the fd receiver thread */
>       int consumer_should_quit[2];
>       /* Metadata poll thread pipe. Transfer metadata stream to it */
> -     int consumer_metadata_pipe[2];
> +     struct lttng_pipe *consumer_metadata_pipe;
>  };
>  
>  /*
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index d8aec49..f23fc9c 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -288,7 +288,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>  
>               /* Get the right pipe where the stream will be sent. */
>               if (new_stream->metadata_flag) {
> -                     stream_pipe = ctx->consumer_metadata_pipe[1];
> +                     stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
>               } else {
>                       stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>               }
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index ddf80da..a81e9d4 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -189,7 +189,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
>  
>       /* Get the right pipe where the stream will be sent. */
>       if (stream->metadata_flag) {
> -             stream_pipe = ctx->consumer_metadata_pipe[1];
> +             stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
>       } else {
>               stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>       }
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> lttng-dev mailing list
> [email protected]
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to