This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f966193420d8bc198a4b3c805e68969e1c724337
Author: Tran Tien Duc <[email protected]>
AuthorDate: Thu Jul 18 15:30:09 2019 +0700

    JAMES-2836 BlobStore Vault plugging metric records
---
 mailbox/plugin/deleted-messages-vault/pom.xml      |  5 ++
 .../vault/blob/BlobStoreDeletedMessageVault.java   | 50 ++++++++++++++--
 .../blob/BlobStoreDeletedMessageVaultTest.java     | 69 +++++++++++++++++++++-
 .../apache/james/metrics/api/MetricFactory.java    |  6 ++
 4 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/mailbox/plugin/deleted-messages-vault/pom.xml b/mailbox/plugin/deleted-messages-vault/pom.xml
index 23a22a9..5c65510 100644
--- a/mailbox/plugin/deleted-messages-vault/pom.xml
+++ b/mailbox/plugin/deleted-messages-vault/pom.xml
@@ -94,6 +94,11 @@
             <artifactId>james-server-task-json</artifactId>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
index e585d50..67290b4 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
@@ -32,6 +32,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.core.User;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.task.Task;
 import org.apache.james.vault.DeletedMessage;
 import org.apache.james.vault.DeletedMessageContentNotFoundException;
@@ -56,6 +57,14 @@ import reactor.core.scheduler.Schedulers;
 public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BlobStoreDeletedMessageVault.class);
 
+    private static final String BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC = 
"deletedMessageVault:blobStore:";
+    static final String APPEND_METRIC_NAME = 
BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "append";
+    static final String LOAD_MIME_MESSAGE_METRIC_NAME = 
BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "loadMimeMessage";
+    static final String SEARCH_METRIC_NAME = 
BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "search";
+    static final String DELETE_METRIC_NAME = 
BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "delete";
+    static final String DELETE_EXPIRED_MESSAGES_METRIC_NAME = 
BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "deleteExpiredMessages";
+
+    private final MetricFactory metricFactory;
     private final DeletedMessageMetadataVault messageMetadataVault;
     private final BlobStore blobStore;
     private final BucketNameGenerator nameGenerator;
@@ -63,7 +72,11 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
     private final RetentionConfiguration retentionConfiguration;
 
     @Inject
