JAMES-2334 Demonstrate that workQueue is working

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b07fd7e0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b07fd7e0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b07fd7e0

Branch: refs/heads/master
Commit: b07fd7e0abdc60a37f41f26e8daa152d49adf596
Parents: 9f78f0d
Author: benwa <[email protected]>
Authored: Wed Feb 7 10:28:01 2018 +0700
Committer: Matthieu Baechler <[email protected]>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/InMemoryConsumer.java  |  49 ++++++
 .../james/queue/rabbitmq/RabbitMQFixture.java   |   3 +
 .../james/queue/rabbitmq/RabbitMQTest.java      | 154 +++++++++++++------
 3 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
new file mode 100644
index 0000000..f5f8fa2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+
+public class InMemoryConsumer extends DefaultConsumer {
+
+    private final ConcurrentLinkedQueue<Integer> messages;
+
+    public InMemoryConsumer(Channel channel) {
+        super(channel);
+        messages = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
+        Integer payload = Integer.valueOf(new String(body, 
StandardCharsets.UTF_8));
+        messages.add(payload);
+    }
+
+    public Queue<Integer> getConsumedMessages() {
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
index e216690..3ed6237 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
@@ -34,6 +34,9 @@ public class RabbitMQFixture {
     public static final String EXCHANGE_NAME = "exchangeName";
     public static final String ROUTING_KEY = "routingKey";
     public static final String DIRECT = "direct";
+    public static final boolean EXCLUSIVE = true;
+    public static final boolean AUTO_DELETE = true;
+    public static final String WORK_QUEUE = "workQueue";
 
     public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS;
     public static ConditionFactory calmlyAwait = Awaitility.with()

http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index cad0303..2463516 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -19,34 +19,38 @@
 package org.apache.james.queue.rabbitmq;
 
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.WORK_QUEUE;
 import static 
org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
 
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQTest {
@@ -87,44 +91,65 @@ class RabbitMQTest {
                 return false;
             }
         }
-
     }
 
     @Nested
-    class BroadcastTest {
+    class FourConnections {
 
         private ConnectionFactory connectionFactory1;
         private ConnectionFactory connectionFactory2;
         private ConnectionFactory connectionFactory3;
         private ConnectionFactory connectionFactory4;
+        private Connection connection1;
+        private Connection connection2;
+        private Connection connection3;
+        private Connection connection4;
+        private Channel publisherChannel;
+        private Channel subscriberChannel2;
+        private Channel subscriberChannel3;
+        private Channel subscriberChannel4;
 
         @BeforeEach
-        public void setup(DockerRabbitMQ rabbitMQ) {
+        void setup(DockerRabbitMQ rabbitMQ) throws IOException, 
TimeoutException {
             connectionFactory1 = rabbitMQ.connectionFactory();
             connectionFactory2 = rabbitMQ.connectionFactory();
             connectionFactory3 = rabbitMQ.connectionFactory();
             connectionFactory4 = rabbitMQ.connectionFactory();
+            connection1 = connectionFactory1.newConnection();
+            connection2 = connectionFactory2.newConnection();
+            connection3 = connectionFactory3.newConnection();
+            connection4 = connectionFactory4.newConnection();
+            publisherChannel = connection1.createChannel();
+            subscriberChannel2 = connection2.createChannel();
+            subscriberChannel3 = connection3.createChannel();
+            subscriberChannel4 = connection4.createChannel();
         }
 
-        // In the following case, each consumer will receive the messages produced by the
-        // producer
-        // To do so, each consumer will bind it's queue to the producer exchange.
-        @Test
-        public void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
-            ImmutableList<Integer> expectedResult = IntStream.range(0, 
10).boxed().collect(Guavate.toImmutableList());
-            ConcurrentLinkedQueue<Integer> results2 = new 
ConcurrentLinkedQueue<>();
-            ConcurrentLinkedQueue<Integer> results3 = new 
ConcurrentLinkedQueue<>();
-            ConcurrentLinkedQueue<Integer> results4 = new 
ConcurrentLinkedQueue<>();
-
-            try (Connection connection1 = connectionFactory1.newConnection();
-                 Channel publisherChannel = connection1.createChannel();
-                 Connection connection2 = connectionFactory2.newConnection();
-                 Channel subscriberChannel2 = connection2.createChannel();
-                 Connection connection3 = connectionFactory3.newConnection();
-                 Channel subscriberChannel3 = connection3.createChannel();
-                 Connection connection4 = connectionFactory4.newConnection();
-                 Channel subscriberChannel4 = connection4.createChannel()) {
+        @AfterEach
+        void tearDown() {
+            closeQuietly(
+                publisherChannel, subscriberChannel2, subscriberChannel3, 
subscriberChannel4,
+                connection1, connection2, connection3, connection4);
+        }
+
+        private void closeQuietly(AutoCloseable... closeables) {
+            for (AutoCloseable closeable : closeables) {
+                try {
+                    closeable.close();
+                } catch (Exception e) {
+                    //ignoring exception
+                }
+            }
+        }
+
+        @Nested
+        class BroadCast {
 
+            // In the following case, each consumer will receive the messages produced by the
+            // producer
+            // To do so, each consumer will bind it's queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
                 // Declare a single exchange and three queues attached to it.
                 publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, 
DURABLE);
 
@@ -135,9 +160,12 @@ class RabbitMQTest {
                 String queue4 = subscriberChannel4.queueDeclare().getQueue();
                 subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, 
ROUTING_KEY);
 
-                subscriberChannel2.basicConsume(queue2, 
storeInResultCallBack(subscriberChannel2, results2));
-                subscriberChannel3.basicConsume(queue3, 
storeInResultCallBack(subscriberChannel3, results3));
-                subscriberChannel4.basicConsume(queue4, 
storeInResultCallBack(subscriberChannel4, results4));
+                InMemoryConsumer consumer2 = new 
InMemoryConsumer(subscriberChannel2);
+                InMemoryConsumer consumer3 = new 
InMemoryConsumer(subscriberChannel3);
+                InMemoryConsumer consumer4 = new 
InMemoryConsumer(subscriberChannel4);
+                subscriberChannel2.basicConsume(queue2, consumer2);
+                subscriberChannel3.basicConsume(queue3, consumer3);
+                subscriberChannel4.basicConsume(queue4, consumer4);
 
                 // the publisher will produce 10 messages
                 IntStream.range(0, 10)
@@ -146,30 +174,68 @@ class RabbitMQTest {
                     .forEach(Throwing.consumer(
                         bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-                awaitAtMostOneMinute.until(() -> 
allMessageReceived(expectedResult, results2, results3, results4));
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, 
consumer4) == 30);
 
+                ImmutableList<Integer> expectedResult = IntStream.range(0, 
10).boxed().collect(Guavate.toImmutableList());
                 // Check every subscriber have receive all the messages.
-                assertThat(results2).containsOnlyElementsOf(expectedResult);
-                assertThat(results3).containsOnlyElementsOf(expectedResult);
-                assertThat(results4).containsOnlyElementsOf(expectedResult);
+                
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                
assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                
assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
             }
         }
 
-        private boolean allMessageReceived(ImmutableList<Integer> 
expectedResult, ConcurrentLinkedQueue<Integer> results2, 
ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> 
results4) {
-            return Iterables.size(
-                Iterables.concat(results2, results3, results4))
-                == expectedResult.size() * 3;
+        @Nested
+        class WorkQueue {
+
+            // In the following case, consumers will receive the messages produced by the
+            // producer but will share them.
+            // To do so, we will bind a single queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheWorkQueueCase() throws Exception {
+                int nbMessages = 100;
+
+                // Declare the exchange and a single queue attached to it.
+                publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", 
DURABLE);
+                publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, 
AUTO_DELETE, ImmutableMap.of());
+                publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, 
ROUTING_KEY);
+
+                // Publisher will produce 100 messages
+                IntStream.range(0, nbMessages)
+                    .mapToObj(String::valueOf)
+                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .forEach(Throwing.consumer(
+                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                InMemoryConsumer consumer2 = new 
InMemoryConsumer(subscriberChannel2);
+                InMemoryConsumer consumer3 = new 
InMemoryConsumer(subscriberChannel3);
+                InMemoryConsumer consumer4 = new 
InMemoryConsumer(subscriberChannel4);
+                subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
+                subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
+                subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, 
consumer4) == nbMessages);
+
+                ImmutableList<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+
+                assertThat(
+                    Iterables.concat(
+                        consumer2.getConsumedMessages(),
+                        consumer3.getConsumedMessages(),
+                        consumer4.getConsumedMessages()))
+                    .containsOnlyElementsOf(expectedResult);
+            }
+
         }
 
-        private DefaultConsumer storeInResultCallBack(Channel channel, 
ConcurrentLinkedQueue<Integer> results) {
-            return new DefaultConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag, Envelope 
envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    Integer payload = Integer.valueOf(new String(body, 
StandardCharsets.UTF_8));
-                    results.add(payload);
-                }
-            };
+        private long countReceivedMessages(InMemoryConsumer... consumers) {
+            return Arrays.stream(consumers)
+                .map(InMemoryConsumer::getConsumedMessages)
+                .mapToLong(Queue::size)
+                .sum();
         }
+
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to