This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e644a6f591a3fe4423538aa98941d53b048560d0
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Aug 21 13:46:10 2019 +0200

    JAMES-2813 Demonstrate how exclusive consumer works
---
 .../apache/james/backend/rabbitmq/Constants.java   |  1 +
 .../james/backend/rabbitmq/InMemoryConsumer.java   |  6 +-
 .../backend/rabbitmq/RabbitMQClusterTest.java      |  2 +-
 .../james/backend/rabbitmq/RabbitMQTest.java       | 79 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java
index 4fc2410..e6cea0f 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/Constants.java
@@ -26,6 +26,7 @@ public interface Constants {
     boolean DURABLE = true;
     boolean AUTO_DELETE = true;
     boolean EXCLUSIVE = true;
+    boolean NO_LOCAL = true;
 
     boolean AUTO_ACK = true;
     boolean MULTIPLE = true;
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java
index 712c70d..3cc93ae 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java
@@ -32,14 +32,14 @@ public class InMemoryConsumer extends DefaultConsumer {
 
     @FunctionalInterface
     interface Operation {
-        void perform();
+        void perform(Envelope envelope, byte[] body) throws IOException;
     }
 
     private final ConcurrentLinkedQueue<Integer> messages;
     private final Operation operation;
 
     public InMemoryConsumer(Channel channel) {
-        this(channel, () -> { });
+        this(channel, (envelope, body) -> { });
     }
 
     public InMemoryConsumer(Channel channel, Operation operation) {
@@ -50,7 +50,7 @@ public class InMemoryConsumer extends DefaultConsumer {
 
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-        operation.perform();
+        operation.perform(envelope, body);
         Integer payload = Integer.valueOf(new String(body, 
StandardCharsets.UTF_8));
         messages.add(payload);
     }
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java
index 348d3ff..b4066a1 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java
@@ -278,7 +278,7 @@ class RabbitMQClusterTest {
 
             AtomicInteger counter = new AtomicInteger(0);
             InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
-                () -> stopWhenHalfProcessed(cluster, nbMessages, counter));
+                (envelope, body) -> stopWhenHalfProcessed(cluster, nbMessages, 
counter));
             resilientChannel.basicConsume(QUEUE, consumer);
 
             awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == nbMessages);
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java
index b6f1b31..d858258 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java
@@ -23,17 +23,23 @@ import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.MULTIPLE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_LOCAL;
 import static org.apache.james.backend.rabbitmq.Constants.NO_PROPERTIES;
+import static org.apache.james.backend.rabbitmq.Constants.REQUEUE;
 import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
 import static org.apache.james.backend.rabbitmq.RabbitMQFixture.ROUTING_KEY;
 import static org.apache.james.backend.rabbitmq.RabbitMQFixture.WORK_QUEUE;
 import static 
org.apache.james.backend.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
@@ -48,9 +54,11 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.rabbitmq.client.CancelCallback;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
 
 class RabbitMQTest {
 
@@ -248,6 +256,85 @@ class RabbitMQTest {
                     .containsOnlyElementsOf(expectedResult);
             }
 
+            // Checks that the broker refuses a second exclusive consumer on a queue
+            // on which an exclusive consumer has already been registered.
+            @Test
+            void rabbitMQShouldRejectSecondConsumerInExclusiveWorkQueueCase() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of());
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                        .mapToObj(String::valueOf)
+                        .map(RabbitMQTest.this::asBytes)
+                        .forEach(Throwing.consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                String dyingConsumerTag = "dyingConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                // First exclusive consumer; it handles each delivery slowly (one second sleep).
+                channel2.basicConsume(WORK_QUEUE, AUTO_ACK, dyingConsumerTag, !NO_LOCAL, EXCLUSIVE, arguments,
+                        (consumerTag, message) -> {
+                            try {
+                                TimeUnit.SECONDS.sleep(1);
+                            } catch (InterruptedException e) {
+                                // Restore the interrupt flag rather than silently swallowing the interruption.
+                                Thread.currentThread().interrupt();
+                            }
+                        },
+                        consumerTag -> { });
+                // A second exclusive subscription on the same queue is expected to fail with ACCESS_REFUSED.
+                assertThatThrownBy(() ->
+                        channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, EXCLUSIVE, arguments,
+                                (consumerTag, message) -> { },
+                                consumerTag -> { }))
+                    .isInstanceOf(IOException.class)
+                    .hasStackTraceContaining("ACCESS_REFUSED");
+            }
+
+            // Checks that once the first exclusive consumer is cancelled, another exclusive
+            // consumer can subscribe and gets the messages the first one rejected.
+            @Test
+            void rabbitMQShouldSupportTheExclusiveWorkQueueCase() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of());
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                        .mapToObj(String::valueOf)
+                        .map(RabbitMQTest.this::asBytes)
+                        .forEach(Throwing.consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                String dyingConsumerTag = "dyingConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>();
+                CancelCallback doNothingOnCancel = consumerTag -> { };
+                // Acks only the very first delivery; every later delivery is nacked with the REQUEUE flag.
+                DeliverCallback ackFirstMessageOnly = (consumerTag, message) -> {
+                    if (receivedMessages.isEmpty()) {
+                        receivedMessages.add(Integer.valueOf(new String(message.getBody(), StandardCharsets.UTF_8)));
+                        channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE);
+                    } else {
+                        channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE);
+                    }
+                };
+                channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, dyingConsumerTag, !NO_LOCAL, EXCLUSIVE, arguments, ackFirstMessageOnly, doNothingOnCancel);
+
+                awaitAtMostOneMinute.until(() -> receivedMessages.size() == 1);
+
+                channel2.basicCancel(dyingConsumerTag);
+
+                InMemoryConsumer fallbackConsumer = new InMemoryConsumer(channel3);
+                channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, EXCLUSIVE, arguments, fallbackConsumer);
+
+                // The assertion below needs both 1 and 2, so wait until at least two messages are counted.
+                awaitAtMostOneMinute.until(() -> countReceivedMessages(fallbackConsumer) >= 2);
+
+                assertThat(receivedMessages).containsExactly(0);
+                assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0);
+            }
+
         }
 
         @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to