This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 488faff1d8f9c0499a27f017ea50d0f5ee5fdc58
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Thu Mar 21 16:15:07 2019 +0700

    MAILBOX-388 Implement DeletedMessagesVault retention
---
 .../apache/james/vault/DeletedMessageVault.java    |   3 +
 .../james/vault/memory/DeleteByQueryExecutor.java  |  94 +++++++++++++++++
 .../vault/memory/MemoryDeletedMessagesVault.java   |  26 ++++-
 .../vault/memory/VaultGarbageCollectionTask.java   | 115 +++++++++++++++++++++
 .../apache/james/vault/DeletedMessageFixture.java  |  15 +++
 .../james/vault/DeletedMessageVaultContract.java   |  69 +++++++++++++
 .../james/vault/DeletedMessageVaultHookTest.java   |   2 +-
 .../memory/MemoryDeletedMessagesVaultTest.java     |   3 +-
 .../modules/vault/DeletedMessageVaultModule.java   |  14 +++
 .../vault/MailRepositoryDeletedMessageVault.java   |  46 +++++++--
 .../MailRepositoryDeletedMessageVaultTest.java     |   3 +-
 .../routes/DeletedMessagesVaultRoutesTest.java     |   4 +-
 12 files changed, 379 insertions(+), 15 deletions(-)

diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java
index 54d3579..420c7cd 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 
 import org.apache.james.core.User;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.task.Task;
 import org.apache.james.vault.search.Query;
 import org.reactivestreams.Publisher;
 
