This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 65f52d5c6c72408469345e07c7db1b32a0352a4f Author: Rémi KOWALSKI <[email protected]> AuthorDate: Mon Mar 2 15:05:35 2020 +0100 JAMES-3083 fix test demonstrating rabbitMQ durability --- .../james/backends/rabbitmq/DockerRabbitMQ.java | 7 ++++--- .../james/backends/rabbitmq/RabbitMQTest.java | 21 ++++++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java index 69db7ba..60d4db0 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java @@ -146,9 +146,10 @@ public class DockerRabbitMQ { container.stop(); } - public void restart() { - DockerClientFactory.instance().client() - .restartContainerCmd(container.getContainerId()); + public void restart() throws Exception { + stopApp(); + startApp(); + waitForReadyness(); } public GenericContainer<?> container() { diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java index a41a903..79020df 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java @@ -18,6 +18,7 @@ ****************************************************************/ package org.apache.james.backends.rabbitmq; +import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; import static org.apache.james.backends.rabbitmq.Constants.AUTO_ACK; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE; @@ -39,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,6 +59,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -66,6 +69,7 @@ import com.rabbitmq.client.Delivery; class RabbitMQTest { + public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of(); @RegisterExtension static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); @@ -79,6 +83,7 @@ class RabbitMQTest { @BeforeEach void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { connectionFactory = rabbitMQ.connectionFactory(); + connectionFactory.setNetworkRecoveryInterval(1000); connection = connectionFactory.newConnection(); channel = connection.createChannel(); } @@ -101,9 +106,12 @@ class RabbitMQTest { String queueName = createQueue(channel); publishAMessage(channel); + //wait for message to be effectively published + Thread.sleep(200); rabbitMQ.restart(); - awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ)); + + Thread.sleep(connectionFactory.getNetworkRecoveryInterval()); assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull(); } @@ -118,13 +126,20 @@ class RabbitMQTest { private String createQueue(Channel channel) throws IOException { channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE); - String queueName = channel.queueDeclare().getQueue(); + String queueName = UUID.randomUUID().toString(); + channel.queueDeclare(queueName, DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); return queueName; } private void publishAMessage(Channel channel) throws IOException { - channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); + AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() + .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode()) + .priority(PERSISTENT_TEXT_PLAIN.getPriority()) + .contentType(PERSISTENT_TEXT_PLAIN.getContentType()) + .build(); + + channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, asBytes("Hello, world!")); } private Boolean messageReceived(Channel channel, String queueName) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
