This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 890f20c323e94a3ee95e2eab455e9e4846d8015a Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Tue Mar 3 14:13:48 2020 +0100 JAMES-3080 use reworked rabbitMQrestart method in tests and make the code more resilient --- .../distributed/RabbitMQWorkQueue.java | 22 +++++++++++++++++++--- .../RabbitMQWorkQueuePersistenceTest.java | 20 +++++--------------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index 1efa7cd..005b86d 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -23,6 +23,7 @@ package org.apache.james.task.eventsourcing.distributed; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Optional; import java.util.UUID; @@ -69,6 +70,10 @@ public class RabbitMQWorkQueue implements WorkQueue { private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue"; public static final String TASK_ID = "taskId"; + public static final int NUM_RETRIES = 8; + public static final Duration FIRST_BACKOFF = Duration.ofMillis(100); + public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); + private final TaskManagerWorker worker; private final ReactorRabbitMQChannelPool channelPool; private final JsonTaskSerializer taskSerializer; @@ -99,9 +104,20 @@ public class RabbitMQWorkQueue implements WorkQueue { @VisibleForTesting void declareQueue() { - channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); - channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block(); - channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block(); + Mono<AMQP.Exchange.DeclareOk> declareExchange = channelPool.getSender() + .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)) + .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + Mono<AMQP.Queue.DeclareOk> declareQueue = channelPool.getSender() + .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)) + .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + Mono<AMQP.Queue.BindOk> bindQueueToExchange = channelPool.getSender() + .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)) + .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER); + + declareExchange + .then(declareQueue) + .then(bindQueueToExchange) + .block(); } private void consumeWorkqueue() { diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java index 7040957..298b45d 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java @@ -21,8 +21,6 @@ package org.apache.james.task.eventsourcing.distributed; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS; -import static org.awaitility.Duration.FIVE_SECONDS; import static org.mockito.Mockito.spy; import org.apache.james.backends.rabbitmq.RabbitMQExtension; @@ -33,6 +31,7 @@ import org.apache.james.task.MemoryReferenceTask; import org.apache.james.task.Task; import org.apache.james.task.TaskId; import org.apache.james.task.TaskWithId; +import org.awaitility.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -72,7 +71,6 @@ class RabbitMQWorkQueuePersistenceTest { @Test void submittedMessageShouldSurviveRabbitMQRestart() throws Exception { Task TASK = new MemoryReferenceTask(() -> Task.Result.COMPLETED); - TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK); testee.submit(TASK_WITH_ID); @@ -81,27 +79,19 @@ class RabbitMQWorkQueuePersistenceTest { Thread.sleep(500); testee.close(); - restartRabbitMQ(); + rabbitMQExtension.getRabbitMQ().restart(); - startNewWorkqueue(); + startNewConsumingWorkqueue(); - await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty()); + await().atMost(Duration.ONE_MINUTE).until(() -> !worker.results.isEmpty()); assertThat(worker.tasks).containsExactly(TASK_WITH_ID); assertThat(worker.results).containsExactly(Task.Result.COMPLETED); } - private void startNewWorkqueue() { + private void startNewConsumingWorkqueue() { worker = spy(new ImmediateWorker()); testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer); testee.start(); } - - private void restartRabbitMQ() throws Exception { - rabbitMQExtension.getRabbitMQ().stopApp(); - rabbitMQExtension.getRabbitMQ().startApp(); - //wait until healthcheck is ok - await().atMost(FIVE_SECONDS).until(() -> rabbitMQExtension.managementAPI().listQueues().size() > 0); - rabbitMQExtension.getRabbitChannelPool().start(); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org