This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c458cea86af5c48ccd844dff6d202834fc2be44d Author: Benoit Tellier <[email protected]> AuthorDate: Sat Mar 5 22:06:45 2022 +0700 JAMES-3723 Allow to not consume emails upon reprocessing --- ...dminServerTaskSerializationIntegrationTest.java | 1 + .../webadmin/routes/MailRepositoriesRoutes.java | 20 +++-- .../webadmin/service/ReprocessingAllMailsTask.java | 37 +++------ ...essingAllMailsTaskAdditionalInformationDTO.java | 18 ++++- .../service/ReprocessingAllMailsTaskDTO.java | 20 +++-- .../webadmin/service/ReprocessingOneMailTask.java | 38 +++------ ...cessingOneMailTaskAdditionalInformationDTO.java | 18 ++++- .../service/ReprocessingOneMailTaskDTO.java | 24 ++++-- .../webadmin/service/ReprocessingService.java | 44 ++++++++--- .../routes/MailRepositoriesRoutesTest.java | 91 ++++++++++++++++++++++ .../service/ReprocessingAllMailsTaskTest.java | 49 +++++++++--- .../service/ReprocessingOneMailTaskTest.java | 42 ++++++++-- .../webadmin/service/ReprocessingServiceTest.java | 12 +-- 13 files changed, 304 insertions(+), 110 deletions(-) diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java index 4e3ac97..4e5ba18 100644 --- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java +++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java @@ -344,6 +344,7 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest { .body("type", is("reprocessing-one")) .body("additionalInformation.repositoryPath", is(mailRepositoryUrl.asString())) .body("additionalInformation.targetQueue", is(notNullValue())) + .body("additionalInformation.consume", is(notNullValue())) .body("additionalInformation.mailKey", is(mailKey)) .body("additionalInformation.targetProcessor", is(nullValue())); } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java index aea7771..97c5634 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java @@ -322,11 +322,15 @@ public class MailRepositoriesRoutes implements Routes { private Task reprocessAll(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException { MailRepositoryPath path = decodedRepositoryPath(request); - Optional<String> targetProcessor = parseTargetProcessor(request); - MailQueueName targetQueue = parseTargetQueue(request); Long repositorySize = repositoryStoreService.size(path).orElse(0L); - return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, targetQueue, targetProcessor); + return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, extractConfiguration(request)); + } + + private ReprocessingService.Configuration extractConfiguration(Request request) { + return new ReprocessingService.Configuration(parseTargetQueue(request), + parseTargetProcessor(request), + parseConsume(request).orElse(true)); } public void defineReprocessOne() { @@ -340,10 +344,7 @@ public class MailRepositoriesRoutes implements Routes { MailRepositoryPath path = decodedRepositoryPath(request); MailKey key = new MailKey(request.params("key")); - Optional<String> targetProcessor = parseTargetProcessor(request); - MailQueueName targetQueue = parseTargetQueue(request); - - return new ReprocessingOneMailTask(reprocessingService, path, targetQueue, key, targetProcessor, Clock.systemUTC()); + return new ReprocessingOneMailTask(reprocessingService, path, extractConfiguration(request), key, Clock.systemUTC()); } private Set<AdditionalField> extractAdditionalFields(String additionalFieldsParam) throws IllegalArgumentException { @@ -361,6 +362,11 @@ public class MailRepositoriesRoutes implements Routes { return Optional.ofNullable(request.queryParams("processor")); } + private Optional<Boolean> parseConsume(Request request) { + return Optional.ofNullable(request.queryParams("consume")) + .map(Boolean::parseBoolean); + } + private MailQueueName parseTargetQueue(Request request) { return Optional.ofNullable(request.queryParams("queue")) .map(MailQueueName::of) diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java index 5d8ea39..ef930f9 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java @@ -29,7 +29,6 @@ import javax.mail.MessagingException; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.mailrepository.api.MailRepositoryStore; -import org.apache.james.queue.api.MailQueueName; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskType; @@ -40,27 +39,21 @@ public class ReprocessingAllMailsTask implements Task { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { private final MailRepositoryPath repositoryPath; - private final String targetQueue; - private final Optional<String> targetProcessor; + private final ReprocessingService.Configuration configuration; private final long initialCount; private final long remainingCount; private final Instant timestamp; - public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor, long initialCount, long remainingCount, Instant timestamp) { + public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, long initialCount, long remainingCount, Instant timestamp) { this.repositoryPath = repositoryPath; - this.targetQueue = targetQueue.asString(); - this.targetProcessor = targetProcessor; + this.configuration = configuration; this.initialCount = initialCount; this.remainingCount = remainingCount; this.timestamp = timestamp; } - public String getTargetQueue() { - return targetQueue; - } - - public Optional<String> getTargetProcessor() { - return targetProcessor; + public ReprocessingService.Configuration getConfiguration() { + return configuration; } public String getRepositoryPath() { @@ -97,17 +90,15 @@ public class ReprocessingAllMailsTask implements Task { private final ReprocessingService reprocessingService; private final MailRepositoryPath repositoryPath; - private final MailQueueName targetQueue; - private final Optional<String> targetProcessor; + private final ReprocessingService.Configuration configuration; private final long repositorySize; private final AtomicLong processedCount; public ReprocessingAllMailsTask(ReprocessingService reprocessingService, long repositorySize, - MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor) { + MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration) { this.reprocessingService = reprocessingService; this.repositoryPath = repositoryPath; - this.targetQueue = targetQueue; - this.targetProcessor = targetProcessor; + this.configuration = configuration; this.repositorySize = repositorySize; this.processedCount = new AtomicLong(0); } @@ -119,7 +110,7 @@ public class ReprocessingAllMailsTask implements Task { @Override public Result run() { try { - reprocessingService.reprocessAll(repositoryPath, targetProcessor, targetQueue, this::notifyProgress); + reprocessingService.reprocessAll(repositoryPath, configuration, this::notifyProgress); return Result.COMPLETED; } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) { LOGGER.error("Encountered error while reprocessing repository", e); @@ -135,12 +126,8 @@ public class ReprocessingAllMailsTask implements Task { return repositorySize; } - Optional<String> getTargetProcessor() { - return targetProcessor; - } - - MailQueueName getTargetQueue() { - return targetQueue; + ReprocessingService.Configuration getConfiguration() { + return configuration; } @Override @@ -151,7 +138,7 @@ public class ReprocessingAllMailsTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { return Optional.of(new AdditionalInformation( - repositoryPath, targetQueue, targetProcessor, repositorySize, repositorySize - processedCount.get(), + repositoryPath, configuration, repositorySize, repositorySize - processedCount.get(), Clock.systemUTC().instant())); } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java index adacac9..55dcdb7 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java @@ -35,16 +35,19 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio .convertToDTO(ReprocessingAllMailsTaskAdditionalInformationDTO.class) .toDomainObjectConverter(dto -> new ReprocessingAllMailsTask.AdditionalInformation( MailRepositoryPath.from(dto.repositoryPath), - MailQueueName.of(dto.targetQueue), - dto.targetProcessor, + new ReprocessingService.Configuration( + MailQueueName.of(dto.getTargetQueue()), + dto.getTargetProcessor(), + dto.isConsume()), dto.initialCount, dto.remainingCount, dto.timestamp)) .toDTOConverter((details, type) -> new ReprocessingAllMailsTaskAdditionalInformationDTO( type, details.getRepositoryPath(), - details.getTargetQueue(), - details.getTargetProcessor(), + details.getConfiguration().getMailQueueName().asString(), + details.getConfiguration().getTargetProcessor(), + Optional.of(details.getConfiguration().isConsume()), details.getInitialCount(), details.getRemainingCount(), details.timestamp())) @@ -56,6 +59,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio private final String repositoryPath; private final String targetQueue; private final Optional<String> targetProcessor; + private final boolean consume; private final long initialCount; private final long remainingCount; private final Instant timestamp; @@ -65,6 +69,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio @JsonProperty("repositoryPath") String repositoryPath, @JsonProperty("targetQueue") String targetQueue, @JsonProperty("targetProcessor") Optional<String> targetProcessor, + @JsonProperty("consume") Optional<Boolean> consume, @JsonProperty("initialCount") long initialCount, @JsonProperty("remainingCount") long remainingCount, @JsonProperty("timestamp") Instant timestamp) { @@ -75,6 +80,11 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio this.initialCount = initialCount; this.remainingCount = remainingCount; this.timestamp = timestamp; + this.consume = consume.orElse(true); + } + + public boolean isConsume() { + return consume; } @Override diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java index d81f2a8..7a7b722 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java @@ -46,9 +46,9 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO { typeName, domainObject.getRepositorySize(), domainObject.getRepositoryPath().urlEncoded(), - domainObject.getTargetQueue().asString(), - domainObject.getTargetProcessor() - ); + domainObject.getConfiguration().getMailQueueName().asString(), + Optional.of(domainObject.getConfiguration().isConsume()), + domainObject.getConfiguration().getTargetProcessor()); } catch (Exception e) { throw new ReprocessingAllMailsTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath()); } @@ -58,17 +58,20 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO { private final long repositorySize; private final String repositoryPath; private final String targetQueue; + private final boolean consume; private final Optional<String> targetProcessor; public ReprocessingAllMailsTaskDTO(@JsonProperty("type") String type, @JsonProperty("repositorySize") long repositorySize, @JsonProperty("repositoryPath") String repositoryPath, @JsonProperty("targetQueue") String targetQueue, + @JsonProperty("consume") Optional<Boolean> consume, @JsonProperty("targetProcessor") Optional<String> targetProcessor) { this.type = type; this.repositorySize = repositorySize; this.repositoryPath = repositoryPath; this.targetQueue = targetQueue; + this.consume = consume.orElse(true); this.targetProcessor = targetProcessor; } @@ -78,9 +81,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO { reprocessingService, repositorySize, MailRepositoryPath.fromEncoded(repositoryPath), - MailQueueName.of(targetQueue), - targetProcessor - ); + new ReprocessingService.Configuration( + MailQueueName.of(targetQueue), + targetProcessor, + consume)); } catch (Exception e) { throw new ReprocessingAllMailsTask.InvalidMailRepositoryPathDeserializationException(repositoryPath); } @@ -103,6 +107,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO { return targetQueue; } + public boolean isConsume() { + return consume; + } + public Optional<String> getTargetProcessor() { return targetProcessor; } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java index 2777ce2..bb81055 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java @@ -28,7 +28,6 @@ import javax.mail.MessagingException; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.mailrepository.api.MailRepositoryStore; -import org.apache.james.queue.api.MailQueueName; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskType; @@ -39,16 +38,14 @@ public class ReprocessingOneMailTask implements Task { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { private final MailRepositoryPath repositoryPath; - private final String targetQueue; + private final ReprocessingService.Configuration configuration; private final MailKey mailKey; - private final Optional<String> targetProcessor; private final Instant timestamp; - public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, MailKey mailKey, Optional<String> targetProcessor, Instant timestamp) { + public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, MailKey mailKey, Instant timestamp) { this.repositoryPath = repositoryPath; - this.targetQueue = targetQueue.asString(); + this.configuration = configuration; this.mailKey = mailKey; - this.targetProcessor = targetProcessor; this.timestamp = timestamp; } @@ -56,12 +53,8 @@ public class ReprocessingOneMailTask implements Task { return mailKey.asString(); } - public String getTargetQueue() { - return targetQueue; - } - - public Optional<String> getTargetProcessor() { - return targetProcessor; + public ReprocessingService.Configuration getConfiguration() { + return configuration; } public String getRepositoryPath() { @@ -90,29 +83,26 @@ public class ReprocessingOneMailTask implements Task { private final ReprocessingService reprocessingService; private final MailRepositoryPath repositoryPath; - private final MailQueueName targetQueue; + private final ReprocessingService.Configuration configuration; private final MailKey mailKey; - private final Optional<String> targetProcessor; private final AdditionalInformation additionalInformation; public ReprocessingOneMailTask(ReprocessingService reprocessingService, MailRepositoryPath repositoryPath, - MailQueueName targetQueue, + ReprocessingService.Configuration configuration, MailKey mailKey, - Optional<String> targetProcessor, Clock clock) { this.reprocessingService = reprocessingService; this.repositoryPath = repositoryPath; - this.targetQueue = targetQueue; + this.configuration = configuration; this.mailKey = mailKey; - this.targetProcessor = targetProcessor; - this.additionalInformation = new AdditionalInformation(repositoryPath, targetQueue, mailKey, targetProcessor, clock.instant()); + this.additionalInformation = new AdditionalInformation(repositoryPath, configuration, mailKey, clock.instant()); } @Override public Result run() { try { - reprocessingService.reprocess(repositoryPath, mailKey, targetProcessor, targetQueue); + reprocessingService.reprocess(repositoryPath, mailKey, configuration); return Result.COMPLETED; } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) { LOGGER.error("Encountered error while reprocessing repository", e); @@ -129,16 +119,12 @@ public class ReprocessingOneMailTask implements Task { return repositoryPath; } - MailQueueName getTargetQueue() { - return targetQueue; - } - MailKey getMailKey() { return mailKey; } - Optional<String> getTargetProcessor() { - return targetProcessor; + public ReprocessingService.Configuration getConfiguration() { + return configuration; } @Override diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java index f84ae9d..40802d8 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java @@ -36,16 +36,19 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition .convertToDTO(ReprocessingOneMailTaskAdditionalInformationDTO.class) .toDomainObjectConverter(dto -> new ReprocessingOneMailTask.AdditionalInformation( MailRepositoryPath.from(dto.repositoryPath), - MailQueueName.of(dto.targetQueue), + new ReprocessingService.Configuration( + MailQueueName.of(dto.targetQueue), + dto.targetProcessor, + dto.isConsume()), new MailKey(dto.mailKey), - dto.targetProcessor, dto.timestamp)) .toDTOConverter((details, type) -> new ReprocessingOneMailTaskAdditionalInformationDTO( type, details.getRepositoryPath(), - details.getTargetQueue(), + details.getConfiguration().getMailQueueName().asString(), details.getMailKey(), - details.getTargetProcessor(), + Optional.of(details.getConfiguration().isConsume()), + details.getConfiguration().getTargetProcessor(), details.timestamp())) .typeName(ReprocessingOneMailTask.TYPE.asString()) .withFactory(AdditionalInformationDTOModule::new); @@ -56,15 +59,18 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition private final String targetQueue; private final String mailKey; private final Optional<String> targetProcessor; + private final boolean consume; private final Instant timestamp; public ReprocessingOneMailTaskAdditionalInformationDTO(@JsonProperty("type") String type, @JsonProperty("repositoryPath") String repositoryPath, @JsonProperty("targetQueue") String targetQueue, @JsonProperty("mailKey") String mailKey, + @JsonProperty("consume") Optional<Boolean> consume, @JsonProperty("targetProcessor") Optional<String> targetProcessor, @JsonProperty("timestamp") Instant timestamp) { this.type = type; + this.consume = consume.orElse(true); this.repositoryPath = repositoryPath; this.targetQueue = targetQueue; this.mailKey = mailKey; @@ -72,6 +78,10 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition this.timestamp = timestamp; } + public boolean isConsume() { + return consume; + } + @Override public String getType() { return type; diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java index ea09c18..9ced355 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java @@ -47,10 +47,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO { return new ReprocessingOneMailTaskDTO( typeName, domainObject.getRepositoryPath().urlEncoded(), - domainObject.getTargetQueue().asString(), + domainObject.getConfiguration().getMailQueueName().asString(), domainObject.getMailKey().asString(), - domainObject.getTargetProcessor() - ); + domainObject.getConfiguration().getTargetProcessor(), + Optional.of(domainObject.getConfiguration().isConsume())); } catch (Exception e) { throw new ReprocessingOneMailTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath()); } @@ -61,28 +61,32 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO { private final String targetQueue; private final String mailKey; private final Optional<String> targetProcessor; + private final boolean consume; public ReprocessingOneMailTaskDTO(@JsonProperty("type") String type, @JsonProperty("repositoryPath") String repositoryPath, @JsonProperty("targetQueue") String targetQueue, @JsonProperty("mailKey") String mailKey, - @JsonProperty("targetProcessor") Optional<String> targetProcessor) { + @JsonProperty("targetProcessor") Optional<String> targetProcessor, + @JsonProperty("boolean") Optional<Boolean> consume) { this.type = type; this.repositoryPath = repositoryPath; this.mailKey = mailKey; this.targetQueue = targetQueue; this.targetProcessor = targetProcessor; + this.consume = consume.orElse(true); } public ReprocessingOneMailTask fromDTO(ReprocessingService reprocessingService, Clock clock) { return new ReprocessingOneMailTask( reprocessingService, getMailRepositoryPath(), - MailQueueName.of(targetQueue), + new ReprocessingService.Configuration( + MailQueueName.of(targetQueue), + targetProcessor, + consume), new MailKey(mailKey), - targetProcessor, - clock - ); + clock); } private MailRepositoryPath getMailRepositoryPath() { @@ -110,6 +114,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO { return targetQueue; } + public boolean isConsume() { + return consume; + } + public Optional<String> getTargetProcessor() { return targetProcessor; } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java index 5b7ed6b..9c443d2 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java @@ -53,20 +53,46 @@ public class ReprocessingService { } } + public static class Configuration { + private final MailQueueName mailQueueName; + private final Optional<String> targetProcessor; + private final boolean consume; + + public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, boolean consume) { + this.mailQueueName = mailQueueName; + this.targetProcessor = targetProcessor; + this.consume = consume; + } + + public MailQueueName getMailQueueName() { + return mailQueueName; + } + + public Optional<String> getTargetProcessor() { + return targetProcessor; + } + + public boolean isConsume() { + return consume; + } + } + static class Reprocessor implements Closeable { private final MailQueue mailQueue; - private final Optional<String> targetProcessor; + private final Configuration configuration; - Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) { + Reprocessor(MailQueue mailQueue, Configuration configuration) { this.mailQueue = mailQueue; - this.targetProcessor = targetProcessor; + this.configuration = configuration; } private void reprocess(MailRepository repository, Mail mail) { try { - targetProcessor.ifPresent(mail::setState); + configuration.getTargetProcessor().ifPresent(mail::setState); mailQueue.enQueue(mail); - repository.remove(mail); + if (configuration.isConsume()) { + repository.remove(mail); + } } catch (Exception e) { throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e); } finally { @@ -94,8 +120,8 @@ public class ReprocessingService { this.mailRepositoryStoreService = mailRepositoryStoreService; } - public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, MailQueueName targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) { + public void reprocessAll(MailRepositoryPath path, Configuration configuration, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { + try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) { mailRepositoryStoreService .getRepositories(path) .forEach(Throwing.consumer((MailRepository repository) -> @@ -107,8 +133,8 @@ public class ReprocessingService { } } - public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, MailQueueName targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) { + public void reprocess(MailRepositoryPath path, MailKey key, Configuration configuration) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { + try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) { Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService .getRepositories(path) .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key))))) diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java index 95e8860..16bd869 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java @@ -1133,6 +1133,7 @@ class MailRepositoriesRoutesTest { .body("additionalInformation.remainingCount", is(0)) .body("additionalInformation.targetProcessor", is(emptyOrNullString())) .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString())) + .body("additionalInformation.consume", is(true)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) .body("completedDate", is(notNullValue())); @@ -1157,6 +1158,7 @@ class MailRepositoriesRoutesTest { .param("action", "reprocess") .param("queue", CUSTOM_QUEUE.asString()) .param("processor", transport) + .param("consume", false) .patch(PATH_ESCAPED_MY_REPO + "/mails") .jsonPath() .get("taskId"); @@ -1174,6 +1176,7 @@ class MailRepositoriesRoutesTest { .body("additionalInformation.remainingCount", is(0)) .body("additionalInformation.targetProcessor", is(transport)) .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString())) + .body("additionalInformation.consume", is(false)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) .body("completedDate", is(notNullValue())); @@ -1244,6 +1247,39 @@ class MailRepositoriesRoutesTest { } @Test + void reprocessingAllTaskShouldNotClearMailRepositoryWhenNotConsume() throws Exception { + MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES)) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES)) + .build()); + + String transport = "transport"; + String taskId = with() + .param("action", "reprocess") + .param("queue", CUSTOM_QUEUE.asString()) + .param("processor", transport) + .param("consume", false) + .patch(PATH_ESCAPED_MY_REPO + "/mails") + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.list()) + .toIterable() + .hasSize(2); + } + + @Test void reprocessingAllTaskShouldClearBothMailRepositoriesWhenSamePath() throws Exception { MailRepository mailRepository1 = mailRepositoryStore.create(URL_MY_REPO); MailRepository mailRepository2 = mailRepositoryStore.create(URL_MY_REPO_OTHER); @@ -1519,6 +1555,7 @@ class MailRepositoriesRoutesTest { .body("additionalInformation.mailKey", is(NAME_1)) .body("additionalInformation.targetProcessor", is(emptyOrNullString())) .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString())) + .body("additionalInformation.consume", is(true)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) .body("completedDate", is(notNullValue())); @@ -1543,6 +1580,7 @@ class MailRepositoriesRoutesTest { .param("action", "reprocess") .param("queue", CUSTOM_QUEUE.asString()) .param("processor", transport) + .param("consume", false) .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1) .jsonPath() .get("taskId"); @@ -1559,6 +1597,7 @@ class MailRepositoriesRoutesTest { .body("additionalInformation.mailKey", is(NAME_1)) .body("additionalInformation.targetProcessor", is(transport)) .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString())) + .body("additionalInformation.consume", is(false)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) .body("completedDate", is(notNullValue())); @@ -1782,6 +1821,58 @@ class MailRepositoriesRoutesTest { } @Test + void reprocessingOneTaskShouldNotRemoveEmailWhenNotConsume() throws Exception { + MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .param("action", "reprocess") + .param("queue", CUSTOM_QUEUE.asString()) + .param("consume", false) + .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.size()) + .isEqualTo(2); + } + + @Test + void reprocessingOneTaskShouldRemoveEmailWhenConsume() throws Exception { + MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .param("action", "reprocess") + .param("queue", CUSTOM_QUEUE.asString()) + .param("consume", true) + .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.size()) + .isEqualTo(1); + } + + @Test void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception { MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO); mailRepository.store(FakeMail.builder() diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java index d9eb9ca..64b5fe3 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java @@ -19,6 +19,7 @@ package org.apache.james.webadmin.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -28,6 +29,7 @@ import java.util.Optional; import org.apache.james.JsonSerializationVerifier; import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.queue.api.MailQueueName; +import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer; import org.apache.james.server.task.json.JsonTaskSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -41,15 +43,20 @@ class ReprocessingAllMailsTaskTest { private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue"); private static final Optional<String> SOME_TARGET_PROCESSOR = Optional.of("targetProcessor"); private static final long REMAINING_COUNT = 3L; - private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}"; - private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\"}"; - private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}"; - private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}"; + private static final boolean CONSUME = true; + + private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"consume\":true}"; + private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"consume\":false}"; + private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":true}"; + private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":false}"; + + private static final String OLD_SERIALIZED_TASK = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}"; + private static final String OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}"; @Test void taskShouldBeSerializable() throws Exception { - ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR); - ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, Optional.empty()); + ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME)); + ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME)); JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE)) .testCase(taskWithTargetProcessor, SERIALIZED_TASK_WITH_TARGET_PROCESSOR) @@ -58,7 +65,9 @@ class ReprocessingAllMailsTaskTest { } @ParameterizedTest - @ValueSource(strings = {"{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"}) + @ValueSource(strings = { + "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}", + "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"}) void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) { JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE)); @@ -68,9 +77,11 @@ class ReprocessingAllMailsTaskTest { @Test void additionalInformationShouldBeSerializable() throws Exception { - ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR, + ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, + new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME), REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP); - ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, Optional.empty(), + ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, + new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME), REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP); JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskAdditionalInformationDTO.module()) @@ -78,4 +89,24 @@ class ReprocessingAllMailsTaskTest { .testCase(detailsWithoutProcessor, SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR) .verify(); } + + @Test + void shouldDeserializePreviousTaskFormat() throws Exception { + ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME)); + JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE)); + + assertThat(testee.deserialize(OLD_SERIALIZED_TASK)) + .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor); + } + + @Test + void shouldDeserializePreviousAdditionalInformationFormat() throws Exception { + ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, + new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME), + REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP); + JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingAllMailsTaskAdditionalInformationDTO.module()); + + assertThat(testee.deserialize(OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION)) + .isEqualToComparingFieldByFieldRecursively(details); + } } \ No newline at end of file diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java index 1d89553..cf0746b 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java @@ -19,6 +19,7 @@ package org.apache.james.webadmin.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -32,6 +33,7 @@ import org.apache.james.JsonSerializationVerifier; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.queue.api.MailQueueName; +import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer; import org.apache.james.server.task.json.JsonTaskSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -41,27 +43,32 @@ class ReprocessingOneMailTaskTest { private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z"); private static final Clock CLOCK = Clock.fixed(TIMESTAMP, ZoneId.of("UTC")); private static final ReprocessingService REPROCESSING_SERVICE = mock(ReprocessingService.class); - private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}"; - private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}"; + private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"consume\":true}"; + private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\", \"consume\":true}"; + private static final String SERIALIZED_TASK_OLD = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}"; + private static final String SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}"; private static final MailRepositoryPath REPOSITORY_PATH = MailRepositoryPath.from("a"); private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue"); private static final MailKey MAIL_KEY = new MailKey("myMail"); private static final Optional<String> TARGET_PROCESSOR = Optional.of("targetProcessor"); + public static final boolean CONSUME = true; @Test void taskShouldBeSerializable() throws Exception { - ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, CLOCK); - ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, Optional.empty(), CLOCK); + ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK); + ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME), MAIL_KEY, CLOCK); JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE)) .testCase(taskWithTargetProcessor, SERIALIZED_TASK_1) .testCase(taskWithoutTargetProcessor, - "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}") + "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\", \"consume\":false}") .verify(); } @ParameterizedTest - @ValueSource(strings = {"{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"}) + @ValueSource(strings = { + "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}", + "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"}) void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) { JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE)); @@ -71,10 +78,31 @@ class ReprocessingOneMailTaskTest { @Test void additionalInformationShouldBeSerializable() throws IOException { - ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, TIMESTAMP); + ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH, + new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP); JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskAdditionalInformationDTO.module()) .bean(details) .json(SERIALIZED_TASK_1_ADDITIONAL_INFORMATION) .verify(); } + + + @Test + void shouldDeserializePreviousTaskFormat() throws Exception { + ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK); + JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE)); + + assertThat(testee.deserialize(SERIALIZED_TASK_OLD)) + .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor); + } + + @Test + void shouldDeserializePreviousAdditionalInformationFormat() throws Exception { + ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH, + new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP); + JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingOneMailTaskAdditionalInformationDTO.module()); + + assertThat(testee.deserialize(SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION)) + .isEqualToComparingFieldByFieldRecursively(details); + } } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java index d2ad1c5..1ad1e9f 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java @@ -43,6 +43,7 @@ import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.apache.james.queue.memory.MemoryMailQueueFactory; import org.apache.james.util.MimeMessageUtil; +import org.apache.james.webadmin.service.ReprocessingService.Configuration; import org.apache.mailet.base.test.FakeMail; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,6 +65,7 @@ class ReprocessingServiceTest { private static final Consumer<MailKey> NOOP_CONSUMER = key -> { }; private static final Optional<String> NO_TARGET_PROCESSOR = Optional.empty(); private static final byte[] MESSAGE_BYTES = "header: value \r\n".getBytes(UTF_8); + public static final boolean CONSUME = true; private ReprocessingService reprocessingService; private MemoryMailRepositoryStore mailRepositoryStore; @@ -106,7 +108,7 @@ class ReprocessingServiceTest { repository.store(mail2); repository.store(mail3); - reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL); + reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME)); assertThat(queueFactory.getQueue(SPOOL).get().browse()) .toIterable() @@ -121,7 +123,7 @@ class ReprocessingServiceTest { repository.store(mail2); repository.store(mail3); - reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL); + reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME)); assertThat(repository.list()).toIterable() .containsOnly(KEY_1, KEY_3); @@ -134,7 +136,7 @@ class ReprocessingServiceTest { repository.store(mail2); repository.store(mail3); - reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER); + reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER); assertThat(repository.list()).toIterable() .isEmpty(); @@ -147,7 +149,7 @@ class ReprocessingServiceTest { repository.store(mail2); repository.store(mail3); - reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER); + reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER); assertThat(queueFactory.getQueue(SPOOL).get().browse()) .toIterable() @@ -178,7 +180,7 @@ class ReprocessingServiceTest { } }); - reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, concurrentRemoveConsumer); + reprocessingService.reprocessAll(PATH, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), concurrentRemoveConsumer); assertThat(queueFactory.getQueue(SPOOL).get().browse()) .toIterable() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
