JAMES-2334 Demonstrate that routing is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/94ddf6a0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/94ddf6a0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/94ddf6a0 Branch: refs/heads/master Commit: 94ddf6a0817b5adcf32666075cc1514ebd96e67e Parents: b07fd7e Author: Matthieu Baechler <[email protected]> Authored: Mon May 28 14:55:06 2018 +0200 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- .../james/queue/rabbitmq/RabbitMQTest.java | 136 +++++++++++++------ 1 file changed, 96 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/94ddf6a0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java index 2463516..5e8f311 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java @@ -55,8 +55,6 @@ import com.rabbitmq.client.ConnectionFactory; @ExtendWith(DockerRabbitMQExtension.class) class RabbitMQTest { - private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8); - @Nested class SingleConsumerTest { @@ -81,7 +79,7 @@ class RabbitMQTest { } private void publishAMessage(Channel channel) throws IOException { - channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD); + channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); } private Boolean messageReceived(Channel channel, String queueName) { @@ -104,10 +102,10 @@ class RabbitMQTest { private Connection connection2; private Connection connection3; private Connection connection4; - private Channel publisherChannel; - private Channel subscriberChannel2; - private Channel subscriberChannel3; - private Channel subscriberChannel4; + private Channel channel1; + private Channel channel2; + private Channel channel3; + private Channel channel4; @BeforeEach void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { @@ -119,16 +117,16 @@ class RabbitMQTest { connection2 = connectionFactory2.newConnection(); connection3 = connectionFactory3.newConnection(); connection4 = connectionFactory4.newConnection(); - publisherChannel = connection1.createChannel(); - subscriberChannel2 = connection2.createChannel(); - subscriberChannel3 = connection3.createChannel(); - subscriberChannel4 = connection4.createChannel(); + channel1 = connection1.createChannel(); + channel2 = connection2.createChannel(); + channel3 = connection3.createChannel(); + channel4 = connection4.createChannel(); } @AfterEach void tearDown() { closeQuietly( - publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4, + channel1, channel2, channel3, channel4, connection1, connection2, connection3, connection4); } @@ -151,28 +149,28 @@ class RabbitMQTest { @Test void rabbitMQShouldSupportTheBroadcastCase() throws Exception { // Declare a single exchange and three queues attached to it. - publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - - String queue2 = subscriberChannel2.queueDeclare().getQueue(); - subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); - String queue3 = subscriberChannel3.queueDeclare().getQueue(); - subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); - String queue4 = subscriberChannel4.queueDeclare().getQueue(); - subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); - - InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4); - subscriberChannel2.basicConsume(queue2, consumer2); - subscriberChannel3.basicConsume(queue3, consumer3); - subscriberChannel4.basicConsume(queue4, consumer4); + channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue2 = channel2.queueDeclare().getQueue(); + channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); + String queue3 = channel3.queueDeclare().getQueue(); + channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); + String queue4 = channel4.queueDeclare().getQueue(); + channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); // the publisher will produce 10 messages IntStream.range(0, 10) .mapToObj(String::valueOf) - .map(s -> s.getBytes(StandardCharsets.UTF_8)) + .map(RabbitMQTest.this::asBytes) .forEach(Throwing.consumer( - bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); awaitAtMostOneMinute.until( () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); @@ -196,23 +194,23 @@ class RabbitMQTest { int nbMessages = 100; // Declare the exchange and a single queue attached to it. - publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); - publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); - publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); // Publisher will produce 100 messages IntStream.range(0, nbMessages) .mapToObj(String::valueOf) - .map(s -> s.getBytes(StandardCharsets.UTF_8)) + .map(RabbitMQTest.this::asBytes) .forEach(Throwing.consumer( - bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2); - InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3); - InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4); - subscriberChannel2.basicConsume(WORK_QUEUE, consumer2); - subscriberChannel3.basicConsume(WORK_QUEUE, consumer3); - subscriberChannel4.basicConsume(WORK_QUEUE, consumer4); + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(WORK_QUEUE, consumer2); + channel3.basicConsume(WORK_QUEUE, consumer3); + channel4.basicConsume(WORK_QUEUE, consumer4); awaitAtMostOneMinute.until( () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); @@ -229,6 +227,61 @@ class RabbitMQTest { } + @Nested + class Routing { + @Test + void rabbitMQShouldSupportRouting() throws Exception { + String conversation1 = "c1"; + String conversation2 = "c2"; + String conversation3 = "c3"; + String conversation4 = "c4"; + + // Declare the exchange and a single queue attached to it. + channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue1 = channel1.queueDeclare().getQueue(); + // 1 will follow conversation 1 and 2 + channel1.queueBind(queue1, EXCHANGE_NAME, conversation1); + channel1.queueBind(queue1, EXCHANGE_NAME, conversation2); + + String queue2 = channel2.queueDeclare().getQueue(); + // 2 will follow conversation 2 and 3 + channel2.queueBind(queue2, EXCHANGE_NAME, conversation2); + channel2.queueBind(queue2, EXCHANGE_NAME, conversation3); + + String queue3 = channel3.queueDeclare().getQueue(); + // 3 will follow conversation 3 and 4 + channel3.queueBind(queue3, EXCHANGE_NAME, conversation3); + channel3.queueBind(queue3, EXCHANGE_NAME, conversation4); + + String queue4 = channel4.queueDeclare().getQueue(); + // 4 will follow conversation 1 and 4 + channel4.queueBind(queue4, EXCHANGE_NAME, conversation1); + channel4.queueBind(queue4, EXCHANGE_NAME, conversation4); + + channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1")); + channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2")); + channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3")); + channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4")); + + InMemoryConsumer consumer1 = new InMemoryConsumer(channel1); + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel1.basicConsume(queue1, consumer1); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); + + awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8); + + assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2); + assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3); + assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4); + assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4); + } + } + private long countReceivedMessages(InMemoryConsumer... consumers) { return Arrays.stream(consumers) .map(InMemoryConsumer::getConsumedMessages) @@ -238,5 +291,8 @@ class RabbitMQTest { } + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
