JAMES-2334 Test cross cluster operations
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/00adfa82 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/00adfa82 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/00adfa82 Branch: refs/heads/master Commit: 00adfa82472cf1ed8e4cc04a5d386c397cda9801 Parents: 749e641 Author: Matthieu Baechler <[email protected]> Authored: Mon May 28 15:30:14 2018 +0200 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- .../queue/rabbitmq/RabbitMQClusterTest.java | 121 ++++++++++--------- 1 file changed, 66 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/00adfa82/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java index f88ecf4..4a7dd07 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java @@ -23,17 +23,21 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES; -import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; import static org.assertj.core.api.Assertions.assertThat; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -46,15 +50,36 @@ import com.rabbitmq.client.ConnectionFactory; @ExtendWith(DockerClusterRabbitMQExtention.class) class RabbitMQClusterTest { - public static final String QUEUE = "queue"; + private static final String QUEUE = "queue"; + + private Connection node1Connection; + private Channel node1Channel; + private Connection node2Connection; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node1Connection = node1ConnectionFactory.newConnection(); + node2Connection = node2ConnectionFactory.newConnection(); + node1Channel = node1Connection.createChannel(); + node2Channel = node2Connection.createChannel(); + } @AfterEach - public void tearDown(DockerRabbitMQCluster cluster) throws Exception { - cluster.getRabbitMQ2() - .connectionFactory() - .newConnection() - .createChannel() - .queueDelete(QUEUE); + void tearDown() { + closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); + } + + private void closeQuietly(AutoCloseable... closeables) { + for (AutoCloseable closeable : closeables) { + try { + closeable.close(); + } catch (Exception e) { + //ignoring exception + } + } } @Test @@ -71,64 +96,50 @@ class RabbitMQClusterTest { } @Test - public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception { - ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>(); + void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory(); - ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory(); + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - try (Connection connection = connectionFactory1.newConnection(); - Channel channel = connection.createChannel()) { - - channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); - channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - - MESSAGES_AS_BYTES.forEach(Throwing.consumer( - bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - } - try (Connection connection2 = connectionFactory2.newConnection(); - Channel channel2 = connection2.createChannel()) { + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - channel2.basicConsume(QUEUE, consumer2); + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size()); - - assertThat(consumer2.getConsumedMessages()) - .containsOnlyElementsOf(MESSAGES); - } - - assertThat(result) - .containsOnlyElementsOf(MESSAGES); + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); } @Test - public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception { - ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory(); - ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory(); + void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - try (Connection connection = connectionFactory1.newConnection(); - Channel channel = connection.createChannel(); - Connection connection2 = connectionFactory2.newConnection(); - Channel channel2 = connection2.createChannel()) { + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); - MESSAGES_AS_BYTES.forEach(Throwing.consumer( - bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); - InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); - channel2.basicConsume(QUEUE, consumer2); - - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size()); + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } - assertThat(consumer2.getConsumedMessages()) - .containsOnlyElementsOf(MESSAGES); - } + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
