Signed-off-by: Mathieu Desnoyers <mathieu.desnoy...@efficios.com> --- src/common/consumer-timer.c | 118 +++++++++++++++------------ src/common/consumer-timer.h | 3 + src/common/kernel-consumer/kernel-consumer.c | 17 +++- src/common/ust-consumer/ust-consumer.c | 2 +- 4 files changed, 85 insertions(+), 55 deletions(-)
diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index e17671b..c3a07ec 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -133,11 +133,48 @@ error: return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) { uint64_t ts, stream_id; int ret; + ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); + if (ret < 0) { + ERR("Failed to get the current timestamp"); + goto end; + } + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + ret = kernctl_snapshot(stream->wait_fd); + if (ret < 0) { + if (errno != EAGAIN && errno != ENODATA) { + PERROR("live timer kernel snapshot"); + ret = -1; + goto end; + } + ret = kernctl_get_stream_id(stream->wait_fd, &stream_id); + if (ret < 0) { + PERROR("kernctl_get_stream_id"); + goto end; + } + DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); + ret = send_empty_index(stream, ts, stream_id); + if (ret < 0) { + goto end; + } + } + ret = 0; +end: + return ret; +} + +static int check_kernel_stream(struct lttng_consumer_stream *stream) +{ + int ret; + /* * While holding the stream mutex, try to take a snapshot, if it * succeeds, it means that data is ready to be sent, just let the data @@ -173,45 +210,53 @@ static int check_kernel_stream(struct lttng_consumer_stream *stream) } break; } - ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); - if (ret < 0) { - ERR("Failed to get the current timestamp"); - goto error_unlock; + ret = consumer_flush_kernel_index(stream); + pthread_mutex_unlock(&stream->lock); +end: + return ret; +} + +int consumer_flush_ust_index(struct lttng_consumer_stream *stream) +{ + uint64_t ts, stream_id; + int ret; + + ret = cds_lfht_is_node_deleted(&stream->node.node); + if (ret) { + goto end; } - ret = kernctl_buffer_flush(stream->wait_fd); + + ret = lttng_ustconsumer_get_current_timestamp(stream, &ts); if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto error_unlock; + ERR("Failed to get the current timestamp"); + goto end; } - ret = kernctl_snapshot(stream->wait_fd); + lttng_ustconsumer_flush_buffer(stream, 1); + ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { - if (errno != EAGAIN && errno != ENODATA) { - PERROR("live timer kernel snapshot"); + if (ret != -EAGAIN) { + ERR("Taking UST snapshot"); ret = -1; - goto error_unlock; + goto end; } - ret = kernctl_get_stream_id(stream->wait_fd, &stream_id); + ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); if (ret < 0) { - PERROR("kernctl_get_stream_id"); - goto error_unlock; + PERROR("ustctl_get_stream_id"); + goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { - goto error_unlock; + goto end; } } ret = 0; - -error_unlock: - pthread_mutex_unlock(&stream->lock); end: return ret; } static int check_ust_stream(struct lttng_consumer_stream *stream) { - uint64_t ts, stream_id; int ret; assert(stream); @@ -251,38 +296,7 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) } break; } - ret = cds_lfht_is_node_deleted(&stream->node.node); - if (ret) { - goto error_unlock; - } - - ret = lttng_ustconsumer_get_current_timestamp(stream, &ts); - if (ret < 0) { - ERR("Failed to get the current timestamp"); - goto error_unlock; - } - lttng_ustconsumer_flush_buffer(stream, 1); - ret = lttng_ustconsumer_take_snapshot(stream); - if (ret < 0) { - if (ret != -EAGAIN) { - ERR("Taking UST snapshot"); - ret = -1; - goto error_unlock; - } - ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); - if (ret < 0) { - PERROR("ustctl_get_stream_id"); - goto error_unlock; - } - DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); - ret = send_empty_index(stream, ts, stream_id); - if (ret < 0) { - goto error_unlock; - } - } - ret = 0; - -error_unlock: + ret = consumer_flush_ust_index(stream); pthread_mutex_unlock(&stream->lock); end: return ret; diff --git a/src/common/consumer-timer.h b/src/common/consumer-timer.h index baaa82b..22e7457 100644 --- a/src/common/consumer-timer.h +++ b/src/common/consumer-timer.h @@ -52,4 +52,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream); +int consumer_flush_ust_index(struct lttng_consumer_stream *stream); + #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 74cdd7d..aae56f9 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1219,9 +1219,22 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* * In live, block until all the metadata is sent. */ - CMM_STORE_SHARED(stream->waiting_on_metadata, 1); + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); - CMM_STORE_SHARED(stream->waiting_on_metadata, 0); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_kernel_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } if (err < 0) { goto end; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index a1f1ced..d45707b 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2260,7 +2260,7 @@ retry: if (stream->missed_metadata_flush) { stream->missed_metadata_flush = false; pthread_mutex_unlock(&stream->metadata_timer_lock); - lttng_ustconsumer_flush_buffer(stream, 1); + (void) consumer_flush_ust_index(stream); } else { pthread_mutex_unlock(&stream->metadata_timer_lock); } -- 2.1.4 _______________________________________________ lttng-dev mailing list lttng-dev@lists.lttng.org http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev