This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e644a6f591a3fe4423538aa98941d53b048560d0 Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Aug 21 13:46:10 2019 +0200 JAMES-2813 Demonstrate how exclusive consumer works --- .../apache/james/backend/rabbitmq/Constants.java | 1 + .../james/backend/rabbitmq/InMemoryConsumer.java | 6 +- .../backend/rabbitmq/RabbitMQClusterTest.java | 2 +- .../james/backend/rabbitmq/RabbitMQTest.java | 79 ++++++++++++++++++++++ 4 files changed, 84 insertions(+), 4 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java index 4fc2410..e6cea0f 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java @@ -26,6 +26,7 @@ public interface Constants { boolean DURABLE = true; boolean AUTO_DELETE = true; boolean EXCLUSIVE = true; + boolean NO_LOCAL = true; boolean AUTO_ACK = true; boolean MULTIPLE = true; diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java index 712c70d..3cc93ae 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java @@ -32,14 +32,14 @@ public class InMemoryConsumer extends DefaultConsumer { @FunctionalInterface interface Operation { - void perform(); + void perform(Envelope envelope, byte[] body) throws IOException; } private final ConcurrentLinkedQueue<Integer> messages; private final Operation operation; public InMemoryConsumer(Channel channel) { - this(channel, () -> { }); + this(channel, (envelope, body) -> { }); } public InMemoryConsumer(Channel channel, Operation operation) { @@ -50,7 +50,7 @@ public class InMemoryConsumer extends DefaultConsumer { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - operation.perform(); + operation.perform(envelope, body); Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); messages.add(payload); } diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java index 348d3ff..b4066a1 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java @@ -278,7 +278,7 @@ class RabbitMQClusterTest { AtomicInteger counter = new AtomicInteger(0); InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, - () -> stopWhenHalfProcessed(cluster, nbMessages, counter)); + (envelope, body) -> stopWhenHalfProcessed(cluster, nbMessages, counter)); resilientChannel.basicConsume(QUEUE, consumer); awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java index b6f1b31..d858258 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java @@ -23,17 +23,23 @@ import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE; import static org.apache.james.backend.rabbitmq.Constants.DURABLE; import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.backend.rabbitmq.Constants.MULTIPLE; +import static org.apache.james.backend.rabbitmq.Constants.NO_LOCAL; import static org.apache.james.backend.rabbitmq.Constants.NO_PROPERTIES; +import static org.apache.james.backend.rabbitmq.Constants.REQUEUE; import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; import static org.apache.james.backend.rabbitmq.RabbitMQFixture.ROUTING_KEY; import static org.apache.james.backend.rabbitmq.RabbitMQFixture.WORK_QUEUE; import static org.apache.james.backend.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; @@ -48,9 +54,11 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; class RabbitMQTest { @@ -248,6 +256,77 @@ class RabbitMQTest { .containsOnlyElementsOf(expectedResult); } + @Test + void rabbitMQShouldRejectSecondConsumerInExclusiveWorkQueueCase() throws Exception { + channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()); + channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + IntStream.range(0, 10) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>(); + String dyingConsumerTag = "dyingConsumer"; + ImmutableMap<String, Object> arguments = ImmutableMap.of(); + channel2.basicConsume(WORK_QUEUE, AUTO_ACK, dyingConsumerTag, !NO_LOCAL, EXCLUSIVE, arguments, + (consumerTag, message) -> { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + //do nothing + } + }, + (consumerTag -> { })); + assertThatThrownBy(() -> + channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, EXCLUSIVE, arguments, + (consumerTag, message) -> { }, + consumerTag -> { })) + .isInstanceOf(IOException.class) + .hasStackTraceContaining("ACCESS_REFUSED"); + } + + @Test + void rabbitMQShouldSupportTheExclusiveWorkQueueCase() throws Exception { + channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()); + channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + IntStream.range(0, 10) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + String dyingConsumerTag = "dyingConsumer"; + ImmutableMap<String, Object> arguments = ImmutableMap.of(); + ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>(); + CancelCallback doNothingOnCancel = consumerTag -> { }; + DeliverCallback ackFirstMessageOnly = (consumerTag, message) -> { + if (receivedMessages.size() == 0) { + receivedMessages.add(Integer.valueOf(new String(message.getBody(), StandardCharsets.UTF_8))); + channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE); + } else { + channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE); + } + }; + channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, dyingConsumerTag, !NO_LOCAL, EXCLUSIVE, arguments, ackFirstMessageOnly, doNothingOnCancel); + + awaitAtMostOneMinute.until(() -> receivedMessages.size() == 1); + + channel2.basicCancel(dyingConsumerTag); + + InMemoryConsumer fallbackConsumer = new InMemoryConsumer(channel3); + channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, EXCLUSIVE, arguments, fallbackConsumer); + + awaitAtMostOneMinute.until(() -> countReceivedMessages(fallbackConsumer) >= 1); + + assertThat(receivedMessages).containsExactly(0); + assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0); + } + } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
