JAMES-2334 Demonstrate that workQueue is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b07fd7e0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b07fd7e0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b07fd7e0 Branch: refs/heads/master Commit: b07fd7e0abdc60a37f41f26e8daa152d49adf596 Parents: 9f78f0d Author: benwa <[email protected]> Authored: Wed Feb 7 10:28:01 2018 +0700 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- .../james/queue/rabbitmq/InMemoryConsumer.java | 49 ++++++ .../james/queue/rabbitmq/RabbitMQFixture.java | 3 + .../james/queue/rabbitmq/RabbitMQTest.java | 154 +++++++++++++------ 3 files changed, 162 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java new file mode 100644 index 0000000..f5f8fa2 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.queue.rabbitmq; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class InMemoryConsumer extends DefaultConsumer { + + private final ConcurrentLinkedQueue<Integer> messages; + + public InMemoryConsumer(Channel channel) { + super(channel); + messages = new ConcurrentLinkedQueue<>(); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); + messages.add(payload); + } + + public Queue<Integer> getConsumedMessages() { + return messages; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java index e216690..3ed6237 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java @@ -34,6 +34,9 @@ public class RabbitMQFixture { public static final String EXCHANGE_NAME = "exchangeName"; public static final String ROUTING_KEY = "routingKey"; public static final String DIRECT = "direct"; + public static final boolean EXCLUSIVE = true; + public static final boolean AUTO_DELETE = true; + public static final String WORK_QUEUE = "workQueue"; public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS; public static ConditionFactory calmlyAwait = Awaitility.with() http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java index cad0303..2463516 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java @@ -19,34 +19,38 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.queue.rabbitmq.RabbitMQFixture.WORK_QUEUE; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; @ExtendWith(DockerRabbitMQExtension.class) class RabbitMQTest { @@ -87,44 +91,65 @@ class RabbitMQTest { return false; } } - } @Nested - class BroadcastTest { + class FourConnections { private ConnectionFactory connectionFactory1; private ConnectionFactory connectionFactory2; private ConnectionFactory connectionFactory3; private ConnectionFactory connectionFactory4; + private Connection connection1; + private Connection connection2; + private Connection connection3; + private Connection connection4; + private Channel publisherChannel; + private Channel subscriberChannel2; + private Channel subscriberChannel3; + private Channel subscriberChannel4; @BeforeEach - public void setup(DockerRabbitMQ rabbitMQ) { + void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { connectionFactory1 = rabbitMQ.connectionFactory(); connectionFactory2 = rabbitMQ.connectionFactory(); connectionFactory3 = rabbitMQ.connectionFactory(); connectionFactory4 = rabbitMQ.connectionFactory(); + connection1 = connectionFactory1.newConnection(); + connection2 = connectionFactory2.newConnection(); + connection3 = connectionFactory3.newConnection(); + connection4 = connectionFactory4.newConnection(); + publisherChannel = connection1.createChannel(); + subscriberChannel2 = connection2.createChannel(); + subscriberChannel3 = connection3.createChannel(); + subscriberChannel4 = connection4.createChannel(); } - // In the following case, each consumer will receive the messages produced by the - // producer - // To do so, each consumer will bind it's queue to the producer exchange. - @Test - public void rabbitMQShouldSupportTheBroadcastCase() throws Exception { - ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); - ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>(); - ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>(); - ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>(); - - try (Connection connection1 = connectionFactory1.newConnection(); - Channel publisherChannel = connection1.createChannel(); - Connection connection2 = connectionFactory2.newConnection(); - Channel subscriberChannel2 = connection2.createChannel(); - Connection connection3 = connectionFactory3.newConnection(); - Channel subscriberChannel3 = connection3.createChannel(); - Connection connection4 = connectionFactory4.newConnection(); - Channel subscriberChannel4 = connection4.createChannel()) { + @AfterEach + void tearDown() { + closeQuietly( + publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4, + connection1, connection2, connection3, connection4); + } + + private void closeQuietly(AutoCloseable... closeables) { + for (AutoCloseable closeable : closeables) { + try { + closeable.close(); + } catch (Exception e) { + //ignoring exception + } + } + } + + @Nested + class BroadCast { + // In the following case, each consumer will receive the messages produced by the + // producer + // To do so, each consumer will bind it's queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheBroadcastCase() throws Exception { // Declare a single exchange and three queues attached to it. publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); @@ -135,9 +160,12 @@ class RabbitMQTest { String queue4 = subscriberChannel4.queueDeclare().getQueue(); subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); - subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2)); - subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3)); - subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4)); + InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4); + subscriberChannel2.basicConsume(queue2, consumer2); + subscriberChannel3.basicConsume(queue3, consumer3); + subscriberChannel4.basicConsume(queue4, consumer4); // the publisher will produce 10 messages IntStream.range(0, 10) @@ -146,30 +174,68 @@ class RabbitMQTest { .forEach(Throwing.consumer( bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4)); + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); + ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); // Check every subscriber have receive all the messages. - assertThat(results2).containsOnlyElementsOf(expectedResult); - assertThat(results3).containsOnlyElementsOf(expectedResult); - assertThat(results4).containsOnlyElementsOf(expectedResult); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult); } } - private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) { - return Iterables.size( - Iterables.concat(results2, results3, results4)) - == expectedResult.size() * 3; + @Nested + class WorkQueue { + + // In the following case, consumers will receive the messages produced by the + // producer but will share them. + // To do so, we will bind a single queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheWorkQueueCase() throws Exception { + int nbMessages = 100; + + // Declare the exchange and a single queue attached to it. + publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + // Publisher will produce 100 messages + IntStream.range(0, nbMessages) + .mapToObj(String::valueOf) + .map(s -> s.getBytes(StandardCharsets.UTF_8)) + .forEach(Throwing.consumer( + bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4); + subscriberChannel2.basicConsume(WORK_QUEUE, consumer2); + subscriberChannel3.basicConsume(WORK_QUEUE, consumer3); + subscriberChannel4.basicConsume(WORK_QUEUE, consumer4); + + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); + + ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + + assertThat( + Iterables.concat( + consumer2.getConsumedMessages(), + consumer3.getConsumedMessages(), + consumer4.getConsumedMessages())) + .containsOnlyElementsOf(expectedResult); + } + } - private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) { - return new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); - results.add(payload); - } - }; + private long countReceivedMessages(InMemoryConsumer... consumers) { + return Arrays.stream(consumers) + .map(InMemoryConsumer::getConsumedMessages) + .mapToLong(Queue::size) + .sum(); } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
