JAMES-2334 Test cross-cluster operations:
 - Pub/sub on different nodes
 - Exchange/queue binding on different nodes
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/749e6413 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/749e6413 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/749e6413 Branch: refs/heads/master Commit: 749e64138816e220b96134389ddd5a03cac188d5 Parents: 4439551 Author: benwa <[email protected]> Authored: Fri Feb 9 10:20:17 2018 +0700 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- .../queue/rabbitmq/RabbitMQClusterTest.java | 91 ++++++++++++++++++++ 1 file changed, 91 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/749e6413/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java index 8874641..f88ecf4 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java @@ -18,15 +18,45 @@ ****************************************************************/ package org.apache.james.queue.rabbitmq; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE; +import static 
org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.ConcurrentLinkedDeque; + import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; @ExtendWith(DockerClusterRabbitMQExtention.class) class RabbitMQClusterTest { + public static final String QUEUE = "queue"; + + @AfterEach + public void tearDown(DockerRabbitMQCluster cluster) throws Exception { + cluster.getRabbitMQ2() + .connectionFactory() + .newConnection() + .createChannel() + .queueDelete(QUEUE); + } + @Test void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { String stdout = cluster.getRabbitMQ1().container() @@ -40,4 +70,65 @@ class RabbitMQClusterTest { DockerClusterRabbitMQExtention.RABBIT_3); } + @Test + public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception { + ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>(); + + ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory(); + ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory(); + + try (Connection connection = connectionFactory1.newConnection(); + Channel channel = connection.createChannel()) { + + 
channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + MESSAGES_AS_BYTES.forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + } + + try (Connection connection2 = connectionFactory2.newConnection(); + Channel channel2 = connection2.createChannel()) { + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + channel2.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size()); + + assertThat(consumer2.getConsumedMessages()) + .containsOnlyElementsOf(MESSAGES); + } + + assertThat(result) + .containsOnlyElementsOf(MESSAGES); + } + + @Test + public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception { + ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory(); + ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory(); + + try (Connection connection = connectionFactory1.newConnection(); + Channel channel = connection.createChannel(); + Connection connection2 = connectionFactory2.newConnection(); + Channel channel2 = connection2.createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + MESSAGES_AS_BYTES.forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + channel2.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size()); + + assertThat(consumer2.getConsumedMessages()) + .containsOnlyElementsOf(MESSAGES); + } + } + } 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
