* David Goulet ([email protected]) wrote:
> Also, an important change here is that this pipe is no longer in
> non-blocking mode. Before sending the stream's pointer over this pipe, only
> one byte was written, thus making it unlikely to fail in a read/write race
> condition between threads. Now, 4 bytes are written, so keeping this pipe
> non-blocking with threads is a bit of a "looking for trouble" situation.
> 
> The lttng pipe wrappers make sure that the read and write sides are
> synchronized between threads using a mutex for each side. Furthermore,
> the read and write wrappers handle partial I/O and EINTR, meaning that once
> the call returns we are sure that either everything was read/written or an
> error occurred, thus making it impossible for the read side to block
> indefinitely after a poll event.
> 
> Signed-off-by: David Goulet <[email protected]>
> ---
>  src/common/consumer.c                        |   53 
> ++++++++++++--------------
>  src/common/consumer.h                        |    3 +-
>  src/common/kernel-consumer/kernel-consumer.c |    3 +-
>  src/common/ust-consumer/ust-consumer.c       |    2 +-
>  4 files changed, 29 insertions(+), 32 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 01266a7..bd618dd 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe)
>       } while (ret < 0 && errno == EINTR);
>  }
>  
> +/*
> + * Notify a thread lttng pipe to poll back again. This usually means that 
> some
> + * global state has changed so we just send back the thread in a poll wait
> + * call.
> + */
> +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
> +{
> +     struct lttng_consumer_stream *null_stream = NULL;
> +
> +     assert(pipe);
> +
> +     (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
> +}
> +
>  static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
>               struct lttng_consumer_channel *chan,
>               uint64_t key,
> @@ -406,7 +420,7 @@ static void cleanup_relayd(struct 
> consumer_relayd_sock_pair *relayd,
>        * read of this status which happens AFTER receiving this notify.
>        */
>       if (ctx) {
> -             notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +             notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>               notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
>       }
>  }
> @@ -971,7 +985,7 @@ static int update_poll_array(struct 
> lttng_consumer_local_data *ctx,
>        * Insert the consumer_data_pipe at the end of the array and don't
>        * increment i so nb_fd is the number of real FD.
>        */
> -     (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
> +     (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
>       (*pollfd)[i].events = POLLIN | POLLPRI;
>       return i;
>  }
> @@ -1167,26 +1181,11 @@ struct lttng_consumer_local_data 
> *lttng_consumer_create(
>       ctx->on_recv_stream = recv_stream;
>       ctx->on_update_stream = update_stream;
>  
> -     ret = pipe(ctx->consumer_data_pipe);
> -     if (ret < 0) {
> -             PERROR("Error creating poll pipe");
> +     ctx->consumer_data_pipe = lttng_pipe_open(0);
> +     if (!ctx->consumer_data_pipe) {
>               goto error_poll_pipe;
>       }
>  
> -     /* set read end of the pipe to non-blocking */
> -     ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
> -     if (ret < 0) {
> -             PERROR("fcntl O_NONBLOCK");
> -             goto error_poll_fcntl;
> -     }
> -
> -     /* set write end of the pipe to non-blocking */
> -     ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
> -     if (ret < 0) {
> -             PERROR("fcntl O_NONBLOCK");
> -             goto error_poll_fcntl;
> -     }
> -
>       ret = pipe(ctx->consumer_should_quit);
>       if (ret < 0) {
>               PERROR("Error creating recv pipe");
> @@ -1225,9 +1224,8 @@ error_channel_pipe:
>       utils_close_pipe(ctx->consumer_thread_pipe);
>  error_thread_pipe:
>       utils_close_pipe(ctx->consumer_should_quit);
> -error_poll_fcntl:
>  error_quit_pipe:
> -     utils_close_pipe(ctx->consumer_data_pipe);
> +     lttng_pipe_destroy(ctx->consumer_data_pipe);
>  error_poll_pipe:
>       free(ctx);
>  error:
> @@ -1253,7 +1251,7 @@ void lttng_consumer_destroy(struct 
> lttng_consumer_local_data *ctx)
>       }
>       utils_close_pipe(ctx->consumer_thread_pipe);
>       utils_close_pipe(ctx->consumer_channel_pipe);
> -     utils_close_pipe(ctx->consumer_data_pipe);
> +     lttng_pipe_destroy(ctx->consumer_data_pipe);
>       utils_close_pipe(ctx->consumer_should_quit);
>       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
>  
> @@ -2402,13 +2400,10 @@ void *consumer_thread_data_poll(void *data)
>                       ssize_t pipe_readlen;
>  
>                       DBG("consumer_data_pipe wake up");
> -                     /* Consume 1 byte of pipe data */
> -                     do {
> -                             pipe_readlen = read(ctx->consumer_data_pipe[0], 
> &new_stream,
> -                                             sizeof(new_stream));
> -                     } while (pipe_readlen == -1 && errno == EINTR);
> +                     pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
> +                                     &new_stream, sizeof(new_stream));
>                       if (pipe_readlen < 0) {

Can lttng_pipe_read return a value smaller than sizeof(new_stream) (but
higher than 0) on some error conditions?

Thanks,

Mathieu

> -                             PERROR("read consumer data pipe");
> +                             ERR("Consumer data pipe ret %ld", pipe_readlen);
>                               /* Continue so we can at least handle the 
> current stream(s). */
>                               continue;
>                       }
> @@ -2968,7 +2963,7 @@ end:
>        * Notify the data poll thread to poll back again and test the
>        * consumer_quit state that we just set so to quit gracefully.
>        */
> -     notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +     notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>  
>       notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
>  
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 43989e4..91039e8 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -31,6 +31,7 @@
>  #include <common/compat/fcntl.h>
>  #include <common/compat/uuid.h>
>  #include <common/sessiond-comm/sessiond-comm.h>
> +#include <common/pipe.h>
>  
>  /* Commands for consumer */
>  enum lttng_consumer_command {
> @@ -346,7 +347,7 @@ struct lttng_consumer_local_data {
>       int consumer_channel_pipe[2];
>       int consumer_splice_metadata_pipe[2];
>       /* Data stream poll thread pipe. To transfer data stream to the thread 
> */
> -     int consumer_data_pipe[2];
> +     struct lttng_pipe *consumer_data_pipe;
>       /* to let the signal handler wake up the fd receiver thread */
>       int consumer_should_quit[2];
>       /* Metadata poll thread pipe. Transfer metadata stream to it */
> diff --git a/src/common/kernel-consumer/kernel-consumer.c 
> b/src/common/kernel-consumer/kernel-consumer.c
> index 2cf9ac1..d8aec49 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -34,6 +34,7 @@
>  #include <common/sessiond-comm/sessiond-comm.h>
>  #include <common/sessiond-comm/relayd.h>
>  #include <common/compat/fcntl.h>
> +#include <common/pipe.h>
>  #include <common/relayd/relayd.h>
>  #include <common/utils.h>
>  
> @@ -289,7 +290,7 @@ int lttng_kconsumer_recv_cmd(struct 
> lttng_consumer_local_data *ctx,
>               if (new_stream->metadata_flag) {
>                       stream_pipe = ctx->consumer_metadata_pipe[1];
>               } else {
> -                     stream_pipe = ctx->consumer_data_pipe[1];
> +                     stream_pipe = 
> lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>               }
>  
>               do {
> diff --git a/src/common/ust-consumer/ust-consumer.c 
> b/src/common/ust-consumer/ust-consumer.c
> index 031a7cb..ddf80da 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -191,7 +191,7 @@ static int send_stream_to_thread(struct 
> lttng_consumer_stream *stream,
>       if (stream->metadata_flag) {
>               stream_pipe = ctx->consumer_metadata_pipe[1];
>       } else {
> -             stream_pipe = ctx->consumer_data_pipe[1];
> +             stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>       }
>  
>       do {
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> lttng-dev mailing list
> [email protected]
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to