JAMES-2334 Test cross cluster operations

 - Pub/sub on different nodes
 - Exchange/queue binding on different nodes


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/749e6413
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/749e6413
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/749e6413

Branch: refs/heads/master
Commit: 749e64138816e220b96134389ddd5a03cac188d5
Parents: 4439551
Author: benwa <[email protected]>
Authored: Fri Feb 9 10:20:17 2018 +0700
Committer: Matthieu Baechler <[email protected]>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 91 ++++++++++++++++++++
 1 file changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/749e6413/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 8874641..f88ecf4 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -18,15 +18,45 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
+import static 
org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static 
org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.concurrent.ConcurrentLinkedDeque;
+
 import 
org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 
 @ExtendWith(DockerClusterRabbitMQExtention.class)
 class RabbitMQClusterTest {
 
+    public static final String QUEUE = "queue";
+
+    @AfterEach
+    public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
+        cluster.getRabbitMQ2()
+            .connectionFactory()
+            .newConnection()
+            .createChannel()
+            .queueDelete(QUEUE);
+    }
+
     @Test
     void 
rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster 
cluster) throws Exception {
         String stdout = cluster.getRabbitMQ1().container()
@@ -40,4 +70,65 @@ class RabbitMQClusterTest {
                 DockerClusterRabbitMQExtention.RABBIT_3);
     }
 
+    @Test
+    public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws 
Exception {
+        ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+
+        ConnectionFactory connectionFactory1 = 
cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory connectionFactory2 = 
cluster.getRabbitMQ2().connectionFactory();
+
+        try (Connection connection = connectionFactory1.newConnection();
+             Channel channel = connection.createChannel()) {
+
+            channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+            channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, 
ImmutableMap.of()).getQueue();
+            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
NO_PROPERTIES, bytes)));
+        }
+
+        try (Connection connection2 = connectionFactory2.newConnection();
+             Channel channel2 = connection2.createChannel()) {
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+            channel2.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+            assertThat(consumer2.getConsumedMessages())
+                .containsOnlyElementsOf(MESSAGES);
+        }
+
+        assertThat(result)
+            .containsOnlyElementsOf(MESSAGES);
+    }
+
+    @Test
+    public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster 
cluster) throws Exception {
+        ConnectionFactory connectionFactory1 = 
cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory connectionFactory2 = 
cluster.getRabbitMQ2().connectionFactory();
+
+        try (Connection connection = connectionFactory1.newConnection();
+             Channel channel = connection.createChannel();
+             Connection connection2 = connectionFactory2.newConnection();
+             Channel channel2 = connection2.createChannel()) {
+
+            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, 
ImmutableMap.of()).getQueue();
+            channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+            channel2.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+            assertThat(consumer2.getConsumedMessages())
+                .containsOnlyElementsOf(MESSAGES);
+        }
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to