* David Goulet ([email protected]) wrote:
> Improve the resilience of the consumer by cleaning up a relayd object
> and all associated streams when a write error is detected on a relayd
> socket.
> 
> Fixes #385
> 
> Signed-off-by: David Goulet <[email protected]>
> ---
>  src/common/consumer.c      |  259 ++++++++++++++++++++++++++++++++++++++++----
>  src/common/consumer.h      |   12 ++
>  src/common/relayd/relayd.c |    3 +
>  3 files changed, 254 insertions(+), 20 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 53c6180..eeb2f59 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -70,6 +70,21 @@ struct lttng_ht *metadata_ht;
>  struct lttng_ht *data_ht;
>  
>  /*
> + * Notify a thread pipe to poll back again. This usually means that some global
> + * state has changed, so the thread is sent back into its poll wait call.
> + */
> +static void notify_thread_pipe(int wpipe)
> +{
> +     int ret;
> +
> +     do {
> +             struct lttng_consumer_stream *null_stream = NULL;
> +
> +             ret = write(wpipe, &null_stream, sizeof(null_stream));
> +     } while (ret < 0 && errno == EINTR);
> +}
> +
> +/*
>   * Find a stream. The consumer_data.lock must be locked during this
>   * call.
>   */
> @@ -182,6 +197,17 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
>       struct consumer_relayd_sock_pair *relayd =
>               caa_container_of(node, struct consumer_relayd_sock_pair, node);
>  
> +     /*
> +      * Close all sockets. This is done in the call_rcu callback since we
> +      * don't want the socket fds to be reassigned, thus potentially creating
> +      * a bad state for the relayd object.
> +      *
> +      * We do not have to lock the control socket mutex here since at this
> +      * stage there is no one referencing this relayd object.
> +      */
> +     (void) relayd_close(&relayd->control_sock);
> +     (void) relayd_close(&relayd->data_sock);
> +
>       free(relayd);
>  }
>  
> @@ -204,21 +230,89 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
>       iter.iter.node = &relayd->node.node;
>       ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
>       if (ret != 0) {
> -             /* We assume the relayd was already destroyed */
> +             /* We assume the relayd is being destroyed or is already destroyed */
>               return;
>       }
>  
> -     /* Close all sockets */
> -     pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> -     (void) relayd_close(&relayd->control_sock);
> -     pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
> -     (void) relayd_close(&relayd->data_sock);
> -
>       /* RCU free() call */
>       call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
>  }
>  
>  /*
> + * Update the end point status of all streams having the given network
> + * sequence index (relayd index).
> + *
> + * The status is set atomically without holding the stream mutex, so be aware
> + * of the potential race when using it.

Please describe that this race is handled by a retry, triggered by the pipe
wakeup.
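
Something along these lines would do (just a sketch reusing identifiers from
this patch, not meant to be taken verbatim):

  /*
   * Writer side (cleanup_relayd()): flag the streams, then wake the
   * owning thread through its data/metadata pipe.
   */
  uatomic_set(&stream->endpoint_status, CONSUMER_ENDPOINT_INACTIVE);
  notify_thread_pipe(ctx->consumer_data_pipe[1]);

  /*
   * Reader side (consumer_thread_data_poll()): the wakeup makes the thread
   * re-read its pipe; a NULL stream pointer means "global state changed",
   * so it re-validates every stream and deletes the ones flagged inactive.
   * A stream whose status changed just after the check in
   * consumer_update_poll_array() is therefore caught on this retry.
   */
  if (new_stream == NULL) {
          validate_endpoint_status_data_stream();
          continue;
  }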

Other than that,

Acked-by: Mathieu Desnoyers <[email protected]>


> + */
> +static void update_endpoint_status_by_netidx(int net_seq_idx,
> +             enum consumer_endpoint_status status)
> +{
> +     struct lttng_ht_iter iter;
> +     struct lttng_consumer_stream *stream;
> +
> +     DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
> +
> +     rcu_read_lock();
> +
> +     /* Let's begin with metadata */
> +     cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> +             if (stream->net_seq_idx == net_seq_idx) {
> +                     uatomic_set(&stream->endpoint_status, status);
> +                     DBG("Delete flag set to metadata stream %d", stream->wait_fd);
> +             }
> +     }
> +
> +     /* Follow up by the data streams */
> +     cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> +             if (stream->net_seq_idx == net_seq_idx) {
> +                     uatomic_set(&stream->endpoint_status, status);
> +                     DBG("Delete flag set to data stream %d", stream->wait_fd);
> +             }
> +     }
> +     rcu_read_unlock();
> +}
> +
> +/*
> + * Cleanup a relayd object by flagging every associated stream for deletion,
> + * destroying the object, meaning removing it from the relayd hash table,
> + * closing the sockets and freeing the memory in an RCU call.
> + *
> + * If a local data context is available, notify the threads that the streams'
> + * state has changed.
> + */
> +static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
> +             struct lttng_consumer_local_data *ctx)
> +{
> +     int netidx;
> +
> +     assert(relayd);
> +
> +     /* Save the net sequence index before destroying the object */
> +     netidx = relayd->net_seq_idx;
> +
> +     /*
> +      * Delete the relayd from the relayd hash table, close the sockets and
> +      * free the object in an RCU call.
> +      */
> +     destroy_relayd(relayd);
> +
> +     /* Set an inactive endpoint status on all associated streams */
> +     update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
> +
> +     /*
> +      * With a local data context, notify the threads that the streams' state
> +      * has changed. The write() action on the pipe acts as an "implicit"
> +      * memory barrier, ordering the updates of the end point status before
> +      * the read of this status, which happens AFTER receiving this notify.
> +      */
> +     if (ctx) {
> +             notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +             notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
> +     }
> +}
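
As an aside, the implicit-barrier argument above can be seen with a tiny
standalone program (illustration only, not patch code; the names are made up
and error handling is kept minimal; build with -pthread):

  /* pipe_notify_demo.c - the notify-then-revalidate pattern in isolation */
  #include <errno.h>
  #include <pthread.h>
  #include <stdio.h>
  #include <unistd.h>

  static int endpoint_inactive;   /* stands in for stream->endpoint_status */
  static int pipefd[2];

  static void *poll_thread(void *arg)
  {
          void *token = NULL;
          ssize_t ret;

          (void) arg;

          /* Mimics the consumer thread blocked on its data pipe. */
          do {
                  ret = read(pipefd[0], &token, sizeof(token));
          } while (ret < 0 && errno == EINTR);

          if (ret == (ssize_t) sizeof(token) && token == NULL) {
                  /* The status update is visible once the notification is read. */
                  printf("endpoint_inactive = %d\n", endpoint_inactive);
          }
          return NULL;
  }

  int main(void)
  {
          pthread_t tid;
          void *null_token = NULL;

          if (pipe(pipefd) < 0) {
                  return 1;
          }
          pthread_create(&tid, NULL, poll_thread, NULL);

          endpoint_inactive = 1;        /* uatomic_set() in the patch */
          (void) write(pipefd[1], &null_token,
                          sizeof(null_token));  /* notify_thread_pipe() in the patch */

          pthread_join(tid, NULL);
          return 0;
  }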
> +
> +/*
>   * Flag a relayd socket pair for destruction. Destroy it if the refcount
>   * reaches zero.
>   *
> @@ -251,11 +345,14 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
>  
>       assert(stream);
>  
> +     DBG("Consumer del stream %d", stream->wait_fd);
> +
>       if (ht == NULL) {
>               /* Means the stream was allocated but not successfully added */
>               goto free_stream;
>       }
>  
> +     pthread_mutex_lock(&stream->lock);
>       pthread_mutex_lock(&consumer_data.lock);
>  
>       switch (consumer_data.type) {
> @@ -349,6 +446,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
>  end:
>       consumer_data.need_update = 1;
>       pthread_mutex_unlock(&consumer_data.lock);
> +     pthread_mutex_unlock(&stream->lock);
>  
>       if (free_chan) {
>               consumer_del_channel(free_chan);
> @@ -804,7 +902,17 @@ static int consumer_update_poll_array(
>       DBG("Updating poll fd array");
>       rcu_read_lock();
>       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
> -             if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
> +             /*
> +              * Only active streams with an active end point can be added to
> +              * the poll set and local stream storage of the thread.
> +              *
> +              * There is a potential race here for endpoint_status to be
> +              * updated just after the check. However, this is OK since the
> +              * stream(s) will be deleted once the thread is notified that the
> +              * end point state has changed, at which point this function will
> +              * be called again.
> +              */
> +             if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
> +                             stream->endpoint_status) {
>                       continue;
>               }
>               DBG("Active FD %d", stream->wait_fd);
> @@ -1169,6 +1277,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>       /* Default is on the disk */
>       int outfd = stream->out_fd;
>       struct consumer_relayd_sock_pair *relayd = NULL;
> +     unsigned int relayd_hang_up = 0;
>  
>       /* RCU lock for the relayd pointer */
>       rcu_read_lock();
> @@ -1228,11 +1337,22 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>                               ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
>                               if (ret < 0) {
>                                       written = ret;
> +                                     /* Socket operation failed. We consider the relayd dead */
> +                                     if (ret == -EPIPE || ret == -EINVAL) {
> +                                             relayd_hang_up = 1;
> +                                             goto write_error;
> +                                     }
>                                       goto end;
>                               }
>                       }
> +             } else {
> +                     /* Socket operation failed. We consider the relayd dead */
> +                     if (ret == -EPIPE || ret == -EINVAL) {
> +                             relayd_hang_up = 1;
> +                             goto write_error;
> +                     }
> +                     /* Else, use the default set before which is the filesystem. */
>               }
> -             /* Else, use the default set before which is the filesystem. */
>       } else {
>               /* No streaming, we have to set the len with the full padding */
>               len += padding;
> @@ -1248,6 +1368,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>                       if (written == 0) {
>                               written = ret;
>                       }
> +                     /* Socket operation failed. We consider the relayd dead */
> +                     if (errno == EPIPE || errno == EINVAL) {
> +                             relayd_hang_up = 1;
> +                             goto write_error;
> +                     }
>                       goto end;
>               } else if (ret > len) {
>                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
> @@ -1269,6 +1394,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>       }
>       lttng_consumer_sync_trace_file(stream, orig_offset);
>  
> +write_error:
> +     /*
> +      * This is a special case where the relayd has closed its socket. Let's
> +      * clean up the relayd object and all associated streams.
> +      */
> +     if (relayd && relayd_hang_up) {
> +             cleanup_relayd(relayd, ctx);
> +     }
> +
>  end:
>       /* Unlock only if ctrl socket used */
>       if (relayd && stream->metadata_flag) {
> @@ -1298,6 +1432,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>       int outfd = stream->out_fd;
>       struct consumer_relayd_sock_pair *relayd = NULL;
>       int *splice_pipe;
> +     unsigned int relayd_hang_up = 0;
>  
>       switch (consumer_data.type) {
>       case LTTNG_CONSUMER_KERNEL:
> @@ -1350,6 +1485,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>                                       padding);
>                       if (ret < 0) {
>                               written = ret;
> +                             /* Socket operation failed. We consider the relayd dead */
> +                             if (ret == -EBADF) {
> +                                     WARN("Remote relayd disconnected. Stopping");
> +                                     relayd_hang_up = 1;
> +                                     goto write_error;
> +                             }
>                               goto end;
>                       }
>  
> @@ -1361,7 +1502,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>                       /* Use the returned socket. */
>                       outfd = ret;
>               } else {
> -                     ERR("Remote relayd disconnected. Stopping");
> +                     /* Socket operation failed. We consider the relayd dead */
> +                     if (ret == -EBADF) {
> +                             WARN("Remote relayd disconnected. Stopping");
> +                             relayd_hang_up = 1;
> +                             goto write_error;
> +                     }
>                       goto end;
>               }
>       } else {
> @@ -1410,6 +1556,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>                       if (written == 0) {
>                               written = ret_splice;
>                       }
> +                     /* Socket operation failed. We consider the relayd dead */
> +                     if (errno == EBADF) {
> +                             WARN("Remote relayd disconnected. Stopping");
> +                             relayd_hang_up = 1;
> +                             goto write_error;
> +                     }
>                       ret = errno;
>                       goto splice_error;
>               } else if (ret_splice > len) {
> @@ -1437,12 +1589,20 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  
>       goto end;
>  
> +write_error:
> +     /*
> +      * This is a special case where the relayd has closed its socket. Let's
> +      * clean up the relayd object and all associated streams.
> +      */
> +     if (relayd && relayd_hang_up) {
> +             cleanup_relayd(relayd, ctx);
> +             /* Skip splice error so the consumer does not fail */
> +             goto end;
> +     }
> +
>  splice_error:
>       /* send the appropriate error description to sessiond */
>       switch (ret) {
> -     case EBADF:
> -             lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
> -             break;
>       case EINVAL:
>               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
>               break;
> @@ -1604,6 +1764,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>               goto free_stream;
>       }
>  
> +     pthread_mutex_lock(&stream->lock);
> +
>       pthread_mutex_lock(&consumer_data.lock);
>       switch (consumer_data.type) {
>       case LTTNG_CONSUMER_KERNEL:
> @@ -1695,6 +1857,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>  
>  end:
>       pthread_mutex_unlock(&consumer_data.lock);
> +     pthread_mutex_unlock(&stream->lock);
>  
>       if (free_chan) {
>               consumer_del_channel(free_chan);
> @@ -1766,6 +1929,59 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>  }
>  
>  /*
> + * Delete data streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_data_stream(void)
> +{
> +     struct lttng_ht_iter iter;
> +     struct lttng_consumer_stream *stream;
> +
> +     DBG("Consumer delete flagged data stream");
> +
> +     rcu_read_lock();
> +     cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> +             /* Validate delete flag of the stream */
> +             if (!stream->endpoint_status) {
> +                     continue;
> +             }
> +             /* Delete it right now */
> +             consumer_del_stream(stream, data_ht);
> +     }
> +     rcu_read_unlock();
> +}
> +
> +/*
> + * Delete metadata streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_metadata_stream(
> +             struct lttng_poll_event *pollset)
> +{
> +     struct lttng_ht_iter iter;
> +     struct lttng_consumer_stream *stream;
> +
> +     DBG("Consumer delete flagged metadata stream");
> +
> +     assert(pollset);
> +
> +     rcu_read_lock();
> +     cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> +             /* Validate delete flag of the stream */
> +             if (!stream->endpoint_status) {
> +                     continue;
> +             }
> +             /*
> +              * Remove from pollset so the metadata thread can continue without
> +              * blocking on a deleted stream.
> +              */
> +             lttng_poll_del(pollset, stream->wait_fd);
> +
> +             /* Delete it right now */
> +             consumer_del_metadata_stream(stream, metadata_ht);
> +     }
> +     rcu_read_unlock();
> +}
> +
> +/*
>   * Thread polls on metadata file descriptor and write them on disk or on the
>   * network.
>   */
> @@ -1856,6 +2072,13 @@ restart:
>                                               continue;
>                                       }
>  
> +                                     /* A NULL stream means that the state has changed. */
> +                                     if (stream == NULL) {
> +                                             /* Check for deleted streams. */
> +                                             validate_endpoint_status_metadata_stream(&events);
> +                                             continue;
> +                                     }
> +
>                                       DBG("Adding metadata stream %d to poll set",
>                                                       stream->wait_fd);
>  
> @@ -2063,6 +2286,7 @@ void *consumer_thread_data_poll(void *data)
>                        * waking us up to test it.
>                        */
>                       if (new_stream == NULL) {
> +                             validate_endpoint_status_data_stream();
>                               continue;
>                       }
>  
> @@ -2301,14 +2525,9 @@ end:
>  
>       /*
>        * Notify the data poll thread to poll back again and test the
> -      * consumer_quit state to quit gracefully.
> +      * consumer_quit state that we just set, so as to quit gracefully.
>        */
> -     do {
> -             struct lttng_consumer_stream *null_stream = NULL;
> -
> -             ret = write(ctx->consumer_data_pipe[1], &null_stream,
> -                             sizeof(null_stream));
> -     } while (ret < 0 && errno == EINTR);
> +     notify_thread_pipe(ctx->consumer_data_pipe[1]);
>  
>       rcu_unregister_thread();
>       return NULL;
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 53b6151..0334c49 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -74,6 +74,11 @@ enum lttng_consumer_type {
>       LTTNG_CONSUMER32_UST,
>  };
>  
> +enum consumer_endpoint_status {
> +     CONSUMER_ENDPOINT_ACTIVE,
> +     CONSUMER_ENDPOINT_INACTIVE,
> +};
> +
>  struct lttng_consumer_channel {
>       struct lttng_ht_node_ulong node;
>       int key;
> @@ -150,6 +155,13 @@ struct lttng_consumer_stream {
>       pthread_mutex_t lock;
>       /* Tracing session id */
>       uint64_t session_id;
> +     /*
> +      * Indicates if the stream end point is still active or not (network
> +      * streaming or local file system). The thread "owning" the stream is
> +      * handling this status and can be notified of a state change through
> +      * the appropriate consumer data pipe.
> +      */
> +     enum consumer_endpoint_status endpoint_status;
>  };
>  
>  /*
> diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
> index 785d3dc..db47608 100644
> --- a/src/common/relayd/relayd.c
> +++ b/src/common/relayd/relayd.c
> @@ -67,6 +67,7 @@ static int send_command(struct lttcomm_sock *sock,
>  
>       ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
>       if (ret < 0) {
> +             ret = -errno;
>               goto error;
>       }
>  
> @@ -90,6 +91,7 @@ static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
>  
>       ret = sock->ops->recvmsg(sock, data, size, 0);
>       if (ret < 0) {
> +             ret = -errno;
>               goto error;
>       }
>  
> @@ -283,6 +285,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
>       /* Only send data header. */
>       ret = sock->ops->sendmsg(sock, hdr, size, 0);
>       if (ret < 0) {
> +             ret = -errno;
>               goto error;
>       }
>  
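
Not a change request, just connecting the dots for readers of the archive:
the -errno conversions above are what make the new hang-up checks in
consumer.c work. Roughly (lines taken from this patch, condensed):

  /* relayd.c: a sendmsg()/recvmsg() failure is turned into -errno ... */
  ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
  if (ret < 0) {
          ret = -errno;           /* e.g. -EPIPE when the relayd closed the connection */
          goto error;
  }

  /* ... which lets the consumer detect the hang up and clean up: */
  if (ret == -EPIPE || ret == -EINVAL) {  /* -EBADF on the splice path */
          relayd_hang_up = 1;
          goto write_error;               /* -> cleanup_relayd(relayd, ctx) */
  }
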
> -- 
> 1.7.10.4
> 

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