@@ -36,4 +37,6 @@ public interface DeletedMessageVault {
     Publisher<DeletedMessage> search(User user, Query query);
 
     Publisher<User> usersWithVault();
+
+    Task deleteExpiredMessagesTask();
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java
new file mode 100644
index 0000000..02e6936
--- /dev/null
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java
@@ -0,0 +1,94 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.vault.memory;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.task.Task;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.vault.DeletedMessageVault;
+import org.apache.james.vault.search.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class DeleteByQueryExecutor {
+    @FunctionalInterface
+    interface Notifier {
+        void doNotify();
+    }
+
+    static class Notifiers {
+        private final Notifier userHandledNotifier;
+        private final Notifier searchErrorNotifier;
+        private final Notifier deletionErrorNotifier;
+        private final Notifier permanentlyDeletedMessageNotifyer;
+
+        Notifiers(Notifier userHandledNotifier, Notifier searchErrorNotifier, Notifier deletionErrorNotifier, Notifier permanentlyDeletedMessageNotifyer) {
+            this.userHandledNotifier = userHandledNotifier;
+            this.searchErrorNotifier = searchErrorNotifier;
+            this.deletionErrorNotifier = deletionErrorNotifier;
+            this.permanentlyDeletedMessageNotifyer = permanentlyDeletedMessageNotifyer;
+        }
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteByQueryExecutor.class);
+
+    private final DeletedMessageVault deletedMessageVault;
+
+    public DeleteByQueryExecutor(DeletedMessageVault deletedMessageVault) {
+        this.deletedMessageVault = deletedMessageVault;
+    }
+
+    public Task.Result deleteByQuery(Query query, Notifiers notifiers) {
+        return Flux.from(deletedMessageVault.usersWithVault())
+            .flatMap(user -> deleteByQueryForUser(query, user, notifiers))
+            .reduce(Task::combine)
+            .blockOptional()
+            .orElse(Task.Result.COMPLETED);
+    }
+
+    private Mono<Task.Result> deleteByQueryForUser(Query query, User user, Notifiers notifiers) {
+        return Flux.from(deletedMessageVault.search(user, query))
+            .flatMap(message -> deleteMessage(user, message.getMessageId(), notifiers))
+            .onErrorResume(e -> {
+                LOGGER.error("Error encountered while searching old mails in {} vault", user.asString(), e);
+                notifiers.searchErrorNotifier.doNotify();
+                return Mono.just(Task.Result.PARTIAL);
+            })
+            .reduce(Task::combine)
+
+            .map(FunctionalUtils.identityWithSideEffect(() -> LOGGER.info("Retention applied for {} vault", user.asString())))
+            .map(FunctionalUtils.identityWithSideEffect(notifiers.userHandledNotifier::doNotify));
+    }
+
+    private Mono<Task.Result> deleteMessage(User user, MessageId messageId, Notifiers notifiers) {
+        return Mono.from(deletedMessageVault.delete(user, messageId))
+            .then(Mono.fromRunnable(notifiers.permanentlyDeletedMessageNotifyer::doNotify))
+            .thenReturn(Task.Result.COMPLETED)
+            .onErrorResume(e -> {
+                LOGGER.error("Error encountered while deleting a mail in {} vault: {}", user.asString(), messageId.serialize(), e);
+                notifiers.deletionErrorNotifier.doNotify();
+                return Mono.just(Task.Result.PARTIAL);
+            });
+    }
+}
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java
index 3389dcb..d48bfda 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java
@@ -22,14 +22,19 @@ package org.apache.james.vault.memory;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.core.User;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.task.Task;
 import org.apache.james.vault.DeletedMessage;
 import org.apache.james.vault.DeletedMessageVault;
+import org.apache.james.vault.RetentionConfiguration;
 import org.apache.james.vault.search.Query;
 import org.reactivestreams.Publisher;
 
@@ -42,10 +47,16 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MemoryDeletedMessagesVault implements DeletedMessageVault {
+    private final RetentionConfiguration retentionConfiguration;
     private final Table<User, MessageId, Pair<DeletedMessage, byte[]>> table;
-
-    public MemoryDeletedMessagesVault() {
-        table = HashBasedTable.create();
+    private final Clock clock;
+    private DeleteByQueryExecutor deleteByQueryExecutor;
+
+    public MemoryDeletedMessagesVault(RetentionConfiguration retentionConfiguration, Clock clock) {
+        this.deleteByQueryExecutor = new DeleteByQueryExecutor(this);
+        this.retentionConfiguration = retentionConfiguration;
+        this.clock = clock;
+        this.table = HashBasedTable.create();
     }
 
     @Override
@@ -104,6 +115,15 @@ public class MemoryDeletedMessagesVault implements DeletedMessageVault {
         }
     }
 
+    public Task deleteExpiredMessagesTask() {
+        ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC);
+        ZonedDateTime beginningOfRetentionPeriod = now.minus(retentionConfiguration.getRetentionPeriod());
+
+        return new VaultGarbageCollectionTask(
+            deleteByQueryExecutor,
+            beginningOfRetentionPeriod);
+    }
+
     private Flux<DeletedMessage> listAll(User user) {
         synchronized (table) {
             return Flux.fromIterable(ImmutableList.copyOf(table.row(user).values()))
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java
new file mode 100644
index 0000000..93c8ce3
--- /dev/null
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.vault.memory;
+
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.vault.search.CriterionFactory;
+import org.apache.james.vault.search.Query;
+
+public class VaultGarbageCollectionTask implements Task {
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+        private final ZonedDateTime beginningOfRetentionPeriod;
+        private final long handledUserCount;
+        private final long permanantlyDeletedMessages;
+        private final long vaultSearchErrorCount;
+        private final long deletionErrorCount;
+
+        AdditionalInformation(ZonedDateTime beginningOfRetentionPeriod, long handledUserCount, long permanantlyDeletedMessages, long vaultSearchErrorCount, long deletionErrorCount) {
+            this.beginningOfRetentionPeriod = beginningOfRetentionPeriod;
+            this.handledUserCount = handledUserCount;
+            this.permanantlyDeletedMessages = permanantlyDeletedMessages;
+            this.vaultSearchErrorCount = vaultSearchErrorCount;
+            this.deletionErrorCount = deletionErrorCount;
+        }
+
+        public  ZonedDateTime getBeginningOfRetentionPeriod() {
+            return beginningOfRetentionPeriod;
+        }
+
+        public long getHandledUserCount() {
+            return handledUserCount;
+        }
+
+        public long getPermanantlyDeletedMessages() {
+            return permanantlyDeletedMessages;
+        }
+
+        public long getVaultSearchErrorCount() {
+            return vaultSearchErrorCount;
+        }
+
+        public long getDeletionErrorCount() {
+            return deletionErrorCount;
+        }
+    }
+
+    private static final String TYPE = "deletedMessages/garbageCollection";
+
+    private final DeleteByQueryExecutor deleteByQueryExecutor;
+    private final DeleteByQueryExecutor.Notifiers notifiers;
+    private final AtomicLong handledUserCount;
+    private final AtomicLong permanantlyDeletedMessages;
+    private final AtomicLong vaultSearchErrorCount;
+    private final AtomicLong deletionErrorCount;
+    private final ZonedDateTime beginningOfRetentionPeriod;
+
+    public VaultGarbageCollectionTask(DeleteByQueryExecutor deleteByQueryExecutor, ZonedDateTime beginningOfRetentionPeriod) {
+        this.deleteByQueryExecutor = deleteByQueryExecutor;
+        this.beginningOfRetentionPeriod = beginningOfRetentionPeriod;
+
+        this.handledUserCount = new AtomicLong(0);
+        this.permanantlyDeletedMessages = new AtomicLong(0);
+        this.vaultSearchErrorCount = new AtomicLong(0);
+        this.deletionErrorCount = new AtomicLong(0);
+
+        this.notifiers = new DeleteByQueryExecutor.Notifiers(
+            handledUserCount::incrementAndGet,
+            vaultSearchErrorCount::incrementAndGet,
+            deletionErrorCount::incrementAndGet,
+            permanantlyDeletedMessages::incrementAndGet);
+    }
+
+    @Override
+    public Result run() {
+        Query query = Query.of(CriterionFactory.deletionDate().beforeOrEquals(beginningOfRetentionPeriod));
+
+        return deleteByQueryExecutor.deleteByQuery(query, notifiers);
+    }
+
+    @Override
+    public String type() {
+        return TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(new AdditionalInformation(
+            beginningOfRetentionPeriod,
+            handledUserCount.get(),
+            permanantlyDeletedMessages.get(),
+            vaultSearchErrorCount.get(),
+            deletionErrorCount.get()));
+    }
+}
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java
index 73c72fb..0144cac 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.inmemory.InMemoryMessageId;
 
 public interface DeletedMessageFixture {
     InMemoryMessageId MESSAGE_ID = InMemoryMessageId.of(42);
+    InMemoryMessageId OLD_MESSAGE_ID = InMemoryMessageId.of(58);
     InMemoryMessageId MESSAGE_ID_2 = InMemoryMessageId.of(45);
     InMemoryId MAILBOX_ID_1 = InMemoryId.of(43);
     InMemoryId MAILBOX_ID_2 = InMemoryId.of(44);
@@ -44,6 +45,9 @@ public interface DeletedMessageFixture {
     User USER_2 = User.fromUsername("dimi...@apache.org");
     ZonedDateTime DELIVERY_DATE = ZonedDateTime.parse("2014-10-30T14:12:00Z");
     ZonedDateTime DELETION_DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z");
+    ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    ZonedDateTime OLD_DELIVERY_DATE = ZonedDateTime.parse("2010-10-30T14:12:00Z");
+    ZonedDateTime OLD_DELETION_DATE = ZonedDateTime.parse("2010-10-30T15:12:00Z");
     Date INTERNAL_DATE = Date.from(DELIVERY_DATE.toInstant());
     byte[] CONTENT = "header: value\r\n\r\ncontent".getBytes(StandardCharsets.UTF_8);
     String SUBJECT = "subject";
@@ -74,5 +78,16 @@ public interface DeletedMessageFixture {
         .subject(SUBJECT)
         .build();
     DeletedMessage DELETED_MESSAGE = FINAL_STAGE.get().build();
+    DeletedMessage OLD_DELETED_MESSAGE = DeletedMessage.builder()
+        .messageId(OLD_MESSAGE_ID)
+        .originMailboxes(MAILBOX_ID_1, MAILBOX_ID_2)
+        .user(USER)
+        .deliveryDate(OLD_DELIVERY_DATE)
+        .deletionDate(OLD_DELETION_DATE)
+        .sender(MaybeSender.of(SENDER))
+        .recipients(RECIPIENT1, RECIPIENT2)
+        .hasAttachment(false)
+        .size(CONTENT.length)
+        .build();
     DeletedMessage DELETED_MESSAGE_2 = DELETED_MESSAGE_GENERATOR.apply(MESSAGE_ID_2.getRawId());
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java
index 0f4b3c6..c28f79f 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java
@@ -24,6 +24,8 @@ import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE;
 import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2;
 import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_GENERATOR;
 import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID;
+import static org.apache.james.vault.DeletedMessageFixture.NOW;
+import static org.apache.james.vault.DeletedMessageFixture.OLD_DELETED_MESSAGE;
 import static org.apache.james.vault.DeletedMessageFixture.USER;
 import static org.apache.james.vault.DeletedMessageFixture.USER_2;
 import static org.apache.james.vault.search.Query.ALL;
@@ -31,9 +33,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
+import java.time.Clock;
 import java.time.Duration;
 
 import org.apache.james.mailbox.inmemory.InMemoryMessageId;
+import org.apache.james.task.Task;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.Test;
 
@@ -41,6 +45,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface DeletedMessageVaultContract {
+    Clock CLOCK = Clock.fixed(NOW.toInstant(), NOW.getZone());
+
     DeletedMessageVault getVault();
 
     @Test
@@ -213,4 +219,67 @@ public interface DeletedMessageVaultContract {
         assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block())
             .isEmpty();
     }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldCompleteWhenNoMail() {
+        Task.Result result = getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldCompleteWhenAllMailsDeleted() {
+        Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
+        Mono.from(getVault().delete(USER, DELETED_MESSAGE.getMessageId())).block();
+
+        Task.Result result = getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyRecentMails() {
+        Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
+
+        Task.Result result = getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyOldMails() {
+        Mono.from(getVault().append(USER, OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
+
+        Task.Result result = getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldDoNothingWhenEmpty() {
+        getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldNotDeleteRecentMails() {
+        Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
+
+        getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block())
+            .containsOnly(DELETED_MESSAGE);
+    }
+
+    @Test
+    default void deleteExpiredMessagesTaskShouldDeleteOldMails() {
+        Mono.from(getVault().append(USER, OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
+
+        getVault().deleteExpiredMessagesTask().run();
+
+        assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block())
+            .isEmpty();
+    }
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 26c3ca4..324f20c 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -99,7 +99,7 @@ class DeletedMessageVaultHookTest {
     @BeforeEach
     void setUp() throws Exception {
         clock = Clock.fixed(DELETION_DATE.toInstant(), ZoneOffset.UTC);
-        messageVault = new MemoryDeletedMessagesVault();
+        messageVault = new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, clock);
 
         DeletedMessageConverter deletedMessageConverter = new DeletedMessageConverter();
 
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java
index 9b58f29..0fec41e 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java
@@ -22,6 +22,7 @@ package org.apache.james.vault.memory;
 import org.apache.james.vault.DeletedMessageVault;
 import org.apache.james.vault.DeletedMessageVaultContract;
 import org.apache.james.vault.DeletedMessageVaultSearchContract;
+import org.apache.james.vault.RetentionConfiguration;
 import org.junit.jupiter.api.BeforeEach;
 
 public class MemoryDeletedMessagesVaultTest implements DeletedMessageVaultContract, DeletedMessageVaultSearchContract.AllContracts {
@@ -30,7 +31,7 @@ public class MemoryDeletedMessagesVaultTest implements DeletedMessageVaultContra
 
     @BeforeEach
     void setUp() {
-        memoryDeletedMessagesVault = new MemoryDeletedMessagesVault();
+        memoryDeletedMessagesVault = new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, CLOCK);
     }
 
     @Override
diff --git a/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java b/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java
index 9c99fcd..32741a6 100644
--- a/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java
+++ b/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.modules.vault;
 
+import java.io.FileNotFoundException;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
@@ -27,6 +29,7 @@ import org.apache.james.mailrepository.memory.MailRepositoryStoreConfiguration;
 import org.apache.james.utils.PropertiesProvider;
 import org.apache.james.vault.DeletedMessageVault;
 import org.apache.james.vault.MailRepositoryDeletedMessageVault;
+import org.apache.james.vault.RetentionConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,4 +61,15 @@ public class DeletedMessageVaultModule extends AbstractModule {
                 MailRepositoryUrl.fromPathAndProtocol(defaultProtocol, DEFAULT_PATH));
         }
     }
+
+    @Provides
+    RetentionConfiguration providesRetentionConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException, org.apache.commons.configuration.ConfigurationException {
+        try {
+            Configuration configuration = propertiesProvider.getConfiguration("deletedMessageVault");
+            return RetentionConfiguration.from(configuration);
+        } catch (FileNotFoundException e) {
+            LOGGER.warn("Error encountered while retrieving Deleted message vault configuration. Using default MailRepository RetentionTime (1 year) instead.");
+            return RetentionConfiguration.DEFAULT;
+        }
+    }
 }
diff --git a/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java b/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java
index a99014b..4d8ab8c 100644
--- a/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java
+++ b/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java
@@ -20,6 +20,10 @@
 package org.apache.james.vault;
 
 import java.io.InputStream;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.List;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -28,11 +32,13 @@ import org.apache.james.core.User;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
-import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.server.core.MimeMessageInputStream;
+import org.apache.james.task.Task;
 import org.apache.james.util.streams.Iterators;
+import org.apache.james.vault.memory.DeleteByQueryExecutor;
+import org.apache.james.vault.memory.VaultGarbageCollectionTask;
 import org.apache.james.vault.search.Query;
 import org.reactivestreams.Publisher;
 
@@ -59,11 +65,17 @@ public class MailRepositoryDeletedMessageVault implements DeletedMessageVault {
     }
 
     private final MailRepositoryStore mailRepositoryStore;
+    private final RetentionConfiguration retentionConfiguration;
     private final Configuration configuration;
     private final MailConverter mailConverter;
+    private final DeleteByQueryExecutor deleteByQueryExecutor;
+    private final Clock clock;
 
     @Inject
-    MailRepositoryDeletedMessageVault(MailRepositoryStore mailRepositoryStore, Configuration configuration, MailConverter mailConverter) {
+    MailRepositoryDeletedMessageVault(MailRepositoryStore mailRepositoryStore, RetentionConfiguration retentionConfiguration, Configuration configuration, MailConverter mailConverter, Clock clock) {
+        this.retentionConfiguration = retentionConfiguration;
+        this.clock = clock;
+        this.deleteByQueryExecutor = new DeleteByQueryExecutor(this);
         this.mailRepositoryStore = mailRepositoryStore;
         this.configuration = configuration;
         this.mailConverter = mailConverter;
@@ -130,12 +142,30 @@ public class MailRepositoryDeletedMessageVault implements DeletedMessageVault {
     @Override
     public Publisher<User> usersWithVault() {
         return Flux.fromStream(mailRepositoryStore.getUrls()
-            .filter(url -> url.hasPrefix(configuration.urlPrefix))
-            .map(MailRepositoryUrl::getPath)
-            .map(MailRepositoryPath::parts)
-            .peek(parts -> Preconditions.checkState(!parts.isEmpty()))
-            .map(Iterables::getLast)
-            .map(User::fromUsername));
+            .filter(this::isVault)
+            .map(this::userForRepository));
+    }
+
+    private boolean isVault(MailRepositoryUrl url) {
+        return url.hasPrefix(configuration.urlPrefix);
+    }
+
+    private User userForRepository(MailRepositoryUrl url) {
+        Preconditions.checkArgument(isVault(url));
+
+        List<String> parts = url.getPath().parts();
+
+        return User.fromUsername(Iterables.getLast(parts));
+    }
+
+    @Override
+    public Task deleteExpiredMessagesTask() {
+        ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC);
+        ZonedDateTime beginningOfRetentionPeriod = now.minus(retentionConfiguration.getRetentionPeriod());
+
+        return new VaultGarbageCollectionTask(
+            deleteByQueryExecutor,
+            beginningOfRetentionPeriod);
     }
 
     private MailRepository repositoryForUser(User user) {
diff --git a/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java b/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java
index 87c6205..2cc58eb 100644
--- a/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java
+++ b/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java
@@ -55,8 +55,9 @@ public class MailRepositoryDeletedMessageVaultTest implements DeletedMessageVaul
 
         testee = new MailRepositoryDeletedMessageVault(
             mailRepositoryStore,
+            RetentionConfiguration.DEFAULT,
             new MailRepositoryDeletedMessageVault.Configuration(MailRepositoryUrl.from("memory://deletedMessages/vault/")),
-            new MailConverter(new InMemoryId.Factory(), new InMemoryMessageId.Factory()));
+            new MailConverter(new InMemoryId.Factory(), new InMemoryMessageId.Factory()), CLOCK);
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index 82f84be..b08c456 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -57,6 +57,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Clock;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;
@@ -92,6 +93,7 @@ import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.user.memory.MemoryUsersRepository;
 import org.apache.james.vault.DeletedMessage;
 import org.apache.james.vault.DeletedMessageZipper;
+import org.apache.james.vault.RetentionConfiguration;
 import org.apache.james.vault.memory.MemoryDeletedMessagesVault;
 import org.apache.james.vault.search.Query;
 import org.apache.james.webadmin.WebAdminServer;
@@ -147,7 +149,7 @@ class DeletedMessagesVaultRoutesTest {
 
     @BeforeEach
     void beforeEach() throws Exception {
-        vault = spy(new MemoryDeletedMessagesVault());
+        vault = spy(new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, Clock.systemUTC()));
         InMemoryIntegrationResources inMemoryResource = InMemoryIntegrationResources.defaultResources();
         mailboxManager = spy(inMemoryResource.getMailboxManager());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to