This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 84454ded73591f899b7166cd036da6e306f260ef Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Wed Feb 19 16:10:23 2020 +0100 JAMES-3070 Don't use cache for RabbitMQ Mailqueue Consumer Make MailQueue Closeable to free resources when using non-cached implementations. For that we needed to take care about MailQueue lifecycle at every usage place. --- .../mailetcontainer/impl/JamesMailSpooler.java | 9 ++- .../mailetcontainer/impl/JamesMailetContext.java | 15 ++++- .../james/transport/mailets/RemoteDelivery.java | 6 ++ .../org/apache/james/fetchmail/FetchScheduler.java | 7 ++- .../SetMessagesOutboxFlagUpdateTest.java | 4 ++ .../apache/james/jmap/draft/send/MailSpool.java | 19 ++++++- .../apache/james/smtpserver/SendMailHandler.java | 11 ++++ .../apache/james/smtpserver/SMTPServerTest.java | 2 +- .../james/webadmin/routes/MailQueueRoutes.java | 45 +++++++++------ .../james/webadmin/service/ClearMailQueueTask.java | 47 ++++++++++++---- .../webadmin/service/ClearMailQueueTaskDTO.java | 15 +++-- .../service/DeleteMailsFromMailQueueTask.java | 64 ++++++++++++++-------- .../service/DeleteMailsFromMailQueueTaskDTO.java | 12 ++-- .../james/webadmin/routes/MailQueueRoutesTest.java | 35 ++++++------ .../webadmin/service/ClearMailQueueTaskTest.java | 10 ++-- .../service/DeleteMailsFromMailQueueTaskTest.java | 21 ++++--- .../webadmin/service/ReprocessingService.java | 60 ++++++++++++-------- ...lQueue.java => ActiveMQCacheableMailQueue.java} | 18 +++--- .../queue/activemq/ActiveMQMailQueueFactory.java | 6 +- .../queue/activemq/ActiveMQMailQueueBlobTest.java | 4 +- .../queue/activemq/ActiveMQMailQueueTest.java | 6 +- .../java/org/apache/james/queue/api/MailQueue.java | 3 +- ...eMailQueue.java => FileCacheableMailQueue.java} | 13 +++-- .../james/queue/file/FileMailQueueFactory.java | 21 +++---- ...java => FileCacheableMailQueueFactoryTest.java} | 2 +- ...ueTest.java => FileCacheableMailQueueTest.java} | 6 +- ...MSMailQueue.java => JMSCacheableMailQueue.java} | 18 ++++-- 
.../james/queue/jms/JMSMailQueueFactory.java | 6 +- .../apache/james/queue/jms/JMSMailQueueItem.java | 6 +- .../queue/library/AbstractMailQueueFactory.java | 10 ++-- ....java => JMSCacheableMailQueueFactoryTest.java} | 2 +- ...eueTest.java => JMSCacheableMailQueueTest.java} | 8 +-- .../library/AbstractMailQueueFactoryTest.java | 2 +- .../james/queue/memory/MemoryMailQueueFactory.java | 24 ++++---- ...va => MemoryCacheableMailQueueFactoryTest.java} | 2 +- ...Test.java => MemoryCacheableMailQueueTest.java} | 6 +- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 17 +++++- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 5 ++ .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 28 ++-------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 ------ 40 files changed, 370 insertions(+), 242 deletions(-) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 29d5395..28383cd 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -21,6 +21,7 @@ package org.apache.james.mailetcontainer.impl; import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD; +import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -59,7 +60,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class); public static final String SPOOL_PROCESSING = "spoolProcessing"; - private MailQueue queue; /** * The number of threads used to move mail through the spool. 
@@ -79,6 +79,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private reactor.core.Disposable disposable; private Scheduler spooler; private int parallelismLevel; + private MailQueue queue; + @Inject public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) { @@ -173,6 +175,11 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB LOGGER.info("start dispose() ..."); disposable.dispose(); spooler.dispose(); + try { + queue.close(); + } catch (IOException e) { + LOGGER.debug("error closing queue", e); + } LOGGER.info("thread shutdown completed."); } diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java index bceea55..e1c3483 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java @@ -19,6 +19,7 @@ package org.apache.james.mailetcontainer.impl; +import java.io.IOException; import java.util.Collection; import java.util.Date; import java.util.HashSet; @@ -30,6 +31,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.mail.Address; import javax.mail.Message; @@ -50,6 +52,7 @@ import org.apache.james.dnsservice.library.MXHostAddressIterator; import org.apache.james.domainlist.api.DomainList; import org.apache.james.domainlist.api.DomainListException; import org.apache.james.lifecycle.api.Configurable; +import org.apache.james.lifecycle.api.Disposable; import org.apache.james.lifecycle.api.LifecycleUtil; import org.apache.james.queue.api.MailQueue; 
import org.apache.james.queue.api.MailQueueFactory; @@ -66,7 +69,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -public class JamesMailetContext implements MailetContext, Configurable { +public class JamesMailetContext implements MailetContext, Configurable, Disposable { private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailetContext.class); /** @@ -89,6 +92,16 @@ public class JamesMailetContext implements MailetContext, Configurable { this.mailQueueFactory = mailQueueFactory; } + @PreDestroy + @Override + public void dispose() { + try { + rootMailQueue.close(); + } catch (IOException e) { + LOGGER.debug("error closing queue", e); + } + } + @Override public Collection<String> getMailServers(Domain host) { try { diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java index ab597e9..5cb3889 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java @@ -19,6 +19,7 @@ package org.apache.james.transport.mailets; +import java.io.IOException; import java.net.UnknownHostException; import java.util.Collection; import java.util.Map; @@ -258,6 +259,11 @@ public class RemoteDelivery extends GenericMailet { if (startThreads == ThreadState.START_THREADS) { deliveryRunnable.dispose(); } + try { + queue.close(); + } catch (IOException e) { + LOGGER.debug("error closing queue", e); + } } } diff --git a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java index a8175b6..869e5c1 100644 --- a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java +++ 
b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java @@ -19,6 +19,7 @@ package org.apache.james.fetchmail; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -67,6 +68,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable { private MailQueueFactory<?> queueFactory; private DomainList domainList; + private MailQueue queue; @Inject public void setMailQueueFactory(MailQueueFactory<?> queueFactory) { @@ -105,7 +107,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable { The scheduler service that is used to trigger fetch tasks. */ ScheduledExecutorService scheduler = new JMXEnabledScheduledThreadPoolExecutor(numThreads, jmxPath, "scheduler"); - MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL); + queue = queueFactory.createQueue(MailQueueFactory.SPOOL); List<HierarchicalConfiguration<ImmutableNode>> fetchConfs = conf.configurationsAt("fetch"); for (HierarchicalConfiguration<ImmutableNode> fetchConf : fetchConfs) { @@ -132,9 +134,10 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable { } @PreDestroy - public void dispose() { + public void dispose() throws IOException { if (enabled) { LOGGER.info("FetchMail dispose..."); + queue.close(); for (ScheduledFuture<?> scheduler1 : schedulers) { scheduler1.cancel(false); } diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java index e9141ea..db8ea1e 100644 --- 
a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java +++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java @@ -80,6 +80,10 @@ public abstract class SetMessagesOutboxFlagUpdateTest { public MailQueue createQueue(String name) { return new MailQueue() { @Override + public void close() throws IOException { + } + + @Override public String getName() { return name; } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java index c652bc9..afda197 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java @@ -19,8 +19,12 @@ package org.apache.james.jmap.draft.send; +import java.io.IOException; + +import javax.annotation.PreDestroy; import javax.inject.Inject; +import org.apache.james.lifecycle.api.Disposable; import org.apache.james.lifecycle.api.Startable; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueue.MailQueueException; @@ -28,10 +32,14 @@ import org.apache.james.queue.api.MailQueueFactory; import org.apache.mailet.Attribute; import org.apache.mailet.AttributeValue; import org.apache.mailet.Mail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -public class MailSpool implements Startable { +public class MailSpool implements Startable, Disposable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MailSpool.class); private final MailQueueFactory<?> queueFactory; private MailQueue queue; @@ -45,6 +53,15 @@ public 
class MailSpool implements Startable { queue = queueFactory.createQueue(MailQueueFactory.SPOOL); } + @PreDestroy + public void dispose() { + try { + queue.close(); + } catch (IOException e) { + LOGGER.debug("error closing queue", e); + } + } + public void send(Mail mail, MailMetadata metadata) throws MailQueueException { mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE, AttributeValue.of(metadata.getMessageId().serialize()))); mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(metadata.getUsername()))); diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java index 4ef8201..ea82d37 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java @@ -19,6 +19,8 @@ package org.apache.james.smtpserver; +import java.io.IOException; + import javax.inject.Inject; import javax.mail.MessagingException; @@ -52,6 +54,15 @@ public class SendMailHandler implements JamesMessageHook { queue = queueFactory.createQueue(MailQueueFactory.SPOOL); } + @Override + public void destroy() { + try { + queue.close(); + } catch (IOException e) { + LOGGER.debug("error close queue", e); + } + } + /** * Adds header to the message */ diff --git a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java index 8b4b19d..e3ae55e 100644 --- a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java +++ b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java @@ -199,7 +199,7 @@ public class SMTPServerTest { protected Configuration 
configuration; protected MockProtocolHandlerLoader chain; protected MemoryMailQueueFactory queueFactory; - protected MemoryMailQueueFactory.MemoryMailQueue queue; + protected MemoryMailQueueFactory.MemoryCacheableMailQueue queue; protected MemoryRecipientRewriteTable rewriteTable; private AliasReverseResolver aliasReverseResolver; protected CanSendFrom canSendFrom; diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java index c2f6064..0338071 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java @@ -21,6 +21,8 @@ package org.apache.james.webadmin.routes; import static org.apache.james.webadmin.Constants.SEPARATOR; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; @@ -32,6 +34,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import org.apache.james.core.MailAddress; +import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueue.MailQueueException; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.ManageableMailQueue; @@ -223,7 +226,7 @@ public class MailQueueRoutes implements Routes { private List<MailQueueItemDTO> listMails(Request request) { String mailQueueName = request.params(MAIL_QUEUE_NAME); return mailQueueFactory.getQueue(mailQueueName) - .map(name -> listMails(name, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request))) + .map(queue -> listMails(queue, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request))) .orElseThrow( () -> 
ErrorResponder.builder() .message("%s can not be found", mailQueueName) @@ -238,7 +241,7 @@ public class MailQueueRoutes implements Routes { } private List<MailQueueItemDTO> listMails(ManageableMailQueue queue, Optional<Boolean> isDelayed, Limit limit) { - try { + try (MailQueue closeable = queue) { return limit.applyOnStream(Iterators.toStream(queue.browse())) .map(Throwing.function(MailQueueItemDTO::from).sneakyThrow()) .filter(item -> filter(item, isDelayed)) @@ -250,6 +253,8 @@ public class MailQueueRoutes implements Routes { .message("Invalid request for listing the mails from the mail queue %s", queue) .cause(e) .haltError(); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -304,17 +309,10 @@ public class MailQueueRoutes implements Routes { private Task deleteMails(Request request) { String mailQueueName = request.params(MAIL_QUEUE_NAME); - return mailQueueFactory.getQueue(mailQueueName) - .map(name -> deleteMailsTask(name, + return deleteMailsTask(mailQueueName, sender(request.queryParams(SENDER_QUERY_PARAM)), name(request.queryParams(NAME_QUERY_PARAM)), - recipient(request.queryParams(RECIPIENT_QUERY_PARAM)))) - .orElseThrow( - () -> ErrorResponder.builder() - .message("%s can not be found", mailQueueName) - .statusCode(HttpStatus.NOT_FOUND_404) - .type(ErrorResponder.ErrorType.NOT_FOUND) - .haltError()); + recipient(request.queryParams(RECIPIENT_QUERY_PARAM))); } private Optional<MailAddress> sender(String senderAsString) throws HaltException { @@ -383,10 +381,11 @@ public class MailQueueRoutes implements Routes { private String forceDelayedMailsDelivery(Request request, Response response) throws JsonExtractException, MailQueueException { assertDelayedParamIsTrue(request); assertPayloadContainsDelayedEntry(request); - ManageableMailQueue mailQueue = assertMailQueueExists(request); - - mailQueue.flush(); - + try (ManageableMailQueue mailQueue = assertMailQueueExists(request)) { + mailQueue.flush(); + } catch (IOException e) { + throw 
new UncheckedIOException(e); + } return Responses.returnNoContent(response); } @@ -421,13 +420,13 @@ public class MailQueueRoutes implements Routes { } } - private Task deleteMailsTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) { + private Task deleteMailsTask(String queueName, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) { int paramCount = Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent()); switch (paramCount) { case 0: - return new ClearMailQueueTask(queue); + return new ClearMailQueueTask(queueName, this::getQueueOrThrow); case 1: - return new DeleteMailsFromMailQueueTask(queue, maybeSender, maybeName, maybeRecipient); + return new DeleteMailsFromMailQueueTask(queueName, this::getQueueOrThrow, maybeSender, maybeName, maybeRecipient); default: throw ErrorResponder.builder() .statusCode(HttpStatus.BAD_REQUEST_400) @@ -438,6 +437,16 @@ public class MailQueueRoutes implements Routes { } } + private ManageableMailQueue getQueueOrThrow(String queueName) { + return mailQueueFactory.getQueue(queueName) + .orElseThrow( + () -> ErrorResponder.builder() + .message("%s can not be found", queueName) + .statusCode(HttpStatus.NOT_FOUND_404) + .type(ErrorResponder.ErrorType.NOT_FOUND) + .haltError()); + } + private void assertDelayedParamIsTrue(Request request) { if (!isDelayed(request.queryParams(DELAYED_QUERY_PARAM)).orElse(false)) { throw ErrorResponder.builder() diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java index ff4d777..ab9e25b 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java +++ 
b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java @@ -19,6 +19,7 @@ package org.apache.james.webadmin.service; +import java.io.IOException; import java.time.Clock; import java.time.Instant; import java.util.Optional; @@ -31,9 +32,12 @@ import org.apache.james.task.TaskType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + public class ClearMailQueueTask implements Task { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final String mailQueueName; private final long initialCount; private final long remainingCount; @@ -62,30 +66,44 @@ public class ClearMailQueueTask implements Task { public Instant timestamp() { return timestamp; } + } public static class UnknownSerializedQueue extends RuntimeException { + public UnknownSerializedQueue(String queueName) { super("Unable to retrieve '" + queueName + "' queue"); } } + @FunctionalInterface + public interface MailQueueFactory { + ManageableMailQueue create(String mailQueueName); + } + private static final Logger LOGGER = LoggerFactory.getLogger(ClearMailQueueTask.class); public static final TaskType TYPE = TaskType.of("clear-mail-queue"); - private final ManageableMailQueue queue; - private final long initialCount; + private final String queueName; + private final ClearMailQueueTask.MailQueueFactory factory; + private Optional<Long> initialCount; + private Optional<ManageableMailQueue> queue; - public ClearMailQueueTask(ManageableMailQueue queue) { - this.queue = queue; - initialCount = getRemainingSize(); + public ClearMailQueueTask(String queueName, ClearMailQueueTask.MailQueueFactory factory) { + this.queueName = queueName; + this.factory = factory; + this.initialCount = Optional.empty(); + this.queue = Optional.empty(); } @Override public Result run() { - try { + try (ManageableMailQueue queue = factory.create(queueName)) { + 
this.initialCount = Optional.of(getRemainingSize(queue)); + this.queue = Optional.of(queue); queue.clear(); - } catch (MailQueue.MailQueueException e) { + this.queue = Optional.empty(); + } catch (MailQueue.MailQueueException | IOException e) { LOGGER.error("Clear MailQueue got an exception", e); return Result.PARTIAL; } @@ -100,16 +118,21 @@ public class ClearMailQueueTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(new AdditionalInformation(queue.getName(), initialCount, getRemainingSize(), Clock.systemUTC().instant())); + return queue.map(q -> new AdditionalInformation(queueName, initialCount.get(), getRemainingSize(q), Clock.systemUTC().instant())); + } + + String getQueueName() { + return queueName; } - ManageableMailQueue getQueue() { - return queue; + @VisibleForTesting + Optional<Long> initialCount() { + return initialCount; } - private long getRemainingSize() { + private long getRemainingSize(ManageableMailQueue mailQueue) { try { - return queue.getSize(); + return mailQueue.getSize(); } catch (MailQueue.MailQueueException e) { throw new RuntimeException(e); } diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java index 8ba59f4..f4af298 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java @@ -39,19 +39,22 @@ public class ClearMailQueueTaskDTO implements TaskDTO { } public static ClearMailQueueTaskDTO toDTO(ClearMailQueueTask domainObject, String typeName) { - return new ClearMailQueueTaskDTO(typeName, domainObject.getQueue().getName()); + return new ClearMailQueueTaskDTO(typeName, 
domainObject.getQueueName()); } private final String type; - private final String queue; + private final String queueName; - public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queue) { + public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queueName) { this.type = type; - this.queue = queue; + this.queueName = queueName; } public ClearMailQueueTask fromDTO(MailQueueFactory<? extends ManageableMailQueue> mailQueueFactory) { - return new ClearMailQueueTask(mailQueueFactory.getQueue(queue).orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queue))); + return new ClearMailQueueTask(queueName, + name -> mailQueueFactory + .getQueue(name) + .orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queueName))); } @Override @@ -60,6 +63,6 @@ public class ClearMailQueueTaskDTO implements TaskDTO { } public String getQueue() { - return queue; + return queueName; } } diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java index e55b7d6..0a1a184 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java @@ -19,6 +19,8 @@ package org.apache.james.webadmin.service; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Clock; import java.time.Instant; import java.util.Optional; @@ -95,38 +97,60 @@ public class DeleteMailsFromMailQueueTask implements Task { } } + @FunctionalInterface + public interface MailQueueFactory { + ManageableMailQueue create(String mailQueueName); + } + public static final TaskType TYPE = 
TaskType.of("delete-mails-from-mail-queue"); - private final ManageableMailQueue queue; private final Optional<MailAddress> maybeSender; private final Optional<String> maybeName; private final Optional<MailAddress> maybeRecipient; + private final MailQueueFactory factory; + private final String queueName; + private Optional<Long> initialCount; + private Optional<ManageableMailQueue> queue; - private final long initialCount; - - public DeleteMailsFromMailQueueTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender, + public DeleteMailsFromMailQueueTask(String queueName, MailQueueFactory factory, + Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) { + this.factory = factory; + this.queueName = queueName; Preconditions.checkArgument( Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent()) == 1, "You should provide one and only one of the query parameters 'sender', 'name' or 'recipient'."); - this.queue = queue; this.maybeSender = maybeSender; this.maybeName = maybeName; this.maybeRecipient = maybeRecipient; - this.initialCount = getRemainingSize(); + this.initialCount = Optional.empty(); + this.queue = Optional.empty(); + } @Override public Result run() { - maybeSender.ifPresent(Throwing.consumer( - (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString()))); - maybeName.ifPresent(Throwing.consumer( - (String name) -> queue.remove(ManageableMailQueue.Type.Name, name))); - maybeRecipient.ifPresent(Throwing.consumer( - (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString()))); - - return Result.COMPLETED; + try (ManageableMailQueue queue = factory.create(queueName)) { + this.initialCount = Optional.of(getRemainingSize(queue)); + this.queue = Optional.of(queue); + maybeSender.ifPresent(Throwing.consumer( + (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString()))); + 
maybeName.ifPresent(Throwing.consumer( + (String name) -> queue.remove(ManageableMailQueue.Type.Name, name))); + maybeRecipient.ifPresent(Throwing.consumer( + (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString()))); + + this.queue = Optional.empty(); + + return Result.COMPLETED; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public String getQueueName() { + return queueName; } @Override @@ -134,10 +158,6 @@ public class DeleteMailsFromMailQueueTask implements Task { return TYPE; } - ManageableMailQueue getQueue() { - return queue; - } - Optional<String> getMaybeName() { return maybeName; } @@ -152,13 +172,13 @@ public class DeleteMailsFromMailQueueTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(new AdditionalInformation(queue.getName(), - initialCount, - getRemainingSize(), maybeSender, + return this.queue.map(queue -> new AdditionalInformation(queueName, + initialCount.get(), + getRemainingSize(queue), maybeSender, maybeName, maybeRecipient, Clock.systemUTC().instant())); } - public long getRemainingSize() { + public long getRemainingSize(ManageableMailQueue queue) { try { return queue.getSize(); } catch (MailQueue.MailQueueException e) { diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java index e5dfa9e..12f3485 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java @@ -27,7 +27,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO { public static 
DeleteMailsFromMailQueueTaskDTO toDTO(DeleteMailsFromMailQueueTask domainObject, String typeName) { return new DeleteMailsFromMailQueueTaskDTO( typeName, - domainObject.getQueue().getName(), + domainObject.getQueueName(), domainObject.getMaybeSender().map(MailAddress::asString), domainObject.getMaybeName(), domainObject.getMaybeRecipient().map(MailAddress::asString) @@ -35,18 +35,18 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO { } private final String type; - private final String queue; + private final String queueName; private final Optional<String> sender; private final Optional<String> name; private final Optional<String> recipient; public DeleteMailsFromMailQueueTaskDTO(@JsonProperty("type") String type, - @JsonProperty("queue") String queue, + @JsonProperty("queue") String queueName, @JsonProperty("sender") Optional<String> sender, @JsonProperty("name") Optional<String> name, @JsonProperty("recipient") Optional<String> recipient) { this.type = type; - this.queue = queue; + this.queueName = queueName; this.sender = sender; this.name = name; this.recipient = recipient; @@ -54,7 +54,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO { public DeleteMailsFromMailQueueTask fromDTO(MailQueueFactory<? 
extends ManageableMailQueue> mailQueueFactory) { return new DeleteMailsFromMailQueueTask( - mailQueueFactory.getQueue(queue).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queue)), + queueName, name -> mailQueueFactory.getQueue(name).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queueName)), sender.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()), name, recipient.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()) @@ -67,7 +67,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO { } public String getQueue() { - return queue; + return queueName; } public Optional<String> getSender() { diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java index 2cd4f49..f40f1ac 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java @@ -42,7 +42,6 @@ import org.apache.james.queue.api.Mails; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.apache.james.queue.memory.MemoryMailQueueFactory; -import org.apache.james.queue.memory.MemoryMailQueueFactory.MemoryMailQueue; import org.apache.james.task.Hostname; import org.apache.james.task.MemoryTaskManager; import org.apache.james.task.TaskManager; @@ -229,7 +228,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnMailsWhenSome() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); 
queue.enQueue(Mails.defaultMail().name("name").build()); queue.enQueue(Mails.defaultMail().name("name").build()); @@ -243,7 +242,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnMailDetailsWhenSome() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail); @@ -265,7 +264,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnEmptyWhenNoDelayedMailsAndAskFor() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail); @@ -281,7 +280,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnCurrentMailsWhenMailsAndAskForNotDelayed() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail); @@ -297,7 +296,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnDelayedMailsWhenAskFor() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail, 10, TimeUnit.MINUTES); @@ -313,7 +312,7 @@ class MailQueueRoutesTest { @Test public void listMailsShouldReturnOneMailWhenMailsAndAskForALimitOfOne() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); 
FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail); queue.enQueue(mail); @@ -336,7 +335,7 @@ class MailQueueRoutesTest { @Test public void getMailQueueShouldReturnTheMailQueueDataWhenMailQueueExists() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(Mails.defaultMail().name("name").build()); when() @@ -509,7 +508,7 @@ class MailQueueRoutesTest { class SideEffects { @Test public void forcingDelayedMailsDeliveryShouldActuallyChangePropertyOnMails() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); FakeMail mail = Mails.defaultMail().name("name").build(); queue.enQueue(mail, 10L, TimeUnit.MINUTES); queue.enQueue(mail, 10L, TimeUnit.MINUTES); @@ -656,7 +655,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsTasksShouldHaveDetailsWhenSenderIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -693,7 +692,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsTasksShouldHaveDetailsWhenNameIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -728,7 +727,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsTasksShouldHaveDetailsWhenRecipientIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = 
mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -774,7 +773,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsShouldDeleteMailsWhenSenderIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -808,7 +807,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsShouldDeleteMailsWhenNameIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -840,7 +839,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsShouldDeleteMailsWhenRecipientIsGiven() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -880,7 +879,7 @@ class MailQueueRoutesTest { @Test public void deleteMailsShouldDeleteMailsWhenTheyAreMatching() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); String recipient = "recipi...@james.org"; queue.enQueue(Mails.defaultMail() .name("name") @@ -945,7 +944,7 @@ class MailQueueRoutesTest { @Test public void clearMailQueueShouldHaveDetailsWhenNoQueryParameters() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) @@ -982,7 +981,7 @@ class MailQueueRoutesTest { @Test 
void clearMailQueueShouldDeleteAllMailsInQueueWhenNoQueryParameters() throws Exception { - MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); + MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE); queue.enQueue(FakeMail.builder() .name(FAKE_MAIL_NAME_1) diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java index 6d04e01..94b4432 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java @@ -17,6 +17,7 @@ package org.apache.james.webadmin.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -46,23 +47,24 @@ class ClearMailQueueTaskTest { when(mockedQueue.getName()).thenReturn(QUEUE_NAME); when(mailQueueFactory.getQueue(anyString())).thenAnswer(arg -> Optional.of(mockedQueue)); - ManageableMailQueue queue = mailQueueFactory.getQueue(QUEUE_NAME).get(); - ClearMailQueueTask task = new ClearMailQueueTask(queue); + ClearMailQueueTask.MailQueueFactory factory = queueName -> mailQueueFactory.getQueue(queueName).orElseThrow(RuntimeException::new); + ClearMailQueueTask task = new ClearMailQueueTask(QUEUE_NAME, factory); JsonSerializationVerifier.dtoModule(ClearMailQueueTaskDTO.module(mailQueueFactory)) .bean(task) .json(SERIALIZED) + .equalityTester((a, b) -> assertThat(a.getQueueName()).isEqualTo(b.getQueueName())) .verify(); } @Test - void taskShouldThrowWhenDeserializeAnUnknownQueue() { + void taskShouldThrowWhenRunOnAnUnknownQueue() { 
MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class); when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty()); JsonTaskSerializer testee = JsonTaskSerializer.of(ClearMailQueueTaskDTO.module(mailQueueFactory)); String serializedJson = "{\"type\": \"clear-mail-queue\", \"queue\": \"anyQueue\"}"; - assertThatThrownBy(() -> testee.deserialize(serializedJson)) + assertThatThrownBy(() -> testee.deserialize(serializedJson).run()) .isInstanceOf(ClearMailQueueTask.UnknownSerializedQueue.class); } diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java index c6797c2..6e44829 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java @@ -17,6 +17,7 @@ package org.apache.james.webadmin.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -53,26 +54,32 @@ class DeleteMailsFromMailQueueTaskTest { @Test void taskShouldBeSerializable() throws Exception { - ManageableMailQueue queue = mailQueueFactory.getQueue(queueName).get(); - DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queue, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty()); - DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queue, Optional.empty(), Optional.of("name"), Optional.empty()); - DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queue, 
Optional.empty(), Optional.empty(), Optional.of(new MailAddress("d@e.f"))); + DeleteMailsFromMailQueueTask.MailQueueFactory factory = name -> this.mailQueueFactory.getQueue(name).orElseThrow(RuntimeException::new); + DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty()); + DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.of("name"), Optional.empty()); + DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.empty(), Optional.of(new MailAddress("d@e.f"))); - JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory)) + JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(this.mailQueueFactory)) .testCase(taskSender, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}") .testCase(taskName, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"name\": \"name\"}") .testCase(taskRecipient, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"recipient\": \"d@e.f\"}") + .equalityTester((a, b) -> { + assertThat(a.getQueueName()).isEqualTo(b.getQueueName()); + assertThat(a.getMaybeName()).isEqualTo(b.getMaybeName()); + assertThat(a.getMaybeSender()).isEqualTo(b.getMaybeSender()); + assertThat(a.getMaybeRecipient()).isEqualTo(b.getMaybeRecipient()); + }) .verify(); } @Test - void taskShouldThrowWhenDeserializeAnUnknownQueue() { + void taskShouldThrowWhenRunOnAnUnknownQueue() { MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class); when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty()); JsonTaskSerializer testee = JsonTaskSerializer.of(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory)); String serializedJson = "{\"type\": 
\"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}"; - assertThatThrownBy(() -> testee.deserialize(serializedJson)) + assertThatThrownBy(() -> testee.deserialize(serializedJson).run()) .isInstanceOf(DeleteMailsFromMailQueueTask.UnknownSerializedQueue.class); } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java index 6ccb045..6171b00 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java @@ -19,6 +19,8 @@ package org.apache.james.webadmin.service; +import java.io.Closeable; +import java.io.IOException; import java.util.Optional; import java.util.function.Consumer; @@ -35,17 +37,22 @@ import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.util.OptionalUtils; import org.apache.james.util.streams.Iterators; import org.apache.mailet.Mail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; public class ReprocessingService { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReprocessingService.class); + public static class MissingKeyException extends RuntimeException { MissingKeyException(MailKey key) { super(key.asString() + " can not be found"); } } - static class Reprocessor { + static class Reprocessor implements Closeable { private final MailQueue mailQueue; private final Optional<String> targetProcessor; @@ -63,6 +70,15 @@ public class ReprocessingService { throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e); } } + + @Override + public void close() { + try { + mailQueue.close(); + } catch (IOException 
e) { + LOGGER.debug("error closing queue", e); + } + } } private final MailQueueFactory<?> mailQueueFactory; @@ -76,30 +92,30 @@ public class ReprocessingService { } public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor); - - mailRepositoryStoreService - .getRepositories(path) - .forEach(Throwing.consumer((MailRepository repository) -> - Iterators.toStream(repository.list()) - .peek(keyListener) - .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key)))) - .flatMap(OptionalUtils::toStream) - .forEach(mail -> reprocessor.reprocess(repository, mail)))); + try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) { + mailRepositoryStoreService + .getRepositories(path) + .forEach(Throwing.consumer((MailRepository repository) -> + Iterators.toStream(repository.list()) + .peek(keyListener) + .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key)))) + .flatMap(OptionalUtils::toStream) + .forEach(mail -> reprocessor.reprocess(repository, mail)))); + } } public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor); - - Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService - .getRepositories(path) - .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key))))) - .filter(pair -> pair.getRight().isPresent()) - .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get())) - .findFirst() - .orElseThrow(() -> new MissingKeyException(key)); - - reprocessor.reprocess(mailPair.getKey(), 
mailPair.getValue()); + try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) { + Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService + .getRepositories(path) + .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key))))) + .filter(pair -> pair.getRight().isPresent()) + .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get())) + .findFirst() + .orElseThrow(() -> new MissingKeyException(key)); + + reprocessor.reprocess(mailPair.getKey(), mailPair.getValue()); + } } private MailQueue getMailQueue(String targetQueue) { diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java similarity index 91% rename from server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java rename to server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java index c6e094b..145acd5 100644 --- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java +++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java @@ -44,7 +44,7 @@ import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueItemDecoratorFactory; -import org.apache.james.queue.jms.JMSMailQueue; +import org.apache.james.queue.jms.JMSCacheableMailQueue; import org.apache.james.server.core.MailImpl; import org.apache.james.server.core.MimeMessageCopyOnWriteProxy; import org.apache.james.server.core.MimeMessageInputStream; @@ -89,17 +89,17 @@ import org.slf4j.LoggerFactory; * </p> * To have a good throughput you should use a caching connection factory. 
</p> */ -public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { - private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMailQueue.class); +public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements ActiveMQSupport { + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCacheableMailQueue.class); private final boolean useBlob; /** - * Construct a {@link ActiveMQMailQueue} which only use {@link BlobMessage} + * Construct a {@link ActiveMQCacheableMailQueue} which only use {@link BlobMessage} * */ - public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory, - GaugeRegistry gaugeRegistry) { + public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory, + GaugeRegistry gaugeRegistry) { this(connectionFactory, mailQueueItemDecoratorFactory, queuename, true, metricFactory, gaugeRegistry); } @@ -110,8 +110,8 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { * @param queuename * @param useBlob */ - public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory, - GaugeRegistry gaugeRegistry) { + public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory, + GaugeRegistry gaugeRegistry) { super(connectionFactory, mailQueueItemDecoratorFactory, queuename, metricFactory, gaugeRegistry); this.useBlob = useBlob; } @@ -268,7 +268,7 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport { /** * Try to use ActiveMQ StatisticsPlugin to get size and if that fails - * 
fallback to {@link JMSMailQueue#getSize()} + * fallback to {@link JMSCacheableMailQueue#getSize()} */ @Override public long getSize() throws MailQueueException { diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java index 5008508..5b61ca5 100644 --- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java +++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java @@ -30,7 +30,7 @@ import org.apache.james.queue.jms.JMSMailQueueFactory; /** * {@link MailQueueFactory} implementations which return - * {@link ActiveMQMailQueue} instances + * {@link ActiveMQCacheableMailQueue} instances */ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory { @@ -51,7 +51,7 @@ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory { } @Override - protected ManageableMailQueue createMailQueue(String name) { - return new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry); + protected ManageableMailQueue createCacheableMailQueue(String name) { + return new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry); } } diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java index b2c3cea..da22eb7 100644 --- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java +++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java @@ -59,7 +59,7 @@ public class ActiveMQMailQueueBlobTest implements 
DelayedManageableMailQueueCont static final String BASE_DIR = "file://target/james-test"; static final boolean USE_BLOB = true; - ActiveMQMailQueue mailQueue; + ActiveMQCacheableMailQueue mailQueue; MyFileSystem fileSystem; @BeforeEach @@ -78,7 +78,7 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont MetricFactory metricFactory = metricTestSystem.getMetricFactory(); GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry(); String queueName = BrokerExtension.generateRandomQueueName(broker); - mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry); + mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry); } @AfterEach diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java index 609cef1..e7a08dd 100644 --- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java +++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java @@ -18,8 +18,6 @@ ****************************************************************/ package org.apache.james.queue.activemq; -import javax.jms.ConnectionFactory; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; @@ -48,7 +46,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract static final boolean USE_BLOB = true; - ActiveMQMailQueue mailQueue; + ActiveMQCacheableMailQueue mailQueue; @BeforeEach public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { @@ -60,7 +58,7 @@ public class 
ActiveMQMailQueueTest implements DelayedManageableMailQueueContract MetricFactory metricFactory = metricTestSystem.getMetricFactory(); GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry(); String queueName = BrokerExtension.generateRandomQueueName(broker); - mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry); + mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry); } @AfterEach diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java index 559d998..d4e9c3a 100644 --- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java +++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java @@ -19,6 +19,7 @@ package org.apache.james.queue.api; +import java.io.Closeable; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -58,7 +59,7 @@ import org.threeten.extra.Temporals; * </ul> * </p> */ -public interface MailQueue { +public interface MailQueue extends Closeable { String ENQUEUED_METRIC_NAME_PREFIX = "enqueuedMail:"; String DEQUEUED_METRIC_NAME_PREFIX = "dequeuedMail:"; diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java similarity index 97% rename from server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java rename to server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java index 4f288b3..6142be8 100644 --- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java +++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java @@ -71,15 
+71,15 @@ import reactor.core.publisher.Mono; /** * {@link ManageableMailQueue} implementation which use the fs to store {@link Mail}'s * <p/> - * On create of the {@link FileMailQueue} the {@link #init()} will get called. This takes care of + * On create of the {@link FileCacheableMailQueue} the {@link #init()} will get called. This takes care of * loading the needed meta-data into memory for fast access. * * @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe * We recommend using embedded ActiveMQMailQueue implementation instead */ @Deprecated -public class FileMailQueue implements ManageableMailQueue { - private static final Logger LOGGER = LoggerFactory.getLogger(FileMailQueue.class); +public class FileCacheableMailQueue implements ManageableMailQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(FileCacheableMailQueue.class); private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>()); private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>(); @@ -98,7 +98,7 @@ public class FileMailQueue implements ManageableMailQueue { private final String queueName; private final Flux<MailQueueItem> flux; - public FileMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException { + public FileCacheableMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException { this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory; this.sync = sync; this.queueName = queuename; @@ -111,6 +111,11 @@ public class FileMailQueue implements ManageableMailQueue { } @Override + public void close() { + //There's no resource to free + } + + @Override public String getName() { return queueName; } diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java 
b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java index 87d88a8..623e1e0 100644 --- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java +++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.inject.Inject; @@ -34,7 +35,7 @@ import org.apache.james.queue.api.ManageableMailQueue; import com.google.common.collect.ImmutableSet; /** - * {@link MailQueueFactory} implementation which returns {@link FileMailQueue} instances + * {@link MailQueueFactory} implementation which returns {@link FileCacheableMailQueue} instances * * @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe * We recommend using embedded ActiveMQMailQueue implementation instead @@ -42,7 +43,7 @@ import com.google.common.collect.ImmutableSet; @Deprecated public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueue> { - private final Map<String, ManageableMailQueue> queues = new HashMap<>(); + private final Map<String, ManageableMailQueue> queues = new ConcurrentHashMap<>(); private MailQueueItemDecoratorFactory mailQueueActionItemDecoratorFactory; private FileSystem fs; private boolean sync = true; @@ -59,7 +60,7 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu } /** - * If <code>true</code> the later created {@link FileMailQueue} will call <code>fsync</code> after each message {@link FileMailQueue#enQueue(org.apache.mailet.Mail)} call. This + * If <code>true</code> the later created {@link FileCacheableMailQueue} will call <code>fsync</code> after each message {@link FileCacheableMailQueue#enQueue(org.apache.mailet.Mail)} call. This * is needed to be fully RFC conform but gives a performance penalty. 
If you are brave enough you man set it to <code>false</code> * <p/> * The default is <code>true</code> @@ -77,19 +78,13 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu @Override public ManageableMailQueue createQueue(String name) { - return getQueue(name).orElseGet(() -> createAndRegisterQueue(name)); - } - - private ManageableMailQueue createAndRegisterQueue(String name) { - synchronized (queues) { + return queues.computeIfAbsent(name, mailQueueName -> { try { - FileMailQueue queue = new FileMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), name, sync); - queues.put(name, queue); - return queue; + return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync); } catch (IOException e) { - throw new RuntimeException("Unable to access queue " + name, e); + throw new RuntimeException("Unable to access queue " + mailQueueName, e); } - } + }); } } diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java similarity index 94% rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java index 926092f..923a8db 100644 --- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java +++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java @@ -35,7 +35,7 @@ import org.junit.jupiter.api.Disabled; " - JAMES-2954 Incomplete browse implementation" + " - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" + " - JAMES-2979 dequeue is not thread safe") -public class FileMailQueueFactoryTest implements 
MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { +public class FileCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { private FileMailQueueFactory mailQueueFactory; private MockFileSystem fileSystem; diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java similarity index 90% rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java index f2659d6..2b479e9 100644 --- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java +++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java @@ -34,16 +34,16 @@ import org.junit.rules.TemporaryFolder; " - JAMES-2954 Incomplete browse implementation" + " - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" + " - JAMES-2979 dequeue is not thread safe") -public class FileMailQueueTest implements DelayedManageableMailQueueContract { +public class FileCacheableMailQueueTest implements DelayedManageableMailQueueContract { private static final boolean SYNC = true; private TemporaryFolder temporaryFolder = new TemporaryFolder(); - private FileMailQueue mailQueue; + private FileCacheableMailQueue mailQueue; @BeforeEach public void setUp() throws Exception { temporaryFolder.create(); - mailQueue = new FileMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC); + mailQueue = new FileCacheableMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC); } @AfterEach diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java 
b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java similarity index 97% rename from server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java rename to server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java index 2511f49..5171cae 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java @@ -99,7 +99,7 @@ import reactor.core.publisher.Mono; * {@link Mail} objects. * </p> */ -public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable { +public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable { private final Flux<MailQueueItem> flux; @@ -153,7 +153,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori } } - private static final Logger LOGGER = LoggerFactory.getLogger(JMSMailQueue.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JMSCacheableMailQueue.class); public static final String FORCE_DELIVERY = "FORCE_DELIVERY"; @@ -172,9 +172,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori private final Joiner joiner; private final Splitter splitter; - public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, - String queueName, MetricFactory metricFactory, - GaugeRegistry gaugeRegistry) { + public JMSCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, + String queueName, MetricFactory metricFactory, + GaugeRegistry gaugeRegistry) { try { connection = connectionFactory.createConnection(); connection.start(); @@ -204,6 +204,14 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori flux = 
Mono.defer(this::deQueueOneItem).repeat(); } + /** + * To allow connection reuse (the queue is cacheable), we don't close the queue + * on close(), use {@link JMSCacheableMailQueue#dispose} to release resources + */ + @Override + public void close() { + } + @Override public String getName() { return queueName; diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java index 051e3c0..701e0b3 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java @@ -48,8 +48,8 @@ public class JMSMailQueueFactory extends AbstractMailQueueFactory<ManageableMail } @Override - protected ManageableMailQueue createMailQueue(String name) { - return new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry); + protected ManageableMailQueue createCacheableMailQueue(String name) { + return new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry); } - + } diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java index ad10836..d20c642 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java @@ -48,13 +48,13 @@ public class JMSMailQueueItem implements MailQueueItem { if (success) { session.commit(); } else { - JMSMailQueue.rollback(session); + JMSCacheableMailQueue.rollback(session); } } catch (JMSException ex) { throw new MailQueueException("Unable to commit dequeue operation for mail " + mail.getName(), ex); } finally { - JMSMailQueue.closeConsumer(consumer); - 
JMSMailQueue.closeSession(session); + JMSCacheableMailQueue.closeConsumer(consumer); + JMSCacheableMailQueue.closeSession(session); } } diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java index e1a29dd..fdd4b86 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java @@ -102,7 +102,7 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M } private T createAndRegisterQueue(String name) { - T queue = createMailQueue(name); + T queue = createCacheableMailQueue(name); if (useJMX) { registerMBean(name, queue); } @@ -111,12 +111,10 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M } /** - * Create a {@link MailQueue} for the given name - * - * @param name - * @return queue + * Create a {@link MailQueue} for the given name that happens to do nothing on close() + * to be able to cache the instance */ - protected abstract T createMailQueue(String name); + protected abstract T createCacheableMailQueue(String name); protected synchronized void registerMBean(String queuename, MailQueue queue) { diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java similarity index 95% rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java index f98cf9d..5a62538 100644 --- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java +++ 
b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java @@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(BrokerExtension.class) -public class JMSMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { +public class JMSCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { private JMSMailQueueFactory mailQueueFactory; diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java similarity index 92% rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java index f436a1d..061218a 100644 --- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java +++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java @@ -19,8 +19,6 @@ package org.apache.james.queue.jms; -import javax.jms.ConnectionFactory; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; @@ -41,10 +39,10 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(BrokerExtension.class) -public class JMSMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract, +public class JMSCacheableMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract, MailQueueMetricContract { - private JMSMailQueue mailQueue; + private JMSCacheableMailQueue mailQueue; @BeforeEach void 
setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { @@ -56,7 +54,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri MetricFactory metricFactory = metricTestSystem.getMetricFactory(); GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry(); String queueName = BrokerExtension.generateRandomQueueName(broker); - mailQueue = new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry); + mailQueue = new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry); } @AfterEach diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java index 2ae2c50..2f25606 100644 --- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java +++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java @@ -46,7 +46,7 @@ public class AbstractMailQueueFactoryTest { mBeanServer = mock(MBeanServer.class); abstractMailQueueFactory = new AbstractMailQueueFactory<ManageableMailQueue>() { @Override - protected ManageableMailQueue createMailQueue(String name) { + protected ManageableMailQueue createCacheableMailQueue(String name) { return mock(ManageableMailQueue.class); } }; diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index 25b80b6..4aa75ec 100644 --- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -19,6 +19,7 @@ package 
org.apache.james.queue.memory; +import java.io.IOException; import java.time.DateTimeException; import java.time.Duration; import java.time.Instant; @@ -60,7 +61,7 @@ import reactor.core.scheduler.Schedulers; public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> { - private final ConcurrentHashMap<String, MemoryMailQueueFactory.MemoryMailQueue> mailQueues; + private final ConcurrentHashMap<String, MemoryCacheableMailQueue> mailQueues; private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; @Inject @@ -80,19 +81,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu } @Override - public MemoryMailQueueFactory.MemoryMailQueue createQueue(String name) { - MemoryMailQueueFactory.MemoryMailQueue newMailQueue = new MemoryMailQueue(name, mailQueueItemDecoratorFactory); - return Optional.ofNullable(mailQueues.putIfAbsent(name, newMailQueue)) - .orElse(newMailQueue); + public MemoryCacheableMailQueue createQueue(String name) { + return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory)); } - public static class MemoryMailQueue implements ManageableMailQueue { + public static class MemoryCacheableMailQueue implements ManageableMailQueue { private final DelayQueue<MemoryMailQueueItem> mailItems; private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems; private final String name; private final Flux<MailQueueItem> flux; - public MemoryMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + public MemoryCacheableMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { this.mailItems = new DelayQueue<>(); this.inProcessingMailItems = new LinkedBlockingDeque<>(); this.name = name; @@ -105,6 +104,11 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu } @Override + public void close() { + //There's no resource to free 
+ } + + @Override public String getName() { return name; } @@ -244,7 +248,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu return false; } - MemoryMailQueue that = (MemoryMailQueue) o; + MemoryCacheableMailQueue that = (MemoryCacheableMailQueue) o; return Objects.equal(this.name, that.name); } @@ -257,10 +261,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed { private final Mail mail; - private final MemoryMailQueue queue; + private final MemoryCacheableMailQueue queue; private final ZonedDateTime delivery; - public MemoryMailQueueItem(Mail mail, MemoryMailQueue queue, ZonedDateTime delivery) { + public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) { this.mail = mail; this.queue = queue; this.delivery = delivery; diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java similarity index 93% rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java index 28002ca..aa2e294 100644 --- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java +++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java @@ -26,7 +26,7 @@ import org.apache.james.queue.api.ManageableMailQueueFactoryContract; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.junit.jupiter.api.BeforeEach; -class MemoryMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { +class 
MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract { MemoryMailQueueFactory memoryMailQueueFactory; diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java similarity index 91% rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java index 085e565..5868995 100644 --- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java +++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java @@ -29,13 +29,13 @@ import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class MemoryMailQueueTest implements DelayedManageableMailQueueContract { +public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueContract { - private MemoryMailQueueFactory.MemoryMailQueue mailQueue; + private MemoryMailQueueFactory.MemoryCacheableMailQueue mailQueue; @BeforeEach public void setUp() { - mailQueue = new MemoryMailQueueFactory.MemoryMailQueue("test", new RawMailQueueItemDecoratorFactory()); + mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue("test", new RawMailQueueItemDecoratorFactory()); } @Override diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index ece7018..64c0c93 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; +import java.io.Closeable; import java.io.IOException; import java.util.function.Consumer; import java.util.function.Function; @@ -40,13 +41,14 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.Receiver; -class Dequeuer { +class Dequeuer implements Closeable { private static final boolean REQUEUE = true; private static final int EXECUTION_RATE = 5; - private final Flux<AcknowledgableDelivery> flux; private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { + private final Consumer<Boolean> ack; private final EnqueueId enqueueId; private final Mail mail; @@ -70,12 +72,15 @@ class Dequeuer { public void done(boolean success) { ack.accept(success); } + } private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader; private final Metric dequeueMetric; private final MailReferenceSerializer mailReferenceSerializer; private final MailQueueView mailQueueView; + private final Receiver receiver; + private final Flux<AcknowledgableDelivery> flux; Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, @@ -84,11 +89,17 @@ class Dequeuer { this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); - this.flux = receiverProvider.createReceiver() + this.receiver = receiverProvider.createReceiver(); + this.flux = this.receiver .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE)) .filter(getResponse -> 
getResponse.getBody() != null); } + @Override + public void close() { + receiver.close(); + } + Flux<? extends MailQueue.MailQueueItem> deQueue() { return flux.concatMap(this::loadItem) .concatMap(this::filterIfDeleted); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 2dc7663..b09fe30 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -58,6 +58,11 @@ public class RabbitMQMailQueue implements ManageableMailQueue { } @Override + public void close() { + dequeuer.close(); + } + + @Override public String getName() { return name.asString(); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index f771118..57bde2f 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX import java.time.Clock; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import javax.inject.Inject; @@ -46,6 +45,7 @@ import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueItemDecoratorFactory; import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.util.OptionalUtils; import com.github.fge.lambdas.Throwing; import 
com.google.common.annotations.VisibleForTesting; @@ -119,26 +119,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu } } - /** - * RabbitMQMailQueue should have a single instance in a given JVM for a given MailQueueName. - * This class helps at keeping track of previously instantiated MailQueues. - */ - private class RabbitMQMailQueueObjectPool { - - private final ConcurrentHashMap<MailQueueName, RabbitMQMailQueue> instantiatedQueues; - - RabbitMQMailQueueObjectPool() { - this.instantiatedQueues = new ConcurrentHashMap<>(); - } - - RabbitMQMailQueue retrieveInstanceFor(MailQueueName name) { - return instantiatedQueues.computeIfAbsent(name, privateFactory::create); - } - } - private final RabbitMQMailQueueManagement mqManagementApi; private final PrivateFactory privateFactory; - private final RabbitMQMailQueueObjectPool mailQueueObjectPool; private final Sender sender; @VisibleForTesting @@ -149,7 +131,6 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu this.sender = sender; this.mqManagementApi = mqManagementApi; this.privateFactory = privateFactory; - this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool(); } @Override @@ -166,8 +147,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu @Override public Set<RabbitMQMailQueue> listCreatedMailQueues() { + //TODO: it creates connections and leaks them return mqManagementApi.listCreatedMailQueueNames() - .map(mailQueueObjectPool::retrieveInstanceFor) + .flatMap(name -> OptionalUtils.toStream(getQueue(name.asString()))) .collect(ImmutableSet.toImmutableSet()); } @@ -188,13 +170,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .routingKey(EMPTY_ROUTING_KEY))) .then() .block(); - return mailQueueObjectPool.retrieveInstanceFor(mailQueueName); + return privateFactory.create(mailQueueName); } private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) { return
mqManagementApi.listCreatedMailQueueNames() .filter(name::equals) - .map(mailQueueObjectPool::retrieveInstanceFor) + .map(privateFactory::create) .findFirst(); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index a717476..5651d02 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -91,21 +91,4 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM return mailQueueFactory; } - @Test - void createQueueShouldReturnTheSameInstanceWhenParallelCreateSameQueueName() throws Exception { - Set<RabbitMQMailQueue> createdRabbitMQMailQueues = ConcurrentHashMap.newKeySet(); - - ConcurrentTestRunner.builder() - .operation((threadNumber, operationNumber) -> - createdRabbitMQMailQueues.add(mailQueueFactory.createQueue("spool"))) - .threadCount(100) - .operationCount(10) - .runSuccessfullyWithin(Duration.ofMinutes(10)); - - assertThat(mailQueueFactory.listCreatedMailQueues()) - .hasSize(1) - .isEqualTo(createdRabbitMQMailQueues) - .extracting(RabbitMQMailQueue::getName) - .hasOnlyOneElementSatisfying(queueName -> assertThat(queueName).isEqualTo("spool")); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org