This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 520cdac2f54674f96ea7edaca913a42778e8d4d1 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jun 12 10:36:48 2020 +0700 JAMES-3170 Fix metric measurement upon reactor publisher replay A single metric was used for all the retries leading to an inaccurate measurement. --- .../dropwizard/DropWizardMetricFactory.java | 15 ++++++----- .../metrics/dropwizard/DropWizardTimeMetric.java | 11 ++++++++ .../dropwizard/DropWizardMetricFactoryTest.java | 30 ++++++++++++++++++++++ .../james/metrics/logger/DefaultMetricFactory.java | 12 +++++---- .../metrics/tests/RecordingMetricFactory.java | 12 +++++---- .../metrics/tests/RecordingMetricFactoryTest.java | 26 +++++++++++++++++++ .../blob/cassandra/cache/CachedBlobStore.java | 10 +++----- .../blob/cassandra/cache/CachedBlobStoreTest.java | 30 ++++++++++++++++++++++ 8 files changed, 123 insertions(+), 23 deletions(-) diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java index 2b0081c..b18a04e 100644 --- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java +++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java @@ -28,13 +28,13 @@ import javax.inject.Inject; import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.metrics.api.TimeMetric; import org.reactivestreams.Publisher; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jmx.JmxReporter; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class DropWizardMetricFactory implements MetricFactory, Startable { @@ -54,21 +54,22 @@ public class DropWizardMetricFactory implements MetricFactory, Startable { } @Override - public 
TimeMetric timer(String name) { + public DropWizardTimeMetric timer(String name) { return new DropWizardTimeMetric(name, metricRegistry.timer(name)); } @Override public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher).doOnComplete(timer::stopAndPublish); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(timer::stopAndPublish)); } @Override public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher) - .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))); } @PostConstruct diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java index adf2c7a..834da92 100644 --- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java +++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; public class DropWizardTimeMetric implements TimeMetric { @@ -73,6 +74,16 @@ public class DropWizardTimeMetric implements TimeMetric { this.context = this.timer.time(); } + @VisibleForTesting + Timer.Context getContext() { + return context; + } + + @VisibleForTesting + Timer getTimer() { + return timer; + } + @Override public String name() { return name; 
diff --git a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java index 75aada0..368075c 100644 --- a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java +++ b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java @@ -19,12 +19,20 @@ package org.apache.james.metrics.dropwizard; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + import org.apache.james.metrics.api.MetricFactory; import org.apache.james.metrics.api.MetricFactoryContract; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import com.codahale.metrics.MetricRegistry; +import reactor.core.publisher.Mono; + class DropWizardMetricFactoryTest implements MetricFactoryContract { private DropWizardMetricFactory testee; @@ -38,4 +46,26 @@ class DropWizardMetricFactoryTest implements MetricFactoryContract { public MetricFactory testee() { return testee; } + + @Test + void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() { + Duration duration = Duration.ofMillis(100); + Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration))) + .repeat(5) + .blockLast(); + + assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile()) + .isLessThan(duration.get(ChronoUnit.NANOS) * 2); + } + + @Test + void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() { + Duration duration = Duration.ofMillis(100); + Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration))) + .repeat(5) + .blockLast(); + + assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile()) + .isLessThan(duration.get(ChronoUnit.NANOS) * 2); + } } \ No newline at 
end of file diff --git a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java index 919363c..e0aa75f 100644 --- a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java +++ b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class DefaultMetricFactory implements MetricFactory { @@ -45,14 +46,15 @@ public class DefaultMetricFactory implements MetricFactory { @Override public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher).doOnComplete(timer::stopAndPublish); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(timer::stopAndPublish)); } @Override public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher) - .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))); } } diff --git a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java index 63a8922..78aeecd 100644 --- a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java +++ b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java @@ -38,6 +38,7 @@ import 
com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class RecordingMetricFactory implements MetricFactory { private final Multimap<String, Duration> executionTimes = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); @@ -63,15 +64,16 @@ public class RecordingMetricFactory implements MetricFactory { @Override public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher).doOnComplete(timer::stopAndPublish); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(timer::stopAndPublish)); } @Override public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) { - TimeMetric timer = timer(name); - return Flux.from(publisher) - .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)); + return Mono.fromCallable(() -> timer(name)) + .flatMapMany(timer -> Flux.from(publisher) + .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))); } public Collection<Duration> executionTimesFor(String name) { diff --git a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java index c125ea3..bbdcf92 100644 --- a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java +++ b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java @@ -32,6 +32,8 @@ import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + class RecordingMetricFactoryTest implements MetricFactoryContract { private 
static final String TIME_METRIC_NAME = "timerMetric"; @@ -118,4 +120,28 @@ class RecordingMetricFactoryTest implements MetricFactoryContract { assertThat(testee.countFor(METRIC_NAME)) .isEqualTo(5); } + + @Test + void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() { + Duration duration = Duration.ofMillis(100); + Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration))) + .repeat(5) + .blockLast(); + + assertThat(testee.executionTimesFor("any")) + .hasSize(6) + .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2))); + } + + @Test + void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() { + Duration duration = Duration.ofMillis(100); + Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration))) + .repeat(5) + .blockLast(); + + assertThat(testee.executionTimesFor("any")) + .hasSize(6) + .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2))); + } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java index 48a8cfd..0e998c0 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java @@ -38,7 +38,6 @@ import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.ObjectStoreIOException; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.metrics.api.TimeMetric; import org.reactivestreams.Publisher; import com.google.common.base.Preconditions; @@ -277,10 +276,9 @@ public class CachedBlobStore implements BlobStore { } private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) { - TimeMetric timer = 
metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME); - - return Mono.from(backend.readBytes(bucketName, blobId)) - .doOnSuccess(any -> timer.stopAndPublish()) - .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish()); + return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) + .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId)) + .doOnSuccess(any -> timer.stopAndPublish()) + .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish())); } } diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java index 5caff3f..1d2abcd 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java @@ -446,6 +446,36 @@ public class CachedBlobStoreTest implements BlobStoreContract { } @Test + void readBytesShouldRecordDistinctTimingsWhenRepeatAndBackendRead() { + BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block(); + + Duration delay = Duration.ofMillis(500); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)) + .then(Mono.delay(delay)) + .repeat(2) + .blockLast(); + + assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) + .hasSize(3) + .allSatisfy(timing -> assertThat(timing).isLessThan(delay)); + } + + @Test + void readBytesShouldRecordDistinctTimingsWhenRepeat() { + BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); + + Duration delay = Duration.ofMillis(500); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)) + .then(Mono.delay(delay)) + .repeat(2) + .blockLast(); + + 
assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME)) + .hasSize(3) + .allSatisfy(timing -> assertThat(timing).isLessThan(delay)); + } + + @Test void readBlobStoreCacheShouldCountWhenHit() { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org