This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 488faff1d8f9c0499a27f017ea50d0f5ee5fdc58 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Thu Mar 21 16:15:07 2019 +0700 MAILBOX-388 Implement DeletedMessagesVault retention --- .../apache/james/vault/DeletedMessageVault.java | 3 + .../james/vault/memory/DeleteByQueryExecutor.java | 94 +++++++++++++++++ .../vault/memory/MemoryDeletedMessagesVault.java | 26 ++++- .../vault/memory/VaultGarbageCollectionTask.java | 115 +++++++++++++++++++++ .../apache/james/vault/DeletedMessageFixture.java | 15 +++ .../james/vault/DeletedMessageVaultContract.java | 69 +++++++++++++ .../james/vault/DeletedMessageVaultHookTest.java | 2 +- .../memory/MemoryDeletedMessagesVaultTest.java | 3 +- .../modules/vault/DeletedMessageVaultModule.java | 14 +++ .../vault/MailRepositoryDeletedMessageVault.java | 46 +++++++-- .../MailRepositoryDeletedMessageVaultTest.java | 3 +- .../routes/DeletedMessagesVaultRoutesTest.java | 4 +- 12 files changed, 379 insertions(+), 15 deletions(-) diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java index 54d3579..420c7cd 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVault.java @@ -23,6 +23,7 @@ import java.io.InputStream; import org.apache.james.core.User; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.task.Task; import org.apache.james.vault.search.Query; import org.reactivestreams.Publisher; @@ -36,4 +37,6 @@ public interface DeletedMessageVault { Publisher<DeletedMessage> search(User user, Query query); Publisher<User> usersWithVault(); + + Task deleteExpiredMessagesTask(); } diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java new file mode 100644 index 0000000..02e6936 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/DeleteByQueryExecutor.java @@ -0,0 +1,94 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.memory; + +import org.apache.james.core.User; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.task.Task; +import org.apache.james.util.FunctionalUtils; +import org.apache.james.vault.DeletedMessageVault; +import org.apache.james.vault.search.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class DeleteByQueryExecutor { + @FunctionalInterface + interface Notifier { + void doNotify(); + } + + static class Notifiers { + private final Notifier userHandledNotifier; + private final Notifier searchErrorNotifier; + private final Notifier deletionErrorNotifier; + private final Notifier permanentlyDeletedMessageNotifyer; + + Notifiers(Notifier userHandledNotifier, Notifier searchErrorNotifier, Notifier deletionErrorNotifier, Notifier permanentlyDeletedMessageNotifyer) { + this.userHandledNotifier = userHandledNotifier; + this.searchErrorNotifier = searchErrorNotifier; + this.deletionErrorNotifier = deletionErrorNotifier; + this.permanentlyDeletedMessageNotifyer = permanentlyDeletedMessageNotifyer; + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(DeleteByQueryExecutor.class); + + private final DeletedMessageVault deletedMessageVault; + + public DeleteByQueryExecutor(DeletedMessageVault deletedMessageVault) { + this.deletedMessageVault = deletedMessageVault; + } + + public Task.Result deleteByQuery(Query query, Notifiers notifiers) { + return Flux.from(deletedMessageVault.usersWithVault()) + .flatMap(user -> deleteByQueryForUser(query, user, notifiers)) + .reduce(Task::combine) + .blockOptional() + .orElse(Task.Result.COMPLETED); + } + + private Mono<Task.Result> deleteByQueryForUser(Query query, User user, Notifiers notifiers) { + return Flux.from(deletedMessageVault.search(user, query)) + .flatMap(message -> deleteMessage(user, message.getMessageId(), notifiers)) + .onErrorResume(e -> { + LOGGER.error("Error encountered while searching old mails in {} vault", user.asString(), e); + notifiers.searchErrorNotifier.doNotify(); + return Mono.just(Task.Result.PARTIAL); + }) + .reduce(Task::combine) + + .map(FunctionalUtils.identityWithSideEffect(() -> LOGGER.info("Retention applied for {} vault", user.asString()))) + .map(FunctionalUtils.identityWithSideEffect(notifiers.userHandledNotifier::doNotify)); + } + + private Mono<Task.Result> deleteMessage(User user, MessageId messageId, Notifiers notifiers) { + return Mono.from(deletedMessageVault.delete(user, messageId)) + .then(Mono.fromRunnable(notifiers.permanentlyDeletedMessageNotifyer::doNotify)) + .thenReturn(Task.Result.COMPLETED) + .onErrorResume(e -> { + LOGGER.error("Error encountered while deleting a mail in {} vault: {}", user.asString(), messageId.serialize(), e); + notifiers.deletionErrorNotifier.doNotify(); + return Mono.just(Task.Result.PARTIAL); + }); + } +} diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java index 3389dcb..d48bfda 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/MemoryDeletedMessagesVault.java @@ -22,14 +22,19 @@ package org.apache.james.vault.memory; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.time.Clock; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.core.User; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.task.Task; import org.apache.james.vault.DeletedMessage; import org.apache.james.vault.DeletedMessageVault; +import org.apache.james.vault.RetentionConfiguration; import org.apache.james.vault.search.Query; import org.reactivestreams.Publisher; @@ -42,10 +47,16 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MemoryDeletedMessagesVault implements DeletedMessageVault { + private final RetentionConfiguration retentionConfiguration; private final Table<User, MessageId, Pair<DeletedMessage, byte[]>> table; - - public MemoryDeletedMessagesVault() { - table = HashBasedTable.create(); + private final Clock clock; + private DeleteByQueryExecutor deleteByQueryExecutor; + + public MemoryDeletedMessagesVault(RetentionConfiguration retentionConfiguration, Clock clock) { + this.deleteByQueryExecutor = new DeleteByQueryExecutor(this); + this.retentionConfiguration = retentionConfiguration; + this.clock = clock; + this.table = HashBasedTable.create(); } @Override @@ -104,6 +115,15 @@ public class MemoryDeletedMessagesVault implements DeletedMessageVault { } } + public Task deleteExpiredMessagesTask() { + ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC); + ZonedDateTime beginningOfRetentionPeriod = now.minus(retentionConfiguration.getRetentionPeriod()); + + return new VaultGarbageCollectionTask( + deleteByQueryExecutor, + beginningOfRetentionPeriod); + } + private Flux<DeletedMessage> listAll(User user) { synchronized (table) { return Flux.fromIterable(ImmutableList.copyOf(table.row(user).values())) diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java new file mode 100644 index 0000000..93c8ce3 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/memory/VaultGarbageCollectionTask.java @@ -0,0 +1,115 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.memory; + +import java.time.ZonedDateTime; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.vault.search.CriterionFactory; +import org.apache.james.vault.search.Query; + +public class VaultGarbageCollectionTask implements Task { + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final ZonedDateTime beginningOfRetentionPeriod; + private final long handledUserCount; + private final long permanantlyDeletedMessages; + private final long vaultSearchErrorCount; + private final long deletionErrorCount; + + AdditionalInformation(ZonedDateTime beginningOfRetentionPeriod, long handledUserCount, long permanantlyDeletedMessages, long vaultSearchErrorCount, long deletionErrorCount) { + this.beginningOfRetentionPeriod = beginningOfRetentionPeriod; + this.handledUserCount = handledUserCount; + this.permanantlyDeletedMessages = permanantlyDeletedMessages; + this.vaultSearchErrorCount = vaultSearchErrorCount; + this.deletionErrorCount = deletionErrorCount; + } + + public ZonedDateTime getBeginningOfRetentionPeriod() { + return beginningOfRetentionPeriod; + } + + public long getHandledUserCount() { + return handledUserCount; + } + + public long getPermanantlyDeletedMessages() { + return permanantlyDeletedMessages; + } + + public long getVaultSearchErrorCount() { + return vaultSearchErrorCount; + } + + public long getDeletionErrorCount() { + return deletionErrorCount; + } + } + + private static final String TYPE = "deletedMessages/garbageCollection"; + + private final DeleteByQueryExecutor deleteByQueryExecutor; + private final DeleteByQueryExecutor.Notifiers notifiers; + private final AtomicLong handledUserCount; + private final AtomicLong permanantlyDeletedMessages; + private final AtomicLong vaultSearchErrorCount; + private final AtomicLong deletionErrorCount; + private final ZonedDateTime beginningOfRetentionPeriod; + + public VaultGarbageCollectionTask(DeleteByQueryExecutor deleteByQueryExecutor, ZonedDateTime beginningOfRetentionPeriod) { + this.deleteByQueryExecutor = deleteByQueryExecutor; + this.beginningOfRetentionPeriod = beginningOfRetentionPeriod; + + this.handledUserCount = new AtomicLong(0); + this.permanantlyDeletedMessages = new AtomicLong(0); + this.vaultSearchErrorCount = new AtomicLong(0); + this.deletionErrorCount = new AtomicLong(0); + + this.notifiers = new DeleteByQueryExecutor.Notifiers( + handledUserCount::incrementAndGet, + vaultSearchErrorCount::incrementAndGet, + deletionErrorCount::incrementAndGet, + permanantlyDeletedMessages::incrementAndGet); + } + + @Override + public Result run() { + Query query = Query.of(CriterionFactory.deletionDate().beforeOrEquals(beginningOfRetentionPeriod)); + + return deleteByQueryExecutor.deleteByQuery(query, notifiers); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(new AdditionalInformation( + beginningOfRetentionPeriod, + handledUserCount.get(), + permanantlyDeletedMessages.get(), + vaultSearchErrorCount.get(), + deletionErrorCount.get())); + } +} diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java index 73c72fb..0144cac 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageFixture.java @@ -36,6 +36,7 @@ import org.apache.james.mailbox.inmemory.InMemoryMessageId; public interface DeletedMessageFixture { InMemoryMessageId MESSAGE_ID = InMemoryMessageId.of(42); + InMemoryMessageId OLD_MESSAGE_ID = InMemoryMessageId.of(58); InMemoryMessageId MESSAGE_ID_2 = InMemoryMessageId.of(45); InMemoryId MAILBOX_ID_1 = InMemoryId.of(43); InMemoryId MAILBOX_ID_2 = InMemoryId.of(44); @@ -44,6 +45,9 @@ public interface DeletedMessageFixture { User USER_2 = User.fromUsername("dimi...@apache.org"); ZonedDateTime DELIVERY_DATE = ZonedDateTime.parse("2014-10-30T14:12:00Z"); ZonedDateTime DELETION_DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z"); + ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z"); + ZonedDateTime OLD_DELIVERY_DATE = ZonedDateTime.parse("2010-10-30T14:12:00Z"); + ZonedDateTime OLD_DELETION_DATE = ZonedDateTime.parse("2010-10-30T15:12:00Z"); Date INTERNAL_DATE = Date.from(DELIVERY_DATE.toInstant()); byte[] CONTENT = "header: value\r\n\r\ncontent".getBytes(StandardCharsets.UTF_8); String SUBJECT = "subject"; @@ -74,5 +78,16 @@ public interface DeletedMessageFixture { .subject(SUBJECT) .build(); DeletedMessage DELETED_MESSAGE = FINAL_STAGE.get().build(); + DeletedMessage OLD_DELETED_MESSAGE = DeletedMessage.builder() + .messageId(OLD_MESSAGE_ID) + .originMailboxes(MAILBOX_ID_1, MAILBOX_ID_2) + .user(USER) + .deliveryDate(OLD_DELIVERY_DATE) + .deletionDate(OLD_DELETION_DATE) + .sender(MaybeSender.of(SENDER)) + .recipients(RECIPIENT1, RECIPIENT2) + .hasAttachment(false) + .size(CONTENT.length) + .build(); DeletedMessage DELETED_MESSAGE_2 = DELETED_MESSAGE_GENERATOR.apply(MESSAGE_ID_2.getRawId()); } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java index 0f4b3c6..c28f79f 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultContract.java @@ -24,6 +24,8 @@ import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE; import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2; import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_GENERATOR; import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID; +import static org.apache.james.vault.DeletedMessageFixture.NOW; +import static org.apache.james.vault.DeletedMessageFixture.OLD_DELETED_MESSAGE; import static org.apache.james.vault.DeletedMessageFixture.USER; import static org.apache.james.vault.DeletedMessageFixture.USER_2; import static org.apache.james.vault.search.Query.ALL; @@ -31,9 +33,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.ByteArrayInputStream; +import java.time.Clock; import java.time.Duration; import org.apache.james.mailbox.inmemory.InMemoryMessageId; +import org.apache.james.task.Task; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.Test; @@ -41,6 +45,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface DeletedMessageVaultContract { + Clock CLOCK = Clock.fixed(NOW.toInstant(), NOW.getZone()); + DeletedMessageVault getVault(); @Test @@ -213,4 +219,67 @@ public interface DeletedMessageVaultContract { assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block()) .isEmpty(); } + + @Test + default void deleteExpiredMessagesTaskShouldCompleteWhenNoMail() { + Task.Result result = getVault().deleteExpiredMessagesTask().run(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + } + + @Test + default void deleteExpiredMessagesTaskShouldCompleteWhenAllMailsDeleted() { + Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + Mono.from(getVault().delete(USER, DELETED_MESSAGE.getMessageId())).block(); + + Task.Result result = getVault().deleteExpiredMessagesTask().run(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + } + + @Test + default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyRecentMails() { + Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + Task.Result result = getVault().deleteExpiredMessagesTask().run(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + } + + @Test + default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyOldMails() { + Mono.from(getVault().append(USER, OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + Task.Result result = getVault().deleteExpiredMessagesTask().run(); + + assertThat(result).isEqualTo(Task.Result.COMPLETED); + } + + @Test + default void deleteExpiredMessagesTaskShouldDoNothingWhenEmpty() { + getVault().deleteExpiredMessagesTask().run(); + + assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block()) + .isEmpty(); + } + + @Test + default void deleteExpiredMessagesTaskShouldNotDeleteRecentMails() { + Mono.from(getVault().append(USER, DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + getVault().deleteExpiredMessagesTask().run(); + + assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block()) + .containsOnly(DELETED_MESSAGE); + } + + @Test + default void deleteExpiredMessagesTaskShouldDeleteOldMails() { + Mono.from(getVault().append(USER, OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + getVault().deleteExpiredMessagesTask().run(); + + assertThat(Flux.from(getVault().search(USER, ALL)).collectList().block()) + .isEmpty(); + } } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java index 26c3ca4..324f20c 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java @@ -99,7 +99,7 @@ class DeletedMessageVaultHookTest { @BeforeEach void setUp() throws Exception { clock = Clock.fixed(DELETION_DATE.toInstant(), ZoneOffset.UTC); - messageVault = new MemoryDeletedMessagesVault(); + messageVault = new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, clock); DeletedMessageConverter deletedMessageConverter = new DeletedMessageConverter(); diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java index 9b58f29..0fec41e 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/memory/MemoryDeletedMessagesVaultTest.java @@ -22,6 +22,7 @@ package org.apache.james.vault.memory; import org.apache.james.vault.DeletedMessageVault; import org.apache.james.vault.DeletedMessageVaultContract; import org.apache.james.vault.DeletedMessageVaultSearchContract; +import org.apache.james.vault.RetentionConfiguration; import org.junit.jupiter.api.BeforeEach; public class MemoryDeletedMessagesVaultTest implements DeletedMessageVaultContract, DeletedMessageVaultSearchContract.AllContracts { @@ -30,7 +31,7 @@ public class MemoryDeletedMessagesVaultTest implements DeletedMessageVaultContra @BeforeEach void setUp() { - memoryDeletedMessagesVault = new MemoryDeletedMessagesVault(); + memoryDeletedMessagesVault = new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, CLOCK); } @Override diff --git a/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java b/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java index 9c99fcd..32741a6 100644 --- a/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java +++ b/server/container/guice/mailbox-plugin-deleted-messages-vault-guice/src/main/java/org/apache/james/modules/vault/DeletedMessageVaultModule.java @@ -19,6 +19,8 @@ package org.apache.james.modules.vault; +import java.io.FileNotFoundException; + import org.apache.commons.configuration.Configuration; import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.mailrepository.api.MailRepositoryUrl; @@ -27,6 +29,7 @@ import org.apache.james.mailrepository.memory.MailRepositoryStoreConfiguration; import org.apache.james.utils.PropertiesProvider; import org.apache.james.vault.DeletedMessageVault; import org.apache.james.vault.MailRepositoryDeletedMessageVault; +import org.apache.james.vault.RetentionConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,4 +61,15 @@ public class DeletedMessageVaultModule extends AbstractModule { MailRepositoryUrl.fromPathAndProtocol(defaultProtocol, DEFAULT_PATH)); } } + + @Provides + RetentionConfiguration providesRetentionConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException, org.apache.commons.configuration.ConfigurationException { + try { + Configuration configuration = propertiesProvider.getConfiguration("deletedMessageVault"); + return RetentionConfiguration.from(configuration); + } catch (FileNotFoundException e) { + LOGGER.warn("Error encountered while retrieving Deleted message vault configuration. Using default MailRepository RetentionTime (1 year) instead."); + return RetentionConfiguration.DEFAULT; + } + } } diff --git a/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java b/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java index a99014b..4d8ab8c 100644 --- a/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java +++ b/server/mailrepository/deleted-messages-vault-repository/src/main/java/org/apache/james/vault/MailRepositoryDeletedMessageVault.java @@ -20,6 +20,10 @@ package org.apache.james.vault; import java.io.InputStream; +import java.time.Clock; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.List; import javax.inject.Inject; import javax.mail.MessagingException; @@ -28,11 +32,13 @@ import org.apache.james.core.User; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; -import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.mailrepository.api.MailRepositoryStore; import org.apache.james.mailrepository.api.MailRepositoryUrl; import org.apache.james.server.core.MimeMessageInputStream; +import org.apache.james.task.Task; import org.apache.james.util.streams.Iterators; +import org.apache.james.vault.memory.DeleteByQueryExecutor; +import org.apache.james.vault.memory.VaultGarbageCollectionTask; import org.apache.james.vault.search.Query; import org.reactivestreams.Publisher; @@ -59,11 +65,17 @@ public class MailRepositoryDeletedMessageVault implements DeletedMessageVault { } private final MailRepositoryStore mailRepositoryStore; + private final RetentionConfiguration retentionConfiguration; private final Configuration configuration; private final MailConverter mailConverter; + private final DeleteByQueryExecutor deleteByQueryExecutor; + private final Clock clock; @Inject - MailRepositoryDeletedMessageVault(MailRepositoryStore mailRepositoryStore, Configuration configuration, MailConverter mailConverter) { + MailRepositoryDeletedMessageVault(MailRepositoryStore mailRepositoryStore, RetentionConfiguration retentionConfiguration, Configuration configuration, MailConverter mailConverter, Clock clock) { + this.retentionConfiguration = retentionConfiguration; + this.clock = clock; + this.deleteByQueryExecutor = new DeleteByQueryExecutor(this); this.mailRepositoryStore = mailRepositoryStore; this.configuration = configuration; this.mailConverter = mailConverter; @@ -130,12 +142,30 @@ public class MailRepositoryDeletedMessageVault implements DeletedMessageVault { @Override public Publisher<User> usersWithVault() { return Flux.fromStream(mailRepositoryStore.getUrls() - .filter(url -> url.hasPrefix(configuration.urlPrefix)) - .map(MailRepositoryUrl::getPath) - .map(MailRepositoryPath::parts) - .peek(parts -> Preconditions.checkState(!parts.isEmpty())) - .map(Iterables::getLast) - .map(User::fromUsername)); + .filter(this::isVault) + .map(this::userForRepository)); + } + + private boolean isVault(MailRepositoryUrl url) { + return url.hasPrefix(configuration.urlPrefix); + } + + private User userForRepository(MailRepositoryUrl url) { + Preconditions.checkArgument(isVault(url)); + + List<String> parts = url.getPath().parts(); + + return User.fromUsername(Iterables.getLast(parts)); + } + + @Override + public Task deleteExpiredMessagesTask() { + ZonedDateTime now = ZonedDateTime.ofInstant(clock.instant(), ZoneOffset.UTC); + ZonedDateTime beginningOfRetentionPeriod = now.minus(retentionConfiguration.getRetentionPeriod()); + + return new VaultGarbageCollectionTask( + deleteByQueryExecutor, + beginningOfRetentionPeriod); } private MailRepository repositoryForUser(User user) { diff --git a/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java b/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java index 87c6205..2cc58eb 100644 --- a/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java +++ b/server/mailrepository/deleted-messages-vault-repository/src/test/java/org/apache/james/vault/MailRepositoryDeletedMessageVaultTest.java @@ -55,8 +55,9 @@ public class MailRepositoryDeletedMessageVaultTest implements DeletedMessageVaul testee = new MailRepositoryDeletedMessageVault( mailRepositoryStore, + RetentionConfiguration.DEFAULT, new MailRepositoryDeletedMessageVault.Configuration(MailRepositoryUrl.from("memory://deletedMessages/vault/")), - new MailConverter(new InMemoryId.Factory(), new InMemoryMessageId.Factory())); + new MailConverter(new InMemoryId.Factory(), new InMemoryMessageId.Factory()), CLOCK); } @Override diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java index 82f84be..b08c456 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java @@ -57,6 +57,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.time.Clock; import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -92,6 +93,7 @@ import org.apache.james.task.MemoryTaskManager; import org.apache.james.user.memory.MemoryUsersRepository; import org.apache.james.vault.DeletedMessage; import org.apache.james.vault.DeletedMessageZipper; +import org.apache.james.vault.RetentionConfiguration; import org.apache.james.vault.memory.MemoryDeletedMessagesVault; import org.apache.james.vault.search.Query; import org.apache.james.webadmin.WebAdminServer; @@ -147,7 +149,7 @@ class DeletedMessagesVaultRoutesTest { @BeforeEach void beforeEach() throws Exception { - vault = spy(new MemoryDeletedMessagesVault()); + vault = spy(new MemoryDeletedMessagesVault(RetentionConfiguration.DEFAULT, Clock.systemUTC())); InMemoryIntegrationResources inMemoryResource = InMemoryIntegrationResources.defaultResources(); mailboxManager = spy(inMemoryResource.getMailboxManager()); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org