JAMES-2334 Demonstrate that broadcast is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9f78f0d7 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9f78f0d7 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9f78f0d7 Branch: refs/heads/master Commit: 9f78f0d75f5690b6680b44972423966629a3296a Parents: 30679c4 Author: benwa <[email protected]> Authored: Wed Feb 7 10:01:45 2018 +0700 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- server/queue/queue-rabbitmq/pom.xml | 8 + .../james/queue/rabbitmq/RabbitMQTest.java | 145 ++++++++++++++++--- 2 files changed, 131 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 27b0c84..3df0fb6 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -34,6 +34,14 @@ <dependencies> <dependency> + <groupId>com.github.fge</groupId> + <artifactId>throwing-lambdas</artifactId> + </dependency> + <dependency> + <groupId>com.github.steveash.guavate</groupId> + <artifactId>guavate</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java index 465be65..cad0303 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java @@ -25,51 +25,152 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY; import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; @ExtendWith(DockerRabbitMQExtension.class) class RabbitMQTest { private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8); - @Test - void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception { - ConnectionFactory connectionFactory = rabbitMQ.connectionFactory(); - try (Connection connection = connectionFactory.newConnection(); - Channel channel = connection.createChannel()) { - String queueName = createQueue(channel); + @Nested + class SingleConsumerTest { - 
publishAMessage(channel); + @Test + void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception { + ConnectionFactory connectionFactory = rabbitMQ.connectionFactory(); + try (Connection connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel()) { + String queueName = createQueue(channel); - awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); + publishAMessage(channel); + + awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); + } } - } - private String createQueue(Channel channel) throws IOException { - channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); - return queueName; - } + private String createQueue(Channel channel) throws IOException { + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); + return queueName; + } + + private void publishAMessage(Channel channel) throws IOException { + channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD); + } + + private Boolean messageReceived(Channel channel, String queueName) { + try { + return channel.basicGet(queueName, !AUTO_ACK) != null; + } catch (Exception e) { + return false; + } + } - private void publishAMessage(Channel channel) throws IOException { - channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD); } - private Boolean messageReceived(Channel channel, String queueName) { - try { - return channel.basicGet(queueName, !AUTO_ACK) != null; - } catch (Exception e) { - return false; + @Nested + class BroadcastTest { + + private ConnectionFactory connectionFactory1; + private ConnectionFactory connectionFactory2; + private ConnectionFactory connectionFactory3; + private ConnectionFactory connectionFactory4; + + @BeforeEach + public 
void setup(DockerRabbitMQ rabbitMQ) { + connectionFactory1 = rabbitMQ.connectionFactory(); + connectionFactory2 = rabbitMQ.connectionFactory(); + connectionFactory3 = rabbitMQ.connectionFactory(); + connectionFactory4 = rabbitMQ.connectionFactory(); + } + + // In the following case, each consumer will receive the messages produced by the + // producer + // To do so, each consumer will bind its queue to the producer exchange. + @Test + public void rabbitMQShouldSupportTheBroadcastCase() throws Exception { + ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); + ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>(); + + try (Connection connection1 = connectionFactory1.newConnection(); + Channel publisherChannel = connection1.createChannel(); + Connection connection2 = connectionFactory2.newConnection(); + Channel subscriberChannel2 = connection2.createChannel(); + Connection connection3 = connectionFactory3.newConnection(); + Channel subscriberChannel3 = connection3.createChannel(); + Connection connection4 = connectionFactory4.newConnection(); + Channel subscriberChannel4 = connection4.createChannel()) { + + // Declare a single exchange and three queues attached to it. 
+ publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue2 = subscriberChannel2.queueDeclare().getQueue(); + subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); + String queue3 = subscriberChannel3.queueDeclare().getQueue(); + subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); + String queue4 = subscriberChannel4.queueDeclare().getQueue(); + subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); + + subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2)); + subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3)); + subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4)); + + // the publisher will produce 10 messages + IntStream.range(0, 10) + .mapToObj(String::valueOf) + .map(s -> s.getBytes(StandardCharsets.UTF_8)) + .forEach(Throwing.consumer( + bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4)); + + // Check that every subscriber has received all the messages. 
+ assertThat(results2).containsOnlyElementsOf(expectedResult); + assertThat(results3).containsOnlyElementsOf(expectedResult); + assertThat(results4).containsOnlyElementsOf(expectedResult); + } + } + + private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) { + return Iterables.size( + Iterables.concat(results2, results3, results4)) + == expectedResult.size() * 3; + } + + private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) { + return new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); + results.add(payload); + } + }; } } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