-    BlobStoreDeletedMessageVault(DeletedMessageMetadataVault 
messageMetadataVault, BlobStore blobStore, BucketNameGenerator nameGenerator, 
Clock clock, RetentionConfiguration retentionConfiguration) {
+    BlobStoreDeletedMessageVault(MetricFactory metricFactory, 
DeletedMessageMetadataVault messageMetadataVault,
+                                 BlobStore blobStore, BucketNameGenerator 
nameGenerator,
+                                 Clock clock,
+                                 RetentionConfiguration 
retentionConfiguration) {
+        this.metricFactory = metricFactory;
         this.messageMetadataVault = messageMetadataVault;
         this.blobStore = blobStore;
         this.nameGenerator = nameGenerator;
@@ -76,6 +89,13 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
         Preconditions.checkNotNull(deletedMessage);
         Preconditions.checkNotNull(mimeMessage);
         BucketName bucketName = nameGenerator.currentBucket();
+
+        return metricFactory.runPublishingTimerMetric(
+            APPEND_METRIC_NAME,
+            appendMessage(deletedMessage, mimeMessage, bucketName));
+    }
+
+    private Mono<Void> appendMessage(DeletedMessage deletedMessage, 
InputStream mimeMessage, BucketName bucketName) {
         return blobStore.save(bucketName, mimeMessage)
             .map(blobId -> StorageInformation.builder()
                 .bucketName(bucketName)
@@ -89,8 +109,11 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
     public Publisher<InputStream> loadMimeMessage(User user, MessageId 
messageId) {
         Preconditions.checkNotNull(user);
         Preconditions.checkNotNull(messageId);
-        return Mono.from(messageMetadataVault.retrieveStorageInformation(user, 
messageId))
-            .flatMap(storageInformation -> loadMimeMessage(storageInformation, 
user, messageId));
+
+        return metricFactory.runPublishingTimerMetric(
+            LOAD_MIME_MESSAGE_METRIC_NAME,
+            Mono.from(messageMetadataVault.retrieveStorageInformation(user, 
messageId))
+                .flatMap(storageInformation -> 
loadMimeMessage(storageInformation, user, messageId)));
     }
 
     private Mono<InputStream> loadMimeMessage(StorageInformation 
storageInformation, User user, MessageId messageId) {
@@ -104,6 +127,13 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
     public Publisher<DeletedMessage> search(User user, Query query) {
         Preconditions.checkNotNull(user);
         Preconditions.checkNotNull(query);
+
+        return metricFactory.runPublishingTimerMetric(
+            SEARCH_METRIC_NAME,
+            searchOn(user, query));
+    }
+
+    private Flux<DeletedMessage> searchOn(User user, Query query) {
         return Flux.from(messageMetadataVault.listRelatedBuckets())
             .concatMap(bucketName -> 
Flux.from(messageMetadataVault.listMessages(bucketName, user)))
             .map(DeletedMessageWithStorageInformation::getDeletedMessage)
@@ -115,6 +145,12 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
         Preconditions.checkNotNull(user);
         Preconditions.checkNotNull(messageId);
 
+        return metricFactory.runPublishingTimerMetric(
+            DELETE_METRIC_NAME,
+            deleteMessage(user, messageId));
+    }
+
+    private Mono<Void> deleteMessage(User user, MessageId messageId) {
         return Mono.from(messageMetadataVault.retrieveStorageInformation(user, 
messageId))
             .flatMap(storageInformation -> 
Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), user, 
messageId))
                 .thenReturn(storageInformation))
@@ -127,10 +163,12 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
         ZonedDateTime now = ZonedDateTime.now(clock);
         ZonedDateTime beginningOfRetentionPeriod = 
now.minus(retentionConfiguration.getRetentionPeriod());
 
-        Flux<BucketName> deleteOperation = 
retentionQualifiedBuckets(beginningOfRetentionPeriod)
-            .flatMap(bucketName -> 
deleteBucketData(bucketName).then(Mono.just(bucketName)));
+        Flux<BucketName> metricAbleDeleteOperation = 
metricFactory.runPublishingTimerMetric(
+            DELETE_EXPIRED_MESSAGES_METRIC_NAME,
+            retentionQualifiedBuckets(beginningOfRetentionPeriod)
+                .flatMap(bucketName -> 
deleteBucketData(bucketName).then(Mono.just(bucketName))));
 
-        return new 
BlobStoreVaultGarbageCollectionTask(beginningOfRetentionPeriod, 
deleteOperation);
+        return new 
BlobStoreVaultGarbageCollectionTask(beginningOfRetentionPeriod, 
metricAbleDeleteOperation);
     }
 
     @Override
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java
index 9ba4729..a98bf1f 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java
@@ -20,9 +20,18 @@
 package org.apache.james.vault.blob;
 
 import static org.apache.james.vault.DeletedMessageFixture.CONTENT;
+import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE;
 import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2;
+import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID;
 import static org.apache.james.vault.DeletedMessageFixture.NOW;
 import static org.apache.james.vault.DeletedMessageFixture.OLD_DELETED_MESSAGE;
+import static org.apache.james.vault.DeletedMessageFixture.USER;
+import static 
org.apache.james.vault.blob.BlobStoreDeletedMessageVault.APPEND_METRIC_NAME;
+import static 
org.apache.james.vault.blob.BlobStoreDeletedMessageVault.DELETE_METRIC_NAME;
+import static 
org.apache.james.vault.blob.BlobStoreDeletedMessageVault.LOAD_MIME_MESSAGE_METRIC_NAME;
+import static 
org.apache.james.vault.blob.BlobStoreDeletedMessageVault.DELETE_EXPIRED_MESSAGES_METRIC_NAME;
+import static 
org.apache.james.vault.blob.BlobStoreDeletedMessageVault.SEARCH_METRIC_NAME;
+import static org.apache.james.vault.search.Query.ALL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
@@ -32,6 +41,7 @@ import java.time.ZonedDateTime;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.memory.MemoryBlobStore;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.apache.james.vault.DeletedMessageVault;
 import org.apache.james.vault.DeletedMessageVaultContract;
@@ -47,11 +57,13 @@ import reactor.core.publisher.Mono;
 class BlobStoreDeletedMessageVaultTest implements DeletedMessageVaultContract, 
DeletedMessageVaultSearchContract.AllContracts {
     private BlobStoreDeletedMessageVault messageVault;
     private UpdatableTickingClock clock;
+    private RecordingMetricFactory metricFactory;
 
     @BeforeEach
     void setUp() {
         clock = new UpdatableTickingClock(NOW.toInstant());
-        messageVault = new BlobStoreDeletedMessageVault(new 
MemoryDeletedMessageMetadataVault(),
+        metricFactory = new RecordingMetricFactory();
+        messageVault = new BlobStoreDeletedMessageVault(metricFactory, new 
MemoryDeletedMessageMetadataVault(),
             new MemoryBlobStore(new HashBlobId.Factory()),
             new BucketNameGenerator(clock), clock, 
RetentionConfiguration.DEFAULT);
     }
@@ -90,4 +102,59 @@ class BlobStoreDeletedMessageVaultTest implements DeletedMessageVaultContract, D
                 BucketName.of("deleted-messages-2007-12-01"),
                 BucketName.of("deleted-messages-2008-01-01"));
     }
+
+    @Test
+    void appendShouldPublishAppendTimerMetrics() {
+        Mono.from(messageVault.append(DELETED_MESSAGE, new 
ByteArrayInputStream(CONTENT)))
+            .block();
+        Mono.from(messageVault.append(DELETED_MESSAGE_2, new 
ByteArrayInputStream(CONTENT)))
+            .block();
+
+        assertThat(metricFactory.executionTimesFor(APPEND_METRIC_NAME))
+            .hasSize(2);
+    }
+
+    @Test
+    void searchShouldPublishSearchTimerMetrics() {
+        Mono.from(messageVault.search(USER, ALL))
+            .block();
+        Mono.from(messageVault.search(USER, ALL))
+            .block();
+
+        assertThat(metricFactory.executionTimesFor(SEARCH_METRIC_NAME))
+            .hasSize(2);
+    }
+
+    @Test
+    void loadMimeMessageShouldPublishLoadMimeMessageTimerMetrics() {
+        Mono.from(messageVault.loadMimeMessage(USER, MESSAGE_ID))
+            .block();
+        Mono.from(messageVault.loadMimeMessage(USER, MESSAGE_ID))
+            .block();
+
+        
assertThat(metricFactory.executionTimesFor(LOAD_MIME_MESSAGE_METRIC_NAME))
+            .hasSize(2);
+    }
+
+    @Test
+    void deleteShouldPublishDeleteTimerMetrics() {
+        Mono.from(messageVault.delete(USER, MESSAGE_ID))
+            .block();
+        Mono.from(messageVault.delete(USER, MESSAGE_ID))
+            .block();
+
+        assertThat(metricFactory.executionTimesFor(DELETE_METRIC_NAME))
+            .hasSize(2);
+    }
+
+    @Test
+    void deleteExpiredMessagesTaskShouldPublishRetentionTimerMetrics() throws 
Exception {
+        Mono.from(getVault().append(DELETED_MESSAGE, new 
ByteArrayInputStream(CONTENT))).block();
+        Mono.from(getVault().delete(USER, 
DELETED_MESSAGE.getMessageId())).block();
+
+        getVault().deleteExpiredMessagesTask().run();
+
+        
assertThat(metricFactory.executionTimesFor(DELETE_EXPIRED_MESSAGES_METRIC_NAME))
+            .hasSize(1);
+    }
 }
\ No newline at end of file
diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
index 18c34f7..670c637 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
@@ -21,6 +21,7 @@ package org.apache.james.metrics.api;
 
 import java.util.function.Supplier;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface MetricFactory {
@@ -43,6 +44,11 @@ public interface MetricFactory {
         return mono.doOnSuccess(success -> timer.stopAndPublish());
     }
 
+    default <T> Flux<T> runPublishingTimerMetric(String name, Flux<T> flux) {
+        TimeMetric timer = timer(name);
+        return flux.doOnComplete(timer::stopAndPublish);
+    }
+
     default void runPublishingTimerMetric(String name, Runnable runnable) {
         runPublishingTimerMetric(name, () -> {
             runnable.run();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to