This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bf2df7c270889ae8ae2e0975c7b239957386e5a Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Thu Oct 3 15:35:35 2019 +0700 JAMES-2760 Add a configuration parameter for enabling/disabling the metrics on mail queue size of RabbitMQ --- .../apache/james/modules/TestRabbitMQModule.java | 7 + .../james/modules/rabbitmq/RabbitMQModule.java | 7 + .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 10 +- .../view/RabbitMQMailQueueConfiguration.java | 96 ++++++ .../RabbitMQMailQueueConfigurationChangeTest.java | 10 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 352 ++++++++++++--------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 8 +- .../view/RabbitMQMailQueueConfigurationTest.java | 52 +++ 8 files changed, 391 insertions(+), 151 deletions(-) diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java index e17539b..35438ac 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java @@ -31,6 +31,7 @@ import org.apache.james.CleanupTasksPerformer; import org.apache.james.backends.rabbitmq.DockerRabbitMQ; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.queue.rabbitmq.RabbitMQMailQueueManagement; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import com.google.inject.AbstractModule; @@ -69,6 +70,12 @@ public class TestRabbitMQModule extends AbstractModule { .build(); } + @Provides + @Singleton + private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration() { + return RabbitMQMailQueueConfiguration.sizeMetricsEnabled(); + } + public static class QueueCleanUp implements CleanupTasksPerformer.CleanupTask { private final RabbitMQMailQueueManagement api; diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java index 09a7bfe..16a168e 100644 --- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java +++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java @@ -34,6 +34,7 @@ import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartDAO; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; @@ -119,4 +120,10 @@ public class RabbitMQModule extends AbstractModule { private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) { return CassandraMailQueueViewConfiguration.from(configuration); } + + @Provides + @Singleton + private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) { + return RabbitMQMailQueueConfiguration.from(configuration); + } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index adaf3de..a888da0 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -38,6 +38,7 @@ import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueItemDecoratorFactory; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import com.github.fge.lambdas.Throwing; @@ -56,6 +57,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu private final MailQueueView.Factory mailQueueViewFactory; private final Clock clock; private final MailQueueItemDecoratorFactory decoratorFactory; + private final RabbitMQMailQueueConfiguration configuration; @Inject @VisibleForTesting PrivateFactory(MetricFactory metricFactory, @@ -65,7 +67,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu BlobId.Factory blobIdFactory, MailQueueView.Factory mailQueueViewFactory, Clock clock, - MailQueueItemDecoratorFactory decoratorFactory) { + MailQueueItemDecoratorFactory decoratorFactory, + RabbitMQMailQueueConfiguration configuration) { this.metricFactory = metricFactory; this.gaugeRegistry = gaugeRegistry; this.rabbitClient = rabbitClient; @@ -75,6 +78,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu this.decoratorFactory = decoratorFactory; this.mailReferenceSerializer = new MailReferenceSerializer(); this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow(); + this.configuration = configuration; } RabbitMQMailQueue create(MailQueueName mailQueueName) { @@ -96,7 +100,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu } private void registerGaugeFor(RabbitMQMailQueue rabbitMQMailQueue) { - this.gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + rabbitMQMailQueue.getName(), rabbitMQMailQueue::getSize); + if (configuration.isSizeMetricsEnabled()) { + this.gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + rabbitMQMailQueue.getName(), rabbitMQMailQueue::getSize); + } } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java new file mode 100644 index 0000000..b05c0bc --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java @@ -0,0 +1,96 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view; + +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.configuration2.Configuration; + +public class RabbitMQMailQueueConfiguration { + private static final boolean DEFAULT_SIZE_METRICS_ENABLED = true; + + public static class Builder { + private Optional<Boolean> sizeMetricsEnabled; + + public Builder sizeMetricsEnabled(boolean sizeMetricsEnabled) { + this.sizeMetricsEnabled = Optional.of(sizeMetricsEnabled); + return this; + } + + public Builder sizeMetricsEnabled(Optional<Boolean> sizeMetricsEnabled) { + this.sizeMetricsEnabled = sizeMetricsEnabled; + return this; + } + + public RabbitMQMailQueueConfiguration build() { + return new RabbitMQMailQueueConfiguration(sizeMetricsEnabled.orElse(DEFAULT_SIZE_METRICS_ENABLED)); + } + } + + public static final String SIZE_METRICS_ENABLED_PROPERTY = "mailqueue.size.metricsEnabled"; + + public static Builder builder() { + return new Builder(); + } + + public static RabbitMQMailQueueConfiguration from(Configuration configuration) { + return builder() + .sizeMetricsEnabled(Optional.ofNullable(configuration.getBoolean(SIZE_METRICS_ENABLED_PROPERTY, null))) + .build(); + } + + public static RabbitMQMailQueueConfiguration sizeMetricsEnabled() { + return builder() + .sizeMetricsEnabled(true) + .build(); + } + + public static RabbitMQMailQueueConfiguration sizeMetricsDisabled() { + return builder() + .sizeMetricsEnabled(false) + .build(); + } + + private final boolean sizeMetricsEnabled; + + private RabbitMQMailQueueConfiguration(boolean sizeMetricsEnabled) { + this.sizeMetricsEnabled = sizeMetricsEnabled; + } + + public boolean isSizeMetricsEnabled() { + return sizeMetricsEnabled; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RabbitMQMailQueueConfiguration) { + RabbitMQMailQueueConfiguration that = (RabbitMQMailQueueConfiguration) o; + + return Objects.equals(this.sizeMetricsEnabled, that.sizeMetricsEnabled); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(sizeMetricsEnabled); + } +} diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index b17e938..464faf2 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -31,11 +31,11 @@ import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; +import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStore; @@ -46,6 +46,7 @@ import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory; @@ -110,6 +111,10 @@ class RabbitMQMailQueueConfigurationChangeTest { mailQueueViewConfiguration, mimeMessageStoreFactory); + RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder() + .sizeMetricsEnabled(true) + .build(); + RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), @@ -118,7 +123,8 @@ class RabbitMQMailQueueConfigurationChangeTest { BLOB_ID_FACTORY, mailQueueViewFactory, clock, - new RawMailQueueItemDecoratorFactory()); + new RawMailQueueItemDecoratorFactory(), + mailQueueSizeConfiguration); RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, privateFactory); return mailQueueFactory.createQueue(SPOOL); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 703b233..ea738d9 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -23,6 +23,9 @@ import static java.time.temporal.ChronoUnit.HOURS; import static org.apache.james.queue.api.Mails.defaultMail; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.time.Duration; import java.time.Instant; @@ -30,23 +33,24 @@ import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.apache.james.backends.rabbitmq.DockerRabbitMQ; -import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; +import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStore; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule; +import org.apache.james.metrics.api.Gauge; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueMetricContract; import org.apache.james.queue.api.MailQueueMetricExtension; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.ManageableMailQueueContract; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory; @@ -57,15 +61,17 @@ import org.apache.mailet.Mail; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract { +class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final int THREE_BUCKET_COUNT = 3; private static final int UPDATE_BROWSE_START_PACE = 2; @@ -92,159 +98,213 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ private RabbitMQMailQueue mailQueue; private RabbitMQMailQueueManagement mqManagementApi; - @Override - public void enQueue(Mail mail) throws MailQueue.MailQueueException { - ManageableMailQueueContract.super.enQueue(mail); - clock.tick(); + @AfterEach + void tearDown() { + mqManagementApi.deleteAllQueues(); } - @BeforeEach - void setup(DockerRabbitMQ rabbitMQ, CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception { - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); - MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); - clock = new UpdatableTickingClock(IN_SLICE_1); + @Nested + class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract { + @BeforeEach + void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception { + CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); + clock = new UpdatableTickingClock(IN_SLICE_1); - MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), - CassandraMailQueueViewConfiguration.builder() + MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), + CassandraMailQueueViewConfiguration.builder() .bucketCount(THREE_BUCKET_COUNT) .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) .sliceWindow(ONE_HOUR_SLICE_WINDOW) .build(), - mimeMessageStoreFactory); - - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); - RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( - metricTestSystem.getMetricFactory(), - metricTestSystem.getSpyGaugeRegistry(), - rabbitClient, - mimeMessageStoreFactory, - BLOB_ID_FACTORY, - mailQueueViewFactory, - clock, - new RawMailQueueItemDecoratorFactory()); - mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); - mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); - mailQueue = mailQueueFactory.createQueue(SPOOL); - } - - @AfterEach - void tearDown() { - mqManagementApi.deleteAllQueues(); - } - - @Override - public MailQueue getMailQueue() { - return mailQueue; + mimeMessageStoreFactory); + + RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder() + .sizeMetricsEnabled(true) + .build(); + + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); + RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( + metricTestSystem.getMetricFactory(), + metricTestSystem.getSpyGaugeRegistry(), + rabbitClient, + mimeMessageStoreFactory, + BLOB_ID_FACTORY, + mailQueueViewFactory, + clock, + new RawMailQueueItemDecoratorFactory(), + configuration); + mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); + mailQueue = mailQueueFactory.createQueue(SPOOL); + } + + @Override + public void enQueue(Mail mail) throws MailQueue.MailQueueException { + ManageableMailQueueContract.super.enQueue(mail); + clock.tick(); + } + + @Override + public MailQueue getMailQueue() { + return mailQueue; + } + + @Override + public ManageableMailQueue getManageableMailQueue() { + return mailQueue; + } + + @Test + void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception { + ManageableMailQueue mailQueue = getManageableMailQueue(); + int emailCount = 5; + + clock.setInstant(IN_SLICE_1); + enqueueSomeMails(namePatternForSlice(1), emailCount); + + clock.setInstant(IN_SLICE_2); + enqueueSomeMails(namePatternForSlice(2), emailCount); + + clock.setInstant(IN_SLICE_3); + enqueueSomeMails(namePatternForSlice(3), emailCount); + + clock.setInstant(IN_SLICE_5); + enqueueSomeMails(namePatternForSlice(5), emailCount); + + clock.setInstant(IN_SLICE_7); + Stream<String> names = Iterators.toStream(mailQueue.browse()) + .map(ManageableMailQueue.MailQueueItemView::getMail) + .map(Mail::getName); + + assertThat(names).containsExactly( + "1-1", "1-2", "1-3", "1-4", "1-5", + "2-1", "2-2", "2-3", "2-4", "2-5", + "3-1", "3-2", "3-3", "3-4", "3-5", + "5-1", "5-2", "5-3", "5-4", "5-5"); + } + + @Test + void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception { + ManageableMailQueue mailQueue = getManageableMailQueue(); + int emailCount = 5; + + clock.setInstant(IN_SLICE_1); + enqueueSomeMails(namePatternForSlice(1), emailCount); + + clock.setInstant(IN_SLICE_2); + enqueueSomeMails(namePatternForSlice(2), emailCount); + + clock.setInstant(IN_SLICE_3); + enqueueSomeMails(namePatternForSlice(3), emailCount); + + clock.setInstant(IN_SLICE_5); + enqueueSomeMails(namePatternForSlice(5), emailCount); + + clock.setInstant(IN_SLICE_7); + dequeueMails(5); + dequeueMails(5); + dequeueMails(3); + + Stream<String> names = Iterators.toStream(mailQueue.browse()) + .map(ManageableMailQueue.MailQueueItemView::getMail) + .map(Mail::getName); + + assertThat(names) + .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"); + } + + private Function<Integer, String> namePatternForSlice(int sliceId) { + return i -> sliceId + "-" + i; + } + + @Test + void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) { + String name = "myQueue"; + mailQueueFactory.createQueue(name); + + boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name)); + assertThat(initialized).isTrue(); + } + + @Test + void enQueueShouldNotThrowOnMailNameWithNegativeHash() { + String negativehashedString = "this sting will have a negative hash"; //hash value: -1256871313 + + assertThatCode(() -> getMailQueue().enQueue(defaultMail().name(negativehashedString).build())) + .doesNotThrowAnyException(); + } + + @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable." + + "The related test is disabled, and need to be re-enabled after investigation and a fix.") + @Test + @Override + public void concurrentEnqueueDequeueShouldNotFail() { + + } + + private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) { + IntStream.rangeClosed(1, emailCount) + .forEach(Throwing.intConsumer(i -> enQueue(defaultMail() + .name(namePattern.apply(i)) + .build()))); + } + + private void dequeueMails(int times) { + Flux.from(getManageableMailQueue() + .deQueue()) + .take(times) + .flatMap(mailQueueItem -> Mono.fromCallable(() -> { + mailQueueItem.done(true); + return mailQueueItem; + })) + .blockLast(); + } } - @Override - public ManageableMailQueue getManageableMailQueue() { - return mailQueue; - } - - @Test - void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception { - ManageableMailQueue mailQueue = getManageableMailQueue(); - int emailCount = 5; - - clock.setInstant(IN_SLICE_1); - enqueueSomeMails(namePatternForSlice(1), emailCount); - - clock.setInstant(IN_SLICE_2); - enqueueSomeMails(namePatternForSlice(2), emailCount); - - clock.setInstant(IN_SLICE_3); - enqueueSomeMails(namePatternForSlice(3), emailCount); - - clock.setInstant(IN_SLICE_5); - enqueueSomeMails(namePatternForSlice(5), emailCount); - - clock.setInstant(IN_SLICE_7); - Stream<String> names = Iterators.toStream(mailQueue.browse()) - .map(ManageableMailQueue.MailQueueItemView::getMail) - .map(Mail::getName); - - assertThat(names).containsExactly( - "1-1", "1-2", "1-3", "1-4", "1-5", - "2-1", "2-2", "2-3", "2-4", "2-5", - "3-1", "3-2", "3-3", "3-4", "3-5", - "5-1", "5-2", "5-3", "5-4", "5-5"); - } - - @Test - void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception { - ManageableMailQueue mailQueue = getManageableMailQueue(); - int emailCount = 5; - - clock.setInstant(IN_SLICE_1); - enqueueSomeMails(namePatternForSlice(1), emailCount); + @Nested + class MailQueueSizeMetricsDisabled { + @RegisterExtension + MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension(); - clock.setInstant(IN_SLICE_2); - enqueueSomeMails(namePatternForSlice(2), emailCount); + @BeforeEach + void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception { + CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); + clock = new UpdatableTickingClock(IN_SLICE_1); - clock.setInstant(IN_SLICE_3); - enqueueSomeMails(namePatternForSlice(3), emailCount); - - clock.setInstant(IN_SLICE_5); - enqueueSomeMails(namePatternForSlice(5), emailCount); - - clock.setInstant(IN_SLICE_7); - dequeueMails(5); - dequeueMails(5); - dequeueMails(3); - - Stream<String> names = Iterators.toStream(mailQueue.browse()) - .map(ManageableMailQueue.MailQueueItemView::getMail) - .map(Mail::getName); - - assertThat(names) - .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"); - } - - private Function<Integer, String> namePatternForSlice(int sliceId) { - return i -> sliceId + "-" + i; - } - - @Test - void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) { - String name = "myQueue"; - mailQueueFactory.createQueue(name); - - boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name)); - assertThat(initialized).isTrue(); - } - - @Test - void enQueueShouldNotThrowOnMailNameWithNegativeHash() { - String negativehashedString = "this sting will have a negative hash"; //hash value: -1256871313 - - assertThatCode(() -> getMailQueue().enQueue(defaultMail().name(negativehashedString).build())) - .doesNotThrowAnyException(); - } - - @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable." + - "The related test is disabled, and need to be re-enabled after investigation and a fix.") - @Test - @Override - public void concurrentEnqueueDequeueShouldNotFail() { - - } - - private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) { - IntStream.rangeClosed(1, emailCount) - .forEach(Throwing.intConsumer(i -> enQueue(defaultMail() - .name(namePattern.apply(i)) - .build()))); - } - - private void dequeueMails(int times) { - Flux.from(getManageableMailQueue() - .deQueue()) - .take(times) - .flatMap(mailQueueItem -> Mono.fromCallable(() -> { - mailQueueItem.done(true); - return mailQueueItem; - })) - .blockLast(); + MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), + CassandraMailQueueViewConfiguration.builder() + .bucketCount(THREE_BUCKET_COUNT) + .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) + .sliceWindow(ONE_HOUR_SLICE_WINDOW) + .build(), + mimeMessageStoreFactory); + + RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder() + .sizeMetricsEnabled(false) + .build(); + + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); + RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( + metricTestSystem.getMetricFactory(), + metricTestSystem.getSpyGaugeRegistry(), + rabbitClient, + mimeMessageStoreFactory, + BLOB_ID_FACTORY, + mailQueueViewFactory, + clock, + new RawMailQueueItemDecoratorFactory(), + configuration); + mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); + mailQueue = mailQueueFactory.createQueue(SPOOL); + } + + @Test + void constructorShouldNotRegisterGetQueueSizeGaugeWhenSizeMetricsDisabled(MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) { + ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class); + verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture()); + } } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 6280d41..625f988 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -37,6 +37,7 @@ import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueFactoryContract; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; +import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.AfterEach; @@ -61,6 +62,10 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM when(mailQueueViewFactory.create(any())) .thenReturn(mailQueueView); + RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder() + .sizeMetricsEnabled(true) + .build(); + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), @@ -70,7 +75,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM BLOB_ID_FACTORY, mailQueueViewFactory, Clock.systemUTC(), - new RawMailQueueItemDecoratorFactory()); + new RawMailQueueItemDecoratorFactory(), + configuration); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java new file mode 100644 index 0000000..fefaff8 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java @@ -0,0 +1,52 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class RabbitMQMailQueueConfigurationTest { + @Test + void shouldMatchBeanContract() { + EqualsVerifier.forClass(RabbitMQMailQueueConfiguration.class).verify(); + } + + @Test + void fromShouldReturnDefaultForEmptyConfiguration() { + RabbitMQMailQueueConfiguration actual = RabbitMQMailQueueConfiguration.from(new PropertiesConfiguration()); + + assertThat(actual) + .isEqualTo(RabbitMQMailQueueConfiguration.sizeMetricsEnabled()); + } + + @Test + void fromShouldReturnConfiguredSizeMetricsEnabled() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty(RabbitMQMailQueueConfiguration.SIZE_METRICS_ENABLED_PROPERTY, false); + RabbitMQMailQueueConfiguration actual = RabbitMQMailQueueConfiguration.from(configuration); + + assertThat(actual.isSizeMetricsEnabled()) + .isEqualTo(false); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org