This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4b9135ae8e81cc707de1e910bf1de6d7d895dfa1 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Oct 4 11:52:59 2019 +0700 JAMES-2937 RabbitMQMailQueue should rely on ReactorRabbitMQChannelPool --- .../james/modules/rabbitmq/RabbitMQModule.java | 24 +++++ .../org/apache/james/queue/rabbitmq/Dequeuer.java | 12 ++- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 35 ++++--- .../apache/james/queue/rabbitmq/RabbitClient.java | 72 ------------- .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 69 +++++++++++-- .../RabbitMQMailQueueConfigurationChangeTest.java | 16 ++- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 114 ++++++++++----------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 ++- 8 files changed, 191 insertions(+), 168 deletions(-) diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java index bdb9f70..5eb49df 100644 --- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java +++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java @@ -32,6 +32,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck; import org.apache.james.backends.rabbitmq.SimpleChannelPool; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; +import org.apache.james.lifecycle.api.Startable; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; @@ -49,11 +50,13 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement; +import org.apache.james.utils.InitialisationOperation; import org.apache.james.utils.PropertiesProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.inject.AbstractModule; +import com.google.inject.Inject; import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; @@ -85,6 +88,7 @@ public class RabbitMQModule extends AbstractModule { eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION); Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class); + Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class); } @Provides @@ -140,4 +144,24 @@ public class RabbitMQModule extends AbstractModule { private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) { return RabbitMQMailQueueConfiguration.from(configuration); } + + @Singleton + public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation { + private final RabbitMQMailQueueFactory rabbitMQMailQueueFactory; + + @Inject + public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) { + this.rabbitMQMailQueueFactory = rabbitMQMailQueueFactory; + } + + @Override + public void initModule() { + rabbitMQMailQueueFactory.start(); + } + + @Override + public Class<? extends Startable> forClass() { + return RabbitMQMailQueueFactory.class; + } + } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index ae9829e..a6de706 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -33,15 +33,19 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; import com.github.fge.lambdas.consumers.ThrowingConsumer; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.Delivery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.rabbitmq.AcknowledgableDelivery; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.ReceiverOptions; class Dequeuer { - private static final boolean REQUEUE = true; + private static final int EXECUTION_RATE = 5; private final Flux<AcknowledgableDelivery> flux; private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { @@ -75,15 +79,15 @@ class Dequeuer { private final MailReferenceSerializer mailReferenceSerializer; private final MailQueueView mailQueueView; - Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, + Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, MailQueueView mailQueueView) { this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); - this.flux = rabbitClient - .receive(name) + this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono)) + .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE)) .filter(getResponse -> getResponse.getBody() != null); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 776a739..ea7d04a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq; +import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; import java.time.Clock; @@ -38,21 +39,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Mono; +import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.Sender; class Enqueuer { private final MailQueueName name; - private final RabbitClient rabbitClient; + private final Sender sender; private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailReferenceSerializer mailReferenceSerializer; private final Metric enqueueMetric; private final MailQueueView mailQueueView; private final Clock clock; - Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, + Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, MailReferenceSerializer serializer, MetricFactory metricFactory, MailQueueView mailQueueView, Clock clock) { this.name = name; - this.rabbitClient = rabbitClient; + this.sender = sender; this.mimeMessageStore = mimeMessageStore; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; @@ -64,7 +67,7 @@ class Enqueuer { EnqueueId enqueueId = EnqueueId.generate(); saveMail(mail) .map(partIds -> new MailReference(enqueueId, mail, partIds)) - .map(Throwing.function(this::publishReferenceToRabbit).sneakyThrow()) + .flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow()) .flatMap(mailQueueView::storeMail) .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)) .block(); @@ -78,16 +81,20 @@ class Enqueuer { } } - private EnqueuedItem publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException { - rabbitClient.publish(name, getMailReferenceBytes(mailReference)); - - return EnqueuedItem.builder() - .enqueueId(mailReference.getEnqueueId()) - .mailQueueName(name) - .mail(mailReference.getMail()) - .enqueuedTime(clock.instant()) - .mimeMessagePartsId(mailReference.getPartsId()) - .build(); + private Mono<EnqueuedItem> publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException { + OutboundMessage data = new OutboundMessage( + name.toRabbitExchangeName().asString(), + EMPTY_ROUTING_KEY, + getMailReferenceBytes(mailReference)); + return sender.send(Mono.just(data)) + .then(Mono.just( + EnqueuedItem.builder() + .enqueueId(mailReference.getEnqueueId()) + .mailQueueName(name) + .mail(mailReference.getMail()) + .enqueuedTime(clock.instant()) + .mimeMessagePartsId(mailReference.getPartsId()) + .build())); } private byte[] getMailReferenceBytes(MailReference mailReference) throws MailQueue.MailQueueException { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java deleted file mode 100644 index 779e71e..0000000 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ /dev/null @@ -1,72 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.queue.rabbitmq; - -import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; -import static org.apache.james.backends.rabbitmq.Constants.DURABLE; -import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; -import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; - -import java.io.IOException; - -import javax.inject.Inject; - -import org.apache.james.backends.rabbitmq.RabbitMQChannelPool; -import org.apache.james.queue.api.MailQueue; - -import com.rabbitmq.client.AMQP; -import reactor.core.publisher.Flux; -import reactor.rabbitmq.AcknowledgableDelivery; - -class RabbitClient { - private final RabbitMQChannelPool channelPool; - - @Inject - RabbitClient(RabbitMQChannelPool channelPool) { - this.channelPool = channelPool; - } - - void attemptQueueCreation(MailQueueName name) { - channelPool.execute(channel -> { - try { - channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE); - channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS); - channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException { - channelPool.execute(channel -> { - try { - channel.basicPublish(name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY, new AMQP.BasicProperties(), message); - } catch (IOException e) { - throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e); - } - }); - } - - Flux<AcknowledgableDelivery> receive(MailQueueName name) { - return channelPool.receive(name.toWorkQueueName().asString()); - } -} diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index 0e1bf95..b0748c0 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -19,6 +19,11 @@ package org.apache.james.queue.rabbitmq; +import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; +import static org.apache.james.backends.rabbitmq.Constants.DURABLE; +import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; +import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX; import java.time.Clock; @@ -27,13 +32,17 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.mail.internet.MimeMessage; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; +import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueueFactory; @@ -44,13 +53,23 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.rabbitmq.client.Connection; -public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> { +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Sender; + +public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable { + private static final int MAX_CHANNELS_NUMBER = 5; @VisibleForTesting static class PrivateFactory { private final MetricFactory metricFactory; private final GaugeRegistry gaugeRegistry; - private final RabbitClient rabbitClient; + private final Mono<Connection> connectionMono; + private final Sender sender; private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailReferenceSerializer mailReferenceSerializer; private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader; @@ -62,8 +81,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu @Inject @VisibleForTesting PrivateFactory(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, - RabbitClient rabbitClient, - MimeMessageStore.Factory mimeMessageStoreFactory, + Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory, BlobId.Factory blobIdFactory, MailQueueView.Factory mailQueueViewFactory, Clock clock, @@ -71,7 +89,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu RabbitMQMailQueueConfiguration configuration) { this.metricFactory = metricFactory; this.gaugeRegistry = gaugeRegistry; - this.rabbitClient = rabbitClient; + this.connectionMono = connectionMono; + this.sender = sender; this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore(); this.mailQueueViewFactory = mailQueueViewFactory; this.clock = clock; @@ -88,9 +107,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue( metricFactory, mailQueueName, - new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer, + new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer, metricFactory, mailQueueView, clock), - new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer, + new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer, metricFactory, mailQueueView), mailQueueView, decoratorFactory); @@ -123,22 +142,29 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu } } - private final RabbitClient rabbitClient; private final RabbitMQMailQueueManagement mqManagementApi; private final PrivateFactory privateFactory; private final RabbitMQMailQueueObjectPool mailQueueObjectPool; + private final Mono<Connection> connectionMono; + private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool; + private Sender sender; @VisibleForTesting @Inject - RabbitMQMailQueueFactory(RabbitClient rabbitClient, + RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool, RabbitMQMailQueueManagement mqManagementApi, PrivateFactory privateFactory) { - this.rabbitClient = rabbitClient; + this.connectionMono = simpleConnectionPool.getResilientConnection(); this.mqManagementApi = mqManagementApi; this.privateFactory = privateFactory; this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool(); } + public void start() { + this.reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); + this.sender = reactorRabbitMQChannelPool.createSender(); + } + @Override public Optional<RabbitMQMailQueue> getQueue(String name) { return getQueueFromRabbitServer(MailQueueName.fromString(name)); @@ -159,7 +185,22 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu } private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) { - rabbitClient.attemptQueueCreation(mailQueueName); + String exchangeName = mailQueueName.toRabbitExchangeName().asString(); + Flux.concat( + sender.declareExchange(ExchangeSpecification.exchange(exchangeName) + .durable(true) + .type("direct")), + sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString()) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE) + .arguments(NO_ARGUMENTS)), + sender.bind(BindingSpecification.binding() + .exchange(mailQueueName.toRabbitExchangeName().asString()) + .queue(mailQueueName.toWorkQueueName().asString()) + .routingKey(EMPTY_ROUTING_KEY))) + .then() + .block(); return mailQueueObjectPool.retrieveInstanceFor(mailQueueName); } @@ -170,4 +211,10 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .findFirst(); } + @PreDestroy + public void stop() { + sender.close(); + reactorRabbitMQChannelPool.close(); + } + } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index 464faf2..7c23f0f 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -36,6 +36,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStore; @@ -60,6 +61,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.Connection; + +import reactor.core.publisher.Mono; class RabbitMQMailQueueConfigurationChangeTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -75,6 +79,7 @@ class RabbitMQMailQueueConfigurationChangeTest { private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z"); private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS); private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS); + public static final int POOL_SIZE = 5; @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( @@ -88,7 +93,6 @@ class RabbitMQMailQueueConfigurationChangeTest { private UpdatableTickingClock clock; private RabbitMQMailQueueManagement mqManagementApi; - private RabbitClient rabbitClient; private MimeMessageStore.Factory mimeMessageStoreFactory; @BeforeEach @@ -96,7 +100,6 @@ class RabbitMQMailQueueConfigurationChangeTest { CassandraBlobStore blobsDAO = new CassandraBlobStore(cassandra.getConf()); mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO); clock = new UpdatableTickingClock(IN_SLICE_1); - rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); } @@ -111,21 +114,26 @@ class RabbitMQMailQueueConfigurationChangeTest { mailQueueViewConfiguration, mimeMessageStoreFactory); + RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder() .sizeMetricsEnabled(true) .build(); + Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); + ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), - rabbitClient, + connectionMono, + reactorRabbitMQChannelPool.createSender(), mimeMessageStoreFactory, BLOB_ID_FACTORY, mailQueueViewFactory, clock, new RawMailQueueItemDecoratorFactory(), mailQueueSizeConfiguration); - RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, privateFactory); + RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory); + mailQueueFactory.start(); return mailQueueFactory.createQueue(SPOOL); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index ea738d9..127ed23 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -38,6 +38,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStore; @@ -67,6 +68,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.Connection; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -82,6 +84,7 @@ class RabbitMQMailQueueTest { private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS); private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS); private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS); + private static final int POOL_SIZE = 5; @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( @@ -107,36 +110,11 @@ class RabbitMQMailQueueTest { class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract { @BeforeEach void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception { - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); - MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); - clock = new UpdatableTickingClock(IN_SLICE_1); - - MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), - CassandraMailQueueViewConfiguration.builder() - .bucketCount(THREE_BUCKET_COUNT) - .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) - .sliceWindow(ONE_HOUR_SLICE_WINDOW) - .build(), - mimeMessageStoreFactory); - - RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder() - .sizeMetricsEnabled(true) - .build(); - - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); - RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( - metricTestSystem.getMetricFactory(), - metricTestSystem.getSpyGaugeRegistry(), - rabbitClient, - mimeMessageStoreFactory, - BLOB_ID_FACTORY, - mailQueueViewFactory, - clock, - new RawMailQueueItemDecoratorFactory(), - configuration); - mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); - mailQueue = mailQueueFactory.createQueue(SPOOL); + setUp(cassandra, + metricTestSystem, + RabbitMQMailQueueConfiguration.builder() + .sizeMetricsEnabled(true) + .build()); } @Override @@ -260,6 +238,11 @@ class RabbitMQMailQueueTest { })) .blockLast(); } + + @AfterEach + void tearDown() { + mqManagementApi.deleteAllQueues(); + } } @Nested @@ -269,36 +252,11 @@ class RabbitMQMailQueueTest { @BeforeEach void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception { - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); - MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); - clock = new UpdatableTickingClock(IN_SLICE_1); - - MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), - CassandraMailQueueViewConfiguration.builder() - .bucketCount(THREE_BUCKET_COUNT) - .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) - .sliceWindow(ONE_HOUR_SLICE_WINDOW) - .build(), - mimeMessageStoreFactory); - - RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder() + setUp(cassandra, + metricTestSystem, + RabbitMQMailQueueConfiguration.builder() .sizeMetricsEnabled(false) - .build(); - - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); - RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( - metricTestSystem.getMetricFactory(), - metricTestSystem.getSpyGaugeRegistry(), - rabbitClient, - mimeMessageStoreFactory, - BLOB_ID_FACTORY, - mailQueueViewFactory, - clock, - new RawMailQueueItemDecoratorFactory(), - configuration); - mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); - mailQueue = mailQueueFactory.createQueue(SPOOL); + .build()); } @Test @@ -306,5 +264,43 @@ class RabbitMQMailQueueTest { ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class); verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture()); } + + @AfterEach + void tearDown() { + mqManagementApi.deleteAllQueues(); + } + } + + private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception { + CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); + clock = new UpdatableTickingClock(IN_SLICE_1); + + MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), + CassandraMailQueueViewConfiguration.builder() + .bucketCount(THREE_BUCKET_COUNT) + .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) + .sliceWindow(ONE_HOUR_SLICE_WINDOW) + .build(), + mimeMessageStoreFactory); + + Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); + ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); + + RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( + metricTestSystem.getMetricFactory(), + metricTestSystem.getSpyGaugeRegistry(), + connectionMono, + reactorRabbitMQChannelPool.createSender(), + mimeMessageStoreFactory, + BLOB_ID_FACTORY, + mailQueueViewFactory, + clock, + new RawMailQueueItemDecoratorFactory(), + configuration); + mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory); + mailQueueFactory.start(); + mailQueue = mailQueueFactory.createQueue(SPOOL); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 625f988..fb5b8b7 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.metrics.api.NoopGaugeRegistry; @@ -45,8 +46,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.rabbitmq.client.Connection; + +import reactor.core.publisher.Mono; + class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + public static final int POOL_SIZE = 5; @RegisterExtension static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); @@ -66,11 +72,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM .sizeMetricsEnabled(true) .build(); - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); - RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( + Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(); + ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE); + RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), - rabbitClient, + connectionMono, + reactorRabbitMQChannelPool.createSender(), mimeMessageStoreFactory, BLOB_ID_FACTORY, mailQueueViewFactory, @@ -78,7 +86,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM new RawMailQueueItemDecoratorFactory(), configuration); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory); + mailQueueFactory.start(); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
