* Julien Desfossez ([email protected]) wrote:
[...]
>       /* function to call when data is available on a buffer */
>       int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
> +     /*
> +      * function to call when we receive a new fd, it receives a newly 
> allocated
> +      * kconsumerd_fd, if it returns the FD (as seen by the sessiond daemon :
> +      * sessiond_fd), the FD will be handled by the lib in the local FD list,
> +      * otherwise we assume the external consumer is taking care of it.
> +      */

Rather than returning the "same" fd, which is a little weird, we could
just return:

- > 0 (success, FD is kept by application)
- == 0 (success, FD is left to library)
- < 0 (error)

- also add an argument   int *fd   to the function through which it would
  be returned. I prefer not to use the return value for this, because
  theoretically the fd number "0" could be used as a valid fd number if
  the application closes stdin.

Thanks,

Mathieu

> +     int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
> +     /*
> +      * function to call when a FD is getting updated by the session daemon,
> +      * this function receives the FD as seen by the session daemon
> +      * (sessiond_fd) and the new state, if it returns the fd, it will be
> +      * handled locally by the lib, otherwise we assume the consumer is 
> taking
> +      * care of it.
> +      */
> +     int (*on_update_fd)(int sessiond_fd, uint32_t state);
>       /* socket to communicate errors with sessiond */
>       int kconsumerd_error_socket;
>       /* socket to exchange commands with sessiond */
> @@ -98,15 +113,15 @@ struct lttng_kconsumerd_local_data {
>   * - create the should_quit pipe (for signal handler)
>   * - create the thread pipe (for splice)
>   *
> - * Takes a function pointer as argument, this function is called when data is
> - * available on a buffer. This function is responsible to do the
> - * kernctl_get_next_subbuf, read the data with mmap or splice depending on 
> the
> - * buffer configuration and then kernctl_put_next_subbuf at the end.
> + * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
> + * on_update_fd callbacks.
>   *
>   * Returns a pointer to the new context or NULL on error.
>   */
>  extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
> -             int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd));
> +             int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +             int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +             int (*update_fd)(int sessiond_fd, uint32_t state));
>  
>  /*
>   * Close all fds associated with the instance and free the context.
> diff --git a/liblttngkconsumerd/lttngkconsumerd.c 
> b/liblttngkconsumerd/lttngkconsumerd.c
> index d36da9b..ed7951b 100644
> --- a/liblttngkconsumerd/lttngkconsumerd.c
> +++ b/liblttngkconsumerd/lttngkconsumerd.c
> @@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct 
> lttng_kconsumerd_fd *lcf)
>  }
>  
>  /*
> - * Add a fd to the global list protected by a mutex.
> + * Create a struct lttcomm_kconsumerd_msg from the
> + * information received on the receiving socket
>   */
> -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
> +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
> +             struct lttcomm_kconsumerd_msg *buf,
>               int consumerd_fd)
>  {
>       struct lttng_kconsumerd_fd *tmp_fd;
> -     int ret = 0;
>  
> -     pthread_mutex_lock(&kconsumerd_data.lock);
> -     /* Check if already exist */
> -     ret = kconsumerd_find_session_fd(buf->fd);
> -     if (ret == 1) {
> +     tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
> +     if (tmp_fd == NULL) {
> +             perror("malloc struct lttng_kconsumerd_fd");
>               goto end;
>       }
>  
> -     tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
>       tmp_fd->sessiond_fd = buf->fd;
>       tmp_fd->consumerd_fd = consumerd_fd;
>       tmp_fd->state = buf->state;
> @@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct 
> lttcomm_kconsumerd_msg *buf,
>       tmp_fd->output = buf->output;
>       strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
>       tmp_fd->path_name[PATH_MAX - 1] = '\0';
> +     DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
> +                     tmp_fd->path_name, tmp_fd->sessiond_fd,
> +                     tmp_fd->consumerd_fd, tmp_fd->out_fd);
>  
> -     /* Opening the tracefile in write mode */
> -     if (tmp_fd->path_name != NULL) {
> -             ret = open(tmp_fd->path_name,
> -                             O_WRONLY|O_CREAT|O_TRUNC, 
> S_IRWXU|S_IRWXG|S_IRWXO);
> -             if (ret < 0) {
> -                     ERR("Opening %s", tmp_fd->path_name);
> -                     perror("open");
> -                     goto end;
> -             }
> -             tmp_fd->out_fd = ret;
> -             DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
> -                             tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, 
> tmp_fd->out_fd);
> -     }
> +end:
> +     return tmp_fd;
> +}
>  
> -     if (tmp_fd->output == LTTNG_EVENT_MMAP) {
> -             /* get the len of the mmap region */
> -             ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, 
> &tmp_fd->mmap_len);
> -             if (ret != 0) {
> -                     ret = errno;
> -                     perror("kernctl_get_mmap_len");
> -                     goto end;
> -             }
> +/*
> + * Add a fd to the global list protected by a mutex.
> + */
> +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
> +{
> +     int ret;
>  
> -             tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
> -                             PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 
> 0);
> -             if (tmp_fd->mmap_base == MAP_FAILED) {
> -                     perror("Error mmaping");
> -                     ret = -1;
> -                     goto end;
> -             }
> +     pthread_mutex_lock(&kconsumerd_data.lock);
> +     /* Check if already exist */
> +     ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
> +     if (ret == 1) {
> +             goto end;
>       }
> -
>       cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
>       kconsumerd_data.fds_count++;
>       kconsumerd_data.need_update = 1;
> +
>  end:
>       pthread_mutex_unlock(&kconsumerd_data.lock);
>       return ret;
> @@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd(
>       int nb_fd;
>       char recv_fd[CMSG_SPACE(sizeof(int))];
>       struct lttcomm_kconsumerd_msg lkm;
> +     struct lttng_kconsumerd_fd *new_fd;
>  
>       /* the number of fds we are about to receive */
>       nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
> @@ -313,14 +302,40 @@ static int kconsumerd_consumerd_recv_fd(
>                                       DBG("kconsumerd_add_fd %s (%d)", 
> lkm.path_name,
>                                                       ((int *) 
> CMSG_DATA(cmsg))[0]);
>  
> -                                     ret = kconsumerd_add_fd(&lkm, ((int *) 
> CMSG_DATA(cmsg))[0]);
> -                                     if (ret < 0) {
> +                                     new_fd = kconsumerd_allocate_fd(&lkm, 
> ((int *) CMSG_DATA(cmsg))[0]);
> +                                     if (new_fd == NULL) {
>                                               
> lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
>                                               goto end;
>                                       }
> +
> +                                     if (ctx->on_recv_fd != NULL) {
> +                                             ret = ctx->on_recv_fd(new_fd);
> +                                             /*
> +                                              * if we receive the FD back, 
> we insert it in the local
> +                                              * FD list, otherwise we assume 
> it is handled by the
> +                                              * external consumer.
> +                                              */
> +                                             if (ret == new_fd->sessiond_fd) 
> {
> +                                                     
> kconsumerd_add_fd(new_fd);
> +                                             }
> +                                     } else {
> +                                             kconsumerd_add_fd(new_fd);
> +                                     }
>                                       break;
>                               case UPDATE_STREAM:
> -                                     kconsumerd_change_fd_state(lkm.fd, 
> lkm.state);
> +                                     if (ctx->on_update_fd != NULL) {
> +                                             ret = ctx->on_update_fd(lkm.fd, 
> lkm.state);
> +                                             /*
> +                                              * if we receive the FD back, 
> we have to handle it locally,
> +                                              * otherwise we assume the 
> external consumer is taking care
> +                                              * of it.
> +                                              */
> +                                             if (ret == lkm.fd) {
> +                                                     
> kconsumerd_change_fd_state(lkm.fd, lkm.state);
> +                                             }
> +                                     } else {
> +                                                     
> kconsumerd_change_fd_state(lkm.fd, lkm.state);
> +                                     }
>                                       break;
>                               default:
>                                       break;
> @@ -756,7 +771,9 @@ end:
>   * Returns a pointer to the new context or NULL on error.
>   */
>  struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
> -             int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
> +             int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +             int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +             int (*update_fd)(int sessiond_fd, uint32_t state))
>  {
>       int ret;
>       struct lttng_kconsumerd_local_data *ctx;
> @@ -767,7 +784,10 @@ struct lttng_kconsumerd_local_data 
> *lttng_kconsumerd_create(
>               goto end;
>       }
>  
> +     /* assign the callbacks */
>       ctx->on_buffer_ready = buffer_ready;
> +     ctx->on_recv_fd = recv_fd;
> +     ctx->on_update_fd = update_fd;
>  
>       ret = pipe(ctx->kconsumerd_poll_pipe);
>       if (ret < 0) {
> diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
> index cd4b00e..f2bb211 100644
> --- a/ltt-kconsumerd/ltt-kconsumerd.c
> +++ b/ltt-kconsumerd/ltt-kconsumerd.c
> @@ -271,6 +271,47 @@ end:
>       return ret;
>  }
>  
> +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
> +{
> +     int ret;
> +
> +     /* Opening the tracefile in write mode */
> +     if (kconsumerd_fd->path_name != NULL) {
> +             ret = open(kconsumerd_fd->path_name,
> +                             O_WRONLY|O_CREAT|O_TRUNC, 
> S_IRWXU|S_IRWXG|S_IRWXO);
> +             if (ret < 0) {
> +                     ERR("Opening %s", kconsumerd_fd->path_name);
> +                     perror("open");
> +                     goto error;
> +             }
> +             kconsumerd_fd->out_fd = ret;
> +     }
> +
> +     if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
> +             /* get the len of the mmap region */
> +             ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, 
> &kconsumerd_fd->mmap_len);
> +             if (ret != 0) {
> +                     ret = errno;
> +                     perror("kernctl_get_mmap_len");
> +                     goto error;
> +             }
> +
> +             kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
> +                             PROT_READ, MAP_PRIVATE, 
> kconsumerd_fd->consumerd_fd, 0);
> +             if (kconsumerd_fd->mmap_base == MAP_FAILED) {
> +                     perror("Error mmaping");
> +                     ret = -1;
> +                     goto error;
> +             }
> +     }
> +
> +     /* we return the FD back to the lib to let it handle the FD internally 
> */
> +     return kconsumerd_fd->sessiond_fd;
> +
> +error:
> +     return ret;
> +}
> +
>  /*
>   * main
>   */
> @@ -297,8 +338,8 @@ int main(int argc, char **argv)
>               snprintf(command_sock_path, PATH_MAX,
>                               KCONSUMERD_CMD_SOCK_PATH);
>       }
> -     /* create the pipe to wake to receiving thread when needed */
> -     ctx = lttng_kconsumerd_create(read_subbuffer);
> +     /* create the consumer instance with and assign the callbacks */
> +     ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
>       if (ctx == NULL) {
>               goto error;
>       }
> -- 
> 1.7.4.1
> 

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev

Reply via email to