This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fc2fa8730d79d4a1a7312af94408648afca126cc Author: Benoit Tellier <[email protected]> AuthorDate: Mon Oct 7 14:48:36 2019 +0700 JAMES-2937 Inject RabbitMQChannelPool This avoids repeating ourselves regarding Sender and Receiver specification and simplifies overall wiring. This also highlighted some 'dirty' initialization performed within MailSpool constructor, out of startable chain. --- backends-common/rabbitmq/pom.xml | 4 ++ .../rabbitmq/ReactorRabbitMQChannelPool.java | 36 ++++++++++++++-- .../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 8 ++-- .../james/mailbox/events/GroupRegistration.java | 11 +++-- .../mailbox/events/GroupRegistrationHandler.java | 17 +++----- .../mailbox/events/KeyRegistrationHandler.java | 10 ++--- .../james/mailbox/events/RabbitMQEventBus.java | 25 ++++------- .../james/mailbox/events/RabbitMQEventBusTest.java | 29 ++++++++----- .../rabbitmq/host/RabbitMQEventBusHostSystem.java | 10 +++-- .../modules/DistributedTaskManagerModule.java | 7 ++++ .../apache/james/jmap/draft/JMAPCommonModule.java | 10 +++++ .../james/modules/rabbitmq/RabbitMQModule.java | 32 +++++---------- .../apache/james/jmap/draft/send/MailSpool.java | 10 ++++- .../james/jmap/draft/send/MailSpoolTest.java | 1 + .../org/apache/james/queue/rabbitmq/Dequeuer.java | 8 ++-- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 5 ++- .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 48 ++++++---------------- .../RabbitMQMailQueueConfigurationChangeTest.java | 17 ++++---- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 26 ++++-------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 +++----- .../distributed/RabbitMQTerminationSubscriber.java | 4 +- .../distributed/RabbitMQWorkQueue.java | 34 ++++++--------- .../distributed/RabbitMQWorkQueueSupplier.scala | 5 +-- .../distributed/DistributedTaskManagerTest.java | 12 ++++-- .../distributed/RabbitMQWorkQueueTest.java | 11 +++-- server/task/task-memory/pom.xml | 4 ++ .../main/java/org/apache/james/task/WorkQueue.java | 8 +++- .../eventsourcing/EventSourcingTaskManager.scala | 9 ++-- 28 files changed, 216 insertions(+), 202 deletions(-) diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml index fa1c156..7cd59c1 100644 --- a/backends-common/rabbitmq/pom.xml +++ b/backends-common/rabbitmq/pom.xml @@ -37,6 +37,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-lifecycle-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-testing</artifactId> <scope>test</scope> </dependency> diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 9e66225..e240b93 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -24,12 +24,15 @@ import java.util.Comparator; import java.util.concurrent.ConcurrentSkipListSet; import java.util.function.BiConsumer; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; - +import org.apache.james.lifecycle.api.Startable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +45,13 @@ import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.ChannelPool; import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; import reactor.rabbitmq.SenderOptions; -public class ReactorRabbitMQChannelPool implements ChannelPool { +public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { + private static final int MAX_CHANNELS_NUMBER = 5; static class ChannelFactory extends BasePooledObjectFactory<Channel> { @@ -95,6 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { private final Mono<Connection> connectionMono; private final GenericObjectPool<Channel> pool; private final ConcurrentSkipListSet<Channel> borrowedChannels; + private Sender sender; + + @Inject + public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) { + this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER); + } public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) { this.connectionMono = connectionMono; @@ -106,6 +118,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode)); } + public void start() { + sender = createSender(); + } + + public Sender getSender() { + return sender; + } + + public Receiver createReceiver() { + return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + } + + public Mono<Connection> getConnectionMono() { + return connectionMono; + } + @Override public Mono<? extends Channel> getChannelMono() { return Mono.fromCallable(() -> { @@ -127,7 +155,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { }; } - public Sender createSender() { + private Sender createSender() { return RabbitFlux.createSender(new SenderOptions() .connectionMono(connectionMono) .channelPool(this) @@ -146,8 +174,10 @@ public class ReactorRabbitMQChannelPool implements ChannelPool { } } + @PreDestroy @Override public void close() { + sender.close(); borrowedChannels.forEach(channel -> getChannelCloseHandler().accept(SignalType.ON_NEXT, channel)); borrowedChannels.clear(); pool.close(); diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java index 81994c8..21be0bd 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java @@ -66,9 +66,11 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract { } private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) { - return new ReactorRabbitMQChannelPool( - rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(), - poolSize); + ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool( + rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(), + poolSize); + reactorRabbitMQChannelPool.start(); + return reactorRabbitMQChannelPool; } // Pool wait timeout is an expected exception diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 953a4fb..dac4ae3 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -31,12 +31,13 @@ import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.Optional; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.event.json.EventSerializer; import org.apache.james.util.MDCBuilder; import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; -import com.rabbitmq.client.Connection; + import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -45,9 +46,7 @@ import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; -import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; class GroupRegistration implements Registration { @@ -85,15 +84,15 @@ class GroupRegistration implements Registration { private final MailboxListenerExecutor mailboxListenerExecutor; private Optional<Disposable> receiverSubscriber; - GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer, + GroupRegistration(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer, MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters, Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) { this.eventSerializer = eventSerializer; this.mailboxListener = mailboxListener; this.queueName = WorkQueueName.of(group); - this.sender = sender; - this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier)); + this.sender = reactorRabbitMQChannelPool.getSender(); + this.receiver = reactorRabbitMQChannelPool.createReceiver(); this.mailboxListenerExecutor = mailboxListenerExecutor; this.receiverSubscriber = Optional.empty(); this.unregisterGroup = unregisterGroup; diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java index 2da507f..5337df6 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java @@ -23,28 +23,22 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.event.json.EventSerializer; -import com.rabbitmq.client.Connection; - -import reactor.core.publisher.Mono; -import reactor.rabbitmq.Sender; - class GroupRegistrationHandler { private final Map<Group, GroupRegistration> groupRegistrations; private final EventSerializer eventSerializer; - private final Sender sender; - private final Mono<Connection> connectionMono; private final RetryBackoffConfiguration retryBackoff; private final EventDeadLetters eventDeadLetters; private final MailboxListenerExecutor mailboxListenerExecutor; + private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; - GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, + GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) { this.eventSerializer = eventSerializer; - this.sender = sender; - this.connectionMono = connectionMono; + this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool; this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; this.mailboxListenerExecutor = mailboxListenerExecutor; @@ -73,8 +67,7 @@ class GroupRegistrationHandler { private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) { return new GroupRegistration( - connectionMono, - sender, + reactorRabbitMQChannelPool, eventSerializer, listener, group, diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index 1f2fbae..dc968fe 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID; import java.nio.charset.StandardCharsets; import java.util.Optional; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.event.json.EventSerializer; import org.apache.james.util.MDCBuilder; import org.apache.james.util.MDCStructuredLogger; @@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; import reactor.core.Disposable; @@ -45,9 +45,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; -import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; class KeyRegistrationHandler { @@ -64,13 +62,13 @@ class KeyRegistrationHandler { private final MailboxListenerExecutor mailboxListenerExecutor; private Optional<Disposable> receiverSubscriber; - KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) { + KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) { this.eventBusId = eventBusId; this.eventSerializer = eventSerializer; - this.sender = sender; + this.sender = reactorRabbitMQChannelPool.getSender(); this.routingKeyConverter = routingKeyConverter; this.localListenerRegistry = localListenerRegistry; - this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + this.receiver = reactorRabbitMQChannelPool.createReceiver(); this.mailboxListenerExecutor = mailboxListenerExecutor; this.registrationQueue = new RegistrationQueueName(); this.registrationBinder = new RegistrationBinder(sender, registrationQueue); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index a659062..06d786a 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -25,47 +25,42 @@ import javax.annotation.PreDestroy; import javax.inject.Inject; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.event.json.EventSerializer; import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.MetricFactory; import com.google.common.base.Preconditions; -import com.rabbitmq.client.Connection; + import reactor.core.publisher.Mono; -import reactor.rabbitmq.Sender; public class RabbitMQEventBus implements EventBus, Startable { - private static final int MAX_CHANNELS_NUMBER = 5; private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String MAILBOX_EVENT = "mailboxEvent"; static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange"; static final String EVENT_BUS_ID = "eventBusId"; - private final Mono<Connection> connectionMono; private final EventSerializer eventSerializer; private final RoutingKeyConverter routingKeyConverter; private final RetryBackoffConfiguration retryBackoff; private final EventBusId eventBusId; private final EventDeadLetters eventDeadLetters; + private final ReactorRabbitMQChannelPool channelPool; private final MailboxListenerExecutor mailboxListenerExecutor; private volatile boolean isRunning; private volatile boolean isStopping; - private ReactorRabbitMQChannelPool channelPool; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; - EventDispatcher eventDispatcher; - private Sender sender; + private EventDispatcher eventDispatcher; @Inject - public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, EventSerializer eventSerializer, + public RabbitMQEventBus(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer, RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, EventDeadLetters eventDeadLetters, MetricFactory metricFactory) { + this.channelPool = reactorRabbitMQChannelPool; this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory); this.eventBusId = EventBusId.random(); - this.connectionMono = simpleConnectionPool.getResilientConnection(); this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; this.retryBackoff = retryBackoff; @@ -76,13 +71,11 @@ public class RabbitMQEventBus implements EventBus, Startable { public void start() { if (!isRunning && !isStopping) { - this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); - sender = channelPool.createSender(); LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); - keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor); - groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor); - eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor); + keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor); + groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor); + eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor); eventDispatcher.start(); keyRegistrationHandler.start(); @@ -97,8 +90,6 @@ public class RabbitMQEventBus implements EventBus, Startable { isRunning = false; groupRegistrationHandler.stop(); keyRegistrationHandler.stop(); - sender.close(); - channelPool.close(); } } diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index ab6d885..2b92e20 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -56,7 +56,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy; import org.apache.james.backends.rabbitmq.RabbitMQFixture; import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.events.EventBusTestFixture.GroupA; import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution; @@ -97,11 +97,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, private RabbitMQEventBus eventBus; private RabbitMQEventBus eventBus2; private RabbitMQEventBus eventBus3; - private Sender sender; private EventSerializer eventSerializer; private RoutingKeyConverter routingKeyConverter; private MemoryEventDeadLetters memoryEventDeadLetters; private Mono<Connection> resilientConnection; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; @BeforeEach void setUp() { @@ -110,6 +110,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, TestId.Factory mailboxIdFactory = new TestId.Factory(); eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer()); routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); eventBus = newEventBus(); eventBus2 = newEventBus(); @@ -119,7 +121,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus2.start(); eventBus3.start(); resilientConnection = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); - sender = RabbitFlux.createSender(new SenderOptions().connectionMono(resilientConnection)); } @AfterEach @@ -129,17 +130,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus3.stop(); ALL_GROUPS.stream() .map(GroupRegistration.WorkQueueName::of) - .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName.asString())).block()); - sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block(); - sender.close(); + .forEach(queueName -> reactorRabbitMQChannelPool.getSender().delete(QueueSpecification.queue(queueName.asString())).block()); + reactorRabbitMQChannelPool.getSender().delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block(); + reactorRabbitMQChannelPool.close(); } private RabbitMQEventBus newEventBus() { - return newEventBus(rabbitMQExtension.getRabbitConnectionPool()); + return newEventBus(reactorRabbitMQChannelPool); } - private RabbitMQEventBus newEventBus(SimpleConnectionPool connectionPool) { - return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory()); + private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool rabbitMQChannelPool) { + return new RabbitMQEventBus(rabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory()); } @Override @@ -340,10 +341,18 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, .restartPolicy(DockerRestartPolicy.PER_TEST); private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPoolWithNetWorkIssue; @BeforeEach void beforeEach() { - rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPoolWithNetWorkIssue = new ReactorRabbitMQChannelPool(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPoolWithNetWorkIssue.start(); + rabbitMQEventBusWithNetWorkIssue = newEventBus(reactorRabbitMQChannelPoolWithNetWorkIssue); + } + + @AfterEach + void afterEach() { + reactorRabbitMQChannelPoolWithNetWorkIssue.close(); } @Test diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java index e93ac4f..25c3635 100644 --- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java +++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java @@ -20,10 +20,10 @@ package org.apache.james.mpt.imapmailbox.rabbitmq.host; -import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; import org.apache.james.backends.rabbitmq.DockerRabbitMQ; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.core.quota.QuotaCount; import org.apache.james.core.quota.QuotaSize; @@ -63,6 +63,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { private RabbitMQEventBus eventBus; private SimpleConnectionPool connectionPool; private InMemoryIntegrationResources resources; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) { this.dockerRabbitMQ = dockerRabbitMQ; @@ -73,6 +74,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { super.beforeTest(); connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory()); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool); + reactorRabbitMQChannelPool.start(); eventBus = createEventBus(); eventBus.start(); @@ -101,18 +104,19 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { defaultImapProcessorFactory); } - private RabbitMQEventBus createEventBus() throws URISyntaxException { + private RabbitMQEventBus createEventBus() { InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory(); InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory(); EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer()); RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory))); - return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, + return new RabbitMQEventBus(reactorRabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(), new NoopMetricFactory()); } @Override public void afterTest() { eventBus.stop(); + reactorRabbitMQChannelPool.close(); connectionPool.close(); } diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java index b74e6dd..ce0333f 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java @@ -30,6 +30,7 @@ import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule; import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber; +import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue; import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier; import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; @@ -64,5 +65,11 @@ public class DistributedTaskManagerModule extends AbstractModule { .init(instance::start); } + @ProvidesIntoSet + InitializationOperation workQueue(EventSourcingTaskManager instance) { + return InitilizationOperationBuilder + .forClass(RabbitMQWorkQueue.class) + .init(instance::start); + } } diff --git a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java index ed2c316..dce8120 100644 --- a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java +++ b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java @@ -42,6 +42,8 @@ import org.apache.james.lifecycle.api.StartUpCheck; import org.apache.james.util.date.DefaultZonedDateTimeProvider; import org.apache.james.util.date.ZonedDateTimeProvider; import org.apache.james.util.mime.MessageContentExtractor; +import org.apache.james.utils.InitializationOperation; +import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.mailet.base.AutomaticallySentMailDetector; import org.apache.mailet.base.AutomaticallySentMailDetectorImpl; @@ -50,6 +52,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.multibindings.Multibinder; +import com.google.inject.multibindings.ProvidesIntoSet; import com.google.inject.name.Names; public class JMAPCommonModule extends AbstractModule { @@ -96,4 +99,11 @@ public class JMAPCommonModule extends AbstractModule { accessTokenAuthenticationStrategy, queryParameterAuthenticationStrategy); } + + @ProvidesIntoSet + InitializationOperation workQueue(MailSpool instance) { + return InitilizationOperationBuilder + .forClass(MailSpool.class) + .init(instance::start); + } } diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java index 5eb49df..4226aea 100644 --- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java +++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java @@ -29,10 +29,10 @@ import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.rabbitmq.RabbitMQChannelPool; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.SimpleChannelPool; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; -import org.apache.james.lifecycle.api.Startable; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; @@ -50,17 +50,18 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement; -import org.apache.james.utils.InitialisationOperation; +import org.apache.james.utils.InitializationOperation; +import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.james.utils.PropertiesProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.inject.AbstractModule; -import com.google.inject.Inject; import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; +import com.google.inject.multibindings.ProvidesIntoSet; public class RabbitMQModule extends AbstractModule { @@ -70,6 +71,7 @@ public class RabbitMQModule extends AbstractModule { @Override protected void configure() { + bind(ReactorRabbitMQChannelPool.class).in(Scopes.SINGLETON); bind(EnqueuedMailsDAO.class).in(Scopes.SINGLETON); bind(DeletedMailsDAO.class).in(Scopes.SINGLETON); bind(BrowseStartDAO.class).in(Scopes.SINGLETON); @@ -88,7 +90,6 @@ public class RabbitMQModule extends AbstractModule { eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION); Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class); - Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class); } @Provides @@ -145,23 +146,10 @@ public class RabbitMQModule extends AbstractModule { return RabbitMQMailQueueConfiguration.from(configuration); } - @Singleton - public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation { - private final RabbitMQMailQueueFactory rabbitMQMailQueueFactory; - - @Inject - public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) { - this.rabbitMQMailQueueFactory = rabbitMQMailQueueFactory; - } - - @Override - public void initModule() { - rabbitMQMailQueueFactory.start(); - } - - @Override - public Class<? extends Startable> forClass() { - return RabbitMQMailQueueFactory.class; - } + @ProvidesIntoSet + InitializationOperation workQueue(ReactorRabbitMQChannelPool instance) { + return InitilizationOperationBuilder + .forClass(ReactorRabbitMQChannelPool.class) + .init(instance::start); } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java index d4ebbb6..c652bc9 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java @@ -21,6 +21,7 @@ package org.apache.james.jmap.draft.send; import javax.inject.Inject; +import org.apache.james.lifecycle.api.Startable; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueue.MailQueueException; import org.apache.james.queue.api.MailQueueFactory; @@ -30,12 +31,17 @@ import org.apache.mailet.Mail; import com.google.common.annotations.VisibleForTesting; -public class MailSpool { +public class MailSpool implements Startable { - private final MailQueue queue; + private final MailQueueFactory<?> queueFactory; + private MailQueue queue; @Inject @VisibleForTesting MailSpool(MailQueueFactory<?> queueFactory) { + this.queueFactory = queueFactory; + } + + public void start() { queue = queueFactory.createQueue(MailQueueFactory.SPOOL); } diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java index 147f21e..0552c2f 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java @@ -49,6 +49,7 @@ public class MailSpoolTest { myQueue = mailQueueFactory.createQueue(MailQueueFactory.SPOOL); mailSpool = new MailSpool(mailQueueFactory); + mailSpool.start(); } @Test diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index a6de706..21546b9 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; @@ -33,15 +34,12 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; import com.github.fge.lambdas.consumers.ThrowingConsumer; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.ConsumeOptions; -import reactor.rabbitmq.RabbitFlux; -import reactor.rabbitmq.ReceiverOptions; class Dequeuer { private static final boolean REQUEUE = true; @@ -79,14 +77,14 @@ class Dequeuer { private final MailReferenceSerializer mailReferenceSerializer; private final MailQueueView mailQueueView; - Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, + Dequeuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, MailQueueView mailQueueView) { this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); - this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)) + this.flux = reactorRabbitMQChannelPool.createReceiver() .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE)) .filter(getResponse -> getResponse.getBody() != null); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index ea7d04a..4e0485c 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -27,6 +27,7 @@ import java.time.Clock; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.Metric; @@ -51,11 +52,11 @@ class Enqueuer { private final MailQueueView mailQueueView; private final Clock clock; - Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, + Enqueuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, MailReferenceSerializer serializer, MetricFactory metricFactory, MailQueueView mailQueueView, Clock clock) { this.name = name; - this.sender = sender; + this.sender = reactorRabbitMQChannelPool.getSender(); this.mimeMessageStore = mimeMessageStore; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index b0748c0..b48f60f 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -32,17 +32,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.mail.internet.MimeMessage; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; -import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueueFactory; @@ -53,23 +50,18 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; -import com.rabbitmq.client.Connection; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.Sender; -public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable { - private static final int MAX_CHANNELS_NUMBER = 5; +public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> { @VisibleForTesting static class PrivateFactory { private final MetricFactory metricFactory; private final GaugeRegistry gaugeRegistry; - private final Mono<Connection> connectionMono; - private final Sender sender; + private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailReferenceSerializer mailReferenceSerializer; private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader; @@ -81,7 +73,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu @Inject @VisibleForTesting PrivateFactory(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, - Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory, + ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, + MimeMessageStore.Factory mimeMessageStoreFactory, BlobId.Factory blobIdFactory, MailQueueView.Factory mailQueueViewFactory, Clock clock, @@ -89,8 +82,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu RabbitMQMailQueueConfiguration configuration) { this.metricFactory = metricFactory; this.gaugeRegistry = gaugeRegistry; - this.connectionMono = connectionMono; - this.sender = sender; + this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool; this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore(); this.mailQueueViewFactory = mailQueueViewFactory; this.clock = clock; @@ -107,9 +99,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue( metricFactory, mailQueueName, - new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer, + new Enqueuer(mailQueueName, reactorRabbitMQChannelPool, mimeMessageStore, mailReferenceSerializer, metricFactory, mailQueueView, clock), - new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer, + new Dequeuer(mailQueueName, reactorRabbitMQChannelPool, mailLoader, mailReferenceSerializer, metricFactory, mailQueueView), mailQueueView, decoratorFactory); @@ -145,26 +137,19 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu private final RabbitMQMailQueueManagement mqManagementApi; private final PrivateFactory privateFactory; private final RabbitMQMailQueueObjectPool mailQueueObjectPool; - private final Mono<Connection> connectionMono; - private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; - private Sender sender; + private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; @VisibleForTesting @Inject - RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool, + RabbitMQMailQueueFactory(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RabbitMQMailQueueManagement mqManagementApi, PrivateFactory privateFactory) { - this.connectionMono = simpleConnectionPool.getResilientConnection(); + this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool; this.mqManagementApi = mqManagementApi; this.privateFactory = privateFactory; this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool(); } - public void start() { - this.reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); - this.sender = reactorRabbitMQChannelPool.createSender(); - } - @Override public Optional<RabbitMQMailQueue> getQueue(String name) { return getQueueFromRabbitServer(MailQueueName.fromString(name)); @@ -187,15 +172,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) { String exchangeName = mailQueueName.toRabbitExchangeName().asString(); Flux.concat( - sender.declareExchange(ExchangeSpecification.exchange(exchangeName) + reactorRabbitMQChannelPool.getSender().declareExchange(ExchangeSpecification.exchange(exchangeName) .durable(true) .type("direct")), - sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString()) + reactorRabbitMQChannelPool.getSender().declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString()) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) .arguments(NO_ARGUMENTS)), - sender.bind(BindingSpecification.binding() + reactorRabbitMQChannelPool.getSender().bind(BindingSpecification.binding() .exchange(mailQueueName.toRabbitExchangeName().asString()) .queue(mailQueueName.toWorkQueueName().asString()) .routingKey(EMPTY_ROUTING_KEY))) @@ -210,11 +195,4 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .map(mailQueueObjectPool::retrieveInstanceFor) .findFirst(); } - - @PreDestroy - public void stop() { - sender.close(); - reactorRabbitMQChannelPool.close(); - } - } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index 7c23f0f..4ad6f85 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -61,9 +61,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.github.fge.lambdas.Throwing; -import com.rabbitmq.client.Connection; - -import reactor.core.publisher.Mono; class RabbitMQMailQueueConfigurationChangeTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -79,7 +76,6 @@ class RabbitMQMailQueueConfigurationChangeTest { private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z"); private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS); private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS); - public static final int POOL_SIZE = 5; @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( @@ -94,6 +90,7 @@ class RabbitMQMailQueueConfigurationChangeTest { private UpdatableTickingClock clock; private RabbitMQMailQueueManagement mqManagementApi; private MimeMessageStore.Factory mimeMessageStoreFactory; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; @BeforeEach void setup(CassandraCluster cassandra) throws Exception { @@ -101,11 +98,14 @@ class RabbitMQMailQueueConfigurationChangeTest { mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO); clock = new UpdatableTickingClock(IN_SLICE_1); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); } @AfterEach void tearDown() { mqManagementApi.deleteAllQueues(); + reactorRabbitMQChannelPool.close(); } private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws Exception { @@ -119,21 +119,18 @@ class RabbitMQMailQueueConfigurationChangeTest { .sizeMetricsEnabled(true) .build(); - Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); - ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); + RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), - connectionMono, - reactorRabbitMQChannelPool.createSender(), + reactorRabbitMQChannelPool, mimeMessageStoreFactory, BLOB_ID_FACTORY, mailQueueViewFactory, clock, new RawMailQueueItemDecoratorFactory(), mailQueueSizeConfiguration); - RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory); - mailQueueFactory.start(); + RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory); return mailQueueFactory.createQueue(SPOOL); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 127ed23..0102a05 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -44,6 +44,7 @@ import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStore; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule; + import org.apache.james.metrics.api.Gauge; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueMetricContract; @@ -68,7 +69,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; -import com.rabbitmq.client.Connection; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -84,7 +84,6 @@ class RabbitMQMailQueueTest { private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS); private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS); private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS); - private static final int POOL_SIZE = 5; @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( @@ -100,9 +99,11 @@ class RabbitMQMailQueueTest { private UpdatableTickingClock clock; private RabbitMQMailQueue mailQueue; private RabbitMQMailQueueManagement mqManagementApi; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; @AfterEach void tearDown() { + reactorRabbitMQChannelPool.close(); mqManagementApi.deleteAllQueues(); } @@ -238,11 +239,6 @@ class RabbitMQMailQueueTest { })) .blockLast(); } - - @AfterEach - void tearDown() { - mqManagementApi.deleteAllQueues(); - } } @Nested @@ -264,11 +260,6 @@ class RabbitMQMailQueueTest { ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class); verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture()); } - - @AfterEach - void tearDown() { - mqManagementApi.deleteAllQueues(); - } } private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception { @@ -284,14 +275,12 @@ class RabbitMQMailQueueTest { .build(), mimeMessageStoreFactory); - Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); - ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); - + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( metricTestSystem.getMetricFactory(), metricTestSystem.getSpyGaugeRegistry(), - connectionMono, - reactorRabbitMQChannelPool.createSender(), + reactorRabbitMQChannelPool, mimeMessageStoreFactory, BLOB_ID_FACTORY, mailQueueViewFactory, @@ -299,8 +288,7 @@ class RabbitMQMailQueueTest { new RawMailQueueItemDecoratorFactory(), configuration); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory); - mailQueueFactory.start(); + mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, factory); mailQueue = mailQueueFactory.createQueue(SPOOL); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index fb5b8b7..8ac46b3 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -46,19 +46,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.rabbitmq.client.Connection; - -import reactor.core.publisher.Mono; - class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); - public static final int POOL_SIZE = 5; @RegisterExtension static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); private RabbitMQMailQueueFactory mailQueueFactory; private RabbitMQMailQueueManagement mqManagementApi; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; @BeforeEach void setup() throws Exception { @@ -72,13 +68,12 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM .sizeMetricsEnabled(true) .build(); - Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); - ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), - connectionMono, - reactorRabbitMQChannelPool.createSender(), + reactorRabbitMQChannelPool, mimeMessageStoreFactory, BLOB_ID_FACTORY, mailQueueViewFactory, @@ -86,13 +81,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM new RawMailQueueItemDecoratorFactory(), configuration); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory); - mailQueueFactory.start(); + mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory); } @AfterEach void tearDown() { mqManagementApi.deleteAllQueues(); + reactorRabbitMQChannelPool.close(); } @Override diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index d2bdeeb..dbb4b8f 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -42,6 +42,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; + import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; @@ -83,7 +84,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta } public void start() { - sender = channelPool.createSender(); + channelPool.start(); + sender = channelPool.getSender(); sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 21a9880..62a2f1b 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -25,8 +25,6 @@ import java.util.Optional; import java.util.UUID; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; -import org.apache.james.lifecycle.api.Startable; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.task.Task; import org.apache.james.task.TaskId; @@ -39,8 +37,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; + import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; @@ -51,12 +49,11 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; -import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; -public class RabbitMQWorkQueue implements WorkQueue, Startable { +public class RabbitMQWorkQueue implements WorkQueue { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class); // Need at least one by receivers plus a shared one for senders @@ -71,10 +68,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { public static final String TASK_ID = "taskId"; private final TaskManagerWorker worker; - private final Mono<Connection> connectionMono; private final ReactorRabbitMQChannelPool channelPool; private final JsonTaskSerializer taskSerializer; - private Sender sender; private RabbitMQExclusiveConsumer receiver; private UnicastProcessor<TaskId> sendCancelRequestsQueue; private Disposable sendCancelRequestsQueueHandle; @@ -83,29 +78,28 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { private Sender cancelRequestSender; private Receiver cancelRequestListener; - public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) { + public RabbitMQWorkQueue(TaskManagerWorker worker, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, JsonTaskSerializer taskSerializer) { this.worker = worker; - this.connectionMono = simpleConnectionPool.getResilientConnection(); + this.channelPool = reactorRabbitMQChannelPool; this.taskSerializer = taskSerializer; - this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); } + @Override public void start() { startWorkqueue(); listenToCancelRequests(); } private void startWorkqueue() { - sender = channelPool.createSender(); - sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); - sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block(); - sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block(); + channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); + channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block(); + channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block(); consumeWorkqueue(); } private void consumeWorkqueue() { - receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); + receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono())); receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) .subscribeOn(Schedulers.boundedElastic()) .flatMap(this::executeTask) @@ -143,7 +137,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } void listenToCancelRequests() { - cancelRequestSender = channelPool.createSender(); + cancelRequestSender = channelPool.getSender(); String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); @@ -159,8 +153,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { } private void registerCancelRequestsListener(String queueName) { - cancelRequestListener = RabbitFlux - .createReceiver(new ReceiverOptions().connectionMono(connectionMono)); + cancelRequestListener = channelPool.createReceiver(); cancelRequestListenerHandle = cancelRequestListener .consumeAutoAck(queueName) .subscribeOn(Schedulers.boundedElastic()) @@ -188,7 +181,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { .headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString())) .build(); OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload); - sender.send(Mono.just(outboundMessage)).block(); + channelPool.getSender().send(Mono.just(outboundMessage)).block(); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -203,12 +196,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable { public void close() { Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose); Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close); - Optional.ofNullable(sender).ifPresent(Sender::close); Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); - Optional.ofNullable(sender).ifPresent(Sender::close); Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close); Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close); - channelPool.close(); } } diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala index f86be44..7feed07 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala @@ -22,13 +22,13 @@ import java.time.Duration import com.google.common.annotations.VisibleForTesting import javax.inject.Inject -import org.apache.james.backends.rabbitmq.SimpleConnectionPool +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.server.task.json.JsonTaskSerializer import org.apache.james.task.SerialTaskManagerWorker import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener} -class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool, +class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: ReactorRabbitMQChannelPool, private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier { val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30) @@ -41,7 +41,6 @@ class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: Si val listener = WorkerStatusListener(eventSourcingSystem) val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval) val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer) - rabbitMQWorkQueue.start() rabbitMQWorkQueue } } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 42f6f25..83c60d8 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -34,7 +34,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; -import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension; @@ -74,11 +74,13 @@ import com.github.steveash.guavate.Guavate; class DistributedTaskManagerTest implements TaskManagerContract { + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; + private static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier { private final List<RabbitMQWorkQueue> workQueues; private final RabbitMQWorkQueueSupplier supplier; - TrackedRabbitMQWorkQueueSupplier(SimpleConnectionPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) { + TrackedRabbitMQWorkQueueSupplier(ReactorRabbitMQChannelPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) { workQueues = new ArrayList<>(); supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer); } @@ -86,6 +88,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Override public WorkQueue apply(EventSourcingSystem eventSourcingSystem) { RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL); + workQueue.start(); workQueues.add(workQueue); return workQueue; } @@ -138,7 +141,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @BeforeEach void setUp(EventStore eventStore) { - workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); + workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(reactorRabbitMQChannelPool, TASK_SERIALIZER); this.eventStore = eventStore; terminationSubscribers = new ArrayList<>(); } @@ -147,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { void tearDown() { terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close); workQueueSupplier.stopWorkQueues(); + reactorRabbitMQChannelPool.close(); } public EventSourcingTaskManager taskManager() { diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java index 91af0e2..5468af1 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.server.task.json.TestTask; import org.apache.james.server.task.json.dto.TestTaskDTOModules; @@ -68,6 +69,7 @@ class RabbitMQWorkQueueTest { private RabbitMQWorkQueue testee; private ImmediateWorker worker; private JsonTaskSerializer serializer; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; private static class ImmediateWorker implements TaskManagerWorker { @@ -101,13 +103,16 @@ class RabbitMQWorkQueueTest { void setUp() { worker = spy(new ImmediateWorker()); serializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE); - testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitConnectionPool(), serializer); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool()); + reactorRabbitMQChannelPool.start(); + testee = new RabbitMQWorkQueue(worker, reactorRabbitMQChannelPool, serializer); testee.start(); } @AfterEach void tearDown() { testee.close(); + reactorRabbitMQChannelPool.close(); } @Test @@ -132,7 +137,7 @@ class RabbitMQWorkQueueTest { testee.submit(TASK_WITH_ID); ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); - try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), serializer)) { + try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, serializer)) { otherWorkQueue.start(); IntStream.range(0, 9) @@ -151,7 +156,7 @@ class RabbitMQWorkQueueTest { ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE); - try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) { + try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, otherTaskSerializer)) { //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue. Thread.sleep(200); otherWorkQueue.start(); diff --git a/server/task/task-memory/pom.xml b/server/task/task-memory/pom.xml index 0b60d59..e6dccd7 100644 --- a/server/task/task-memory/pom.xml +++ b/server/task/task-memory/pom.xml @@ -44,6 +44,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-lifecycle-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> </dependency> <dependency> diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java index ae363ff..8a4f490 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java @@ -20,7 +20,13 @@ package org.apache.james.task; import java.io.Closeable; -public interface WorkQueue extends Closeable { +import org.apache.james.lifecycle.api.Startable; + +public interface WorkQueue extends Closeable, Startable { + + default void start() { + + } void submit(TaskWithId taskWithId); diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 35856b0..fe2d869 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -22,14 +22,15 @@ import java.io.Closeable import java.time.Duration import java.util +import com.google.common.annotations.VisibleForTesting +import javax.annotation.PreDestroy import javax.inject.Inject import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.apache.james.eventsourcing.{AggregateId, Subscriber} +import org.apache.james.lifecycle.api.Startable import org.apache.james.task.TaskManager.ReachedTimeoutException import org.apache.james.task._ import org.apache.james.task.eventsourcing.TaskCommand._ -import com.google.common.annotations.VisibleForTesting -import javax.annotation.PreDestroy import reactor.core.publisher.{Flux, Mono} import reactor.core.scheduler.Schedulers @@ -38,7 +39,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] val eventStore: EventStore, val executionDetailsProjection: TaskExecutionDetailsProjection, val hostname: Hostname, - val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable { + val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable with Startable { private def workDispatcher: Subscriber = { case Created(aggregateId, _, task, _) => @@ -69,6 +70,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem) + def start(): Unit = workQueue.start() + override def submit(task: Task): TaskId = { val taskId = TaskId.generateTaskId val command = Create(taskId, task) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
