JAMES-2334 Demonstrate that routing is working

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/94ddf6a0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/94ddf6a0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/94ddf6a0

Branch: refs/heads/master
Commit: 94ddf6a0817b5adcf32666075cc1514ebd96e67e
Parents: b07fd7e
Author: Matthieu Baechler <[email protected]>
Authored: Mon May 28 14:55:06 2018 +0200
Committer: Matthieu Baechler <[email protected]>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/RabbitMQTest.java      | 136 +++++++++++++------
 1 file changed, 96 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/94ddf6a0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 2463516..5e8f311 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -55,8 +55,6 @@ import com.rabbitmq.client.ConnectionFactory;
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQTest {
 
-    private static final byte[] PAYLOAD = "Hello, 
world!".getBytes(StandardCharsets.UTF_8);
-
     @Nested
     class SingleConsumerTest {
 
@@ -81,7 +79,7 @@ class RabbitMQTest {
         }
 
         private void publishAMessage(Channel channel) throws IOException {
-            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, 
PAYLOAD);
+            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, 
asBytes("Hello, world!"));
         }
 
         private Boolean messageReceived(Channel channel, String queueName) {
@@ -104,10 +102,10 @@ class RabbitMQTest {
         private Connection connection2;
         private Connection connection3;
         private Connection connection4;
-        private Channel publisherChannel;
-        private Channel subscriberChannel2;
-        private Channel subscriberChannel3;
-        private Channel subscriberChannel4;
+        private Channel channel1;
+        private Channel channel2;
+        private Channel channel3;
+        private Channel channel4;
 
         @BeforeEach
         void setup(DockerRabbitMQ rabbitMQ) throws IOException, 
TimeoutException {
@@ -119,16 +117,16 @@ class RabbitMQTest {
             connection2 = connectionFactory2.newConnection();
             connection3 = connectionFactory3.newConnection();
             connection4 = connectionFactory4.newConnection();
-            publisherChannel = connection1.createChannel();
-            subscriberChannel2 = connection2.createChannel();
-            subscriberChannel3 = connection3.createChannel();
-            subscriberChannel4 = connection4.createChannel();
+            channel1 = connection1.createChannel();
+            channel2 = connection2.createChannel();
+            channel3 = connection3.createChannel();
+            channel4 = connection4.createChannel();
         }
 
         @AfterEach
         void tearDown() {
             closeQuietly(
-                publisherChannel, subscriberChannel2, subscriberChannel3, 
subscriberChannel4,
+                channel1, channel2, channel3, channel4,
                 connection1, connection2, connection3, connection4);
         }
 
@@ -151,28 +149,28 @@ class RabbitMQTest {
             @Test
             void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
                 // Declare a single exchange and three queues attached to it.
-                publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, 
DURABLE);
-
-                String queue2 = subscriberChannel2.queueDeclare().getQueue();
-                subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, 
ROUTING_KEY);
-                String queue3 = subscriberChannel3.queueDeclare().getQueue();
-                subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, 
ROUTING_KEY);
-                String queue4 = subscriberChannel4.queueDeclare().getQueue();
-                subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, 
ROUTING_KEY);
-
-                InMemoryConsumer consumer2 = new 
InMemoryConsumer(subscriberChannel2);
-                InMemoryConsumer consumer3 = new 
InMemoryConsumer(subscriberChannel3);
-                InMemoryConsumer consumer4 = new 
InMemoryConsumer(subscriberChannel4);
-                subscriberChannel2.basicConsume(queue2, consumer2);
-                subscriberChannel3.basicConsume(queue3, consumer3);
-                subscriberChannel4.basicConsume(queue4, consumer4);
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+                String queue3 = channel3.queueDeclare().getQueue();
+                channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+                String queue4 = channel4.queueDeclare().getQueue();
+                channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
 
                 // the publisher will produce 10 messages
                 IntStream.range(0, 10)
                     .mapToObj(String::valueOf)
-                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .map(RabbitMQTest.this::asBytes)
                     .forEach(Throwing.consumer(
-                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, 
consumer4) == 30);
@@ -196,23 +194,23 @@ class RabbitMQTest {
                 int nbMessages = 100;
 
                 // Declare the exchange and a single queue attached to it.
-                publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", 
DURABLE);
-                publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, 
AUTO_DELETE, ImmutableMap.of());
-                publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, 
ROUTING_KEY);
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, 
AUTO_DELETE, ImmutableMap.of());
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
                 // Publisher will produce 100 messages
                 IntStream.range(0, nbMessages)
                     .mapToObj(String::valueOf)
-                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .map(RabbitMQTest.this::asBytes)
                     .forEach(Throwing.consumer(
-                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-                InMemoryConsumer consumer2 = new 
InMemoryConsumer(subscriberChannel2);
-                InMemoryConsumer consumer3 = new 
InMemoryConsumer(subscriberChannel3);
-                InMemoryConsumer consumer4 = new 
InMemoryConsumer(subscriberChannel4);
-                subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
-                subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
-                subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(WORK_QUEUE, consumer2);
+                channel3.basicConsume(WORK_QUEUE, consumer3);
+                channel4.basicConsume(WORK_QUEUE, consumer4);
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, 
consumer4) == nbMessages);
@@ -229,6 +227,61 @@ class RabbitMQTest {
 
         }
 
+        @Nested
+        class Routing {
+            @Test
+            void rabbitMQShouldSupportRouting() throws Exception {
+                String conversation1 = "c1";
+                String conversation2 = "c2";
+                String conversation3 = "c3";
+                String conversation4 = "c4";
+
+                // Declare the exchange and a single queue attached to it.
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue1 = channel1.queueDeclare().getQueue();
+                // 1 will follow conversation 1 and 2
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation1);
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation2);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                // 2 will follow conversation 2 and 3
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation2);
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation3);
+
+                String queue3 = channel3.queueDeclare().getQueue();
+                // 3 will follow conversation 3 and 4
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation3);
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation4);
+
+                String queue4 = channel4.queueDeclare().getQueue();
+                // 4 will follow conversation 1 and 4
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation1);
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation4);
+
+                channel1.basicPublish(EXCHANGE_NAME, conversation1, 
NO_PROPERTIES, asBytes("1"));
+                channel2.basicPublish(EXCHANGE_NAME, conversation2, 
NO_PROPERTIES, asBytes("2"));
+                channel3.basicPublish(EXCHANGE_NAME, conversation3, 
NO_PROPERTIES, asBytes("3"));
+                channel4.basicPublish(EXCHANGE_NAME, conversation4, 
NO_PROPERTIES, asBytes("4"));
+
+                InMemoryConsumer consumer1 = new InMemoryConsumer(channel1);
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel1.basicConsume(queue1, consumer1);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
+
+                awaitAtMostOneMinute.until(() -> 
countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8);
+
+                assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2);
+                assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3);
+                assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4);
+                assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4);
+            }
+        }
+
         private long countReceivedMessages(InMemoryConsumer... consumers) {
             return Arrays.stream(consumers)
                 .map(InMemoryConsumer::getConsumedMessages)
@@ -238,5 +291,8 @@ class RabbitMQTest {
 
     }
 
+    private byte[] asBytes(String message) {
+        return message.getBytes(StandardCharsets.UTF_8);
+    }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to