JAMES-2334 Add node killing tests for RabbitMQ cluster

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/552e44d0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/552e44d0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/552e44d0

Branch: refs/heads/master
Commit: 552e44d073580b2565d163e007b3a3144c671bcc
Parents: 00adfa8
Author: benwa <[email protected]>
Authored: Fri Feb 9 10:50:27 2018 +0700
Committer: Matthieu Baechler <[email protected]>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../DockerClusterRabbitMQExtension.java         | 120 ++++++++
 .../DockerClusterRabbitMQExtention.java         | 112 --------
 .../james/queue/rabbitmq/InMemoryConsumer.java  |  14 +-
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 281 ++++++++++++++-----
 4 files changed, 345 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
new file mode 100644
index 0000000..000fef6
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
@@ -0,0 +1,120 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.google.common.collect.ImmutableList;
+import com.rabbitmq.client.Address;
+
+public class DockerClusterRabbitMQExtension implements BeforeEachCallback, 
AfterEachCallback, ParameterResolver {
+
+    public static final String RABBIT_1 = "rabbit1";
+    public static final String RABBIT_2 = "rabbit2";
+    public static final String RABBIT_3 = "rabbit3";
+    private DockerRabbitMQCluster cluster;
+    private Network network;
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        String cookie = DigestUtils.sha1Hex("secret cookie here");
+
+        network = Network.NetworkImpl.builder()
+            .enableIpv6(false)
+            .createNetworkCmdModifiers(ImmutableList.of())
+            .build();
+
+        DockerRabbitMQ rabbitMQ1 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", 
network);
+        DockerRabbitMQ rabbitMQ2 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", 
network);
+        DockerRabbitMQ rabbitMQ3 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", 
network);
+
+        rabbitMQ1.start();
+        rabbitMQ2.start();
+        rabbitMQ3.start();
+
+        rabbitMQ2.join(rabbitMQ1);
+        rabbitMQ3.join(rabbitMQ1);
+
+        rabbitMQ2.startApp();
+        rabbitMQ3.startApp();
+
+        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        cluster.stop();
+        network.close();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, 
ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == 
DockerRabbitMQCluster.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, 
ExtensionContext extensionContext) throws ParameterResolutionException {
+        return cluster;
+    }
+
+    public static class DockerRabbitMQCluster {
+
+        private final DockerRabbitMQ rabbitMQ1;
+        private final DockerRabbitMQ rabbitMQ2;
+        private final DockerRabbitMQ rabbitMQ3;
+
+        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ 
rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+            this.rabbitMQ1 = rabbitMQ1;
+            this.rabbitMQ2 = rabbitMQ2;
+            this.rabbitMQ3 = rabbitMQ3;
+        }
+
+        public void stop() {
+            rabbitMQ1.stop();
+            rabbitMQ2.stop();
+            rabbitMQ3.stop();
+        }
+
+        public DockerRabbitMQ getRabbitMQ1() {
+            return rabbitMQ1;
+        }
+
+        public DockerRabbitMQ getRabbitMQ2() {
+            return rabbitMQ2;
+        }
+
+        public DockerRabbitMQ getRabbitMQ3() {
+            return rabbitMQ3;
+        }
+
+        public ImmutableList<Address> getAddresses() {
+            return ImmutableList.of(
+                new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
+                new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
+                new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
deleted file mode 100644
index fae2016..0000000
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.testcontainers.containers.Network;
-
-import com.google.common.collect.ImmutableList;
-
-public class DockerClusterRabbitMQExtention implements BeforeEachCallback, 
AfterEachCallback, ParameterResolver {
-
-    public static final String RABBIT_1 = "rabbit1";
-    public static final String RABBIT_2 = "rabbit2";
-    public static final String RABBIT_3 = "rabbit3";
-    private DockerRabbitMQCluster cluster;
-    private Network network;
-
-    @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
-        String cookie = DigestUtils.sha1Hex("secret cookie here");
-
-        network = Network.NetworkImpl.builder()
-            .enableIpv6(false)
-            .createNetworkCmdModifiers(ImmutableList.of())
-            .build();
-
-        DockerRabbitMQ rabbitMQ1 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", 
network);
-        DockerRabbitMQ rabbitMQ2 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", 
network);
-        DockerRabbitMQ rabbitMQ3 = 
DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", 
network);
-
-        rabbitMQ1.start();
-        rabbitMQ2.start();
-        rabbitMQ3.start();
-
-        rabbitMQ2.join(rabbitMQ1);
-        rabbitMQ3.join(rabbitMQ1);
-
-        rabbitMQ2.startApp();
-        rabbitMQ3.startApp();
-
-        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) throws Exception {
-        cluster.stop();
-        network.close();
-    }
-
-    @Override
-    public boolean supportsParameter(ParameterContext parameterContext, 
ExtensionContext extensionContext) throws ParameterResolutionException {
-        return (parameterContext.getParameter().getType() == 
DockerRabbitMQCluster.class);
-    }
-
-    @Override
-    public Object resolveParameter(ParameterContext parameterContext, 
ExtensionContext extensionContext) throws ParameterResolutionException {
-        return cluster;
-    }
-
-    public static class DockerRabbitMQCluster {
-
-        private final DockerRabbitMQ rabbitMQ1;
-        private final DockerRabbitMQ rabbitMQ2;
-        private final DockerRabbitMQ rabbitMQ3;
-
-        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ 
rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
-            this.rabbitMQ1 = rabbitMQ1;
-            this.rabbitMQ2 = rabbitMQ2;
-            this.rabbitMQ3 = rabbitMQ3;
-        }
-
-        public void stop() {
-            rabbitMQ1.stop();
-            rabbitMQ2.stop();
-            rabbitMQ3.stop();
-        }
-
-        public DockerRabbitMQ getRabbitMQ1() {
-            return rabbitMQ1;
-        }
-
-        public DockerRabbitMQ getRabbitMQ2() {
-            return rabbitMQ2;
-        }
-
-        public DockerRabbitMQ getRabbitMQ3() {
-            return rabbitMQ3;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index f5f8fa2..6dd29af 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -30,15 +30,27 @@ import com.rabbitmq.client.Envelope;
 
 public class InMemoryConsumer extends DefaultConsumer {
 
+    @FunctionalInterface
+    interface Operation {
+        void perform();
+    }
+
     private final ConcurrentLinkedQueue<Integer> messages;
+    private final Operation operation;
 
     public InMemoryConsumer(Channel channel) {
+        this(channel, () -> {});
+    }
+
+    public InMemoryConsumer(Channel channel, Operation operation) {
         super(channel);
-        messages = new ConcurrentLinkedQueue<>();
+        this.operation = operation;
+        this.messages = new ConcurrentLinkedQueue<>();
     }
 
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
+        operation.perform();
         Integer payload = Integer.valueOf(new String(body, 
StandardCharsets.UTF_8));
         messages.add(payload);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 4a7dd07..d3f2cc1 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -30,116 +30,259 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
-import 
org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import 
org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-@ExtendWith(DockerClusterRabbitMQExtention.class)
+@ExtendWith(DockerClusterRabbitMQExtension.class)
 class RabbitMQClusterTest {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQClusterTest.class);
+
     private static final String QUEUE = "queue";
 
-    private Connection node1Connection;
-    private Channel node1Channel;
-    private Connection node2Connection;
-    private Channel node2Channel;
-
-    @BeforeEach
-    void setup(DockerRabbitMQCluster cluster) throws IOException, 
TimeoutException {
-        ConnectionFactory node1ConnectionFactory = 
cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory node2ConnectionFactory = 
cluster.getRabbitMQ2().connectionFactory();
-        node1Connection = node1ConnectionFactory.newConnection();
-        node2Connection = node2ConnectionFactory.newConnection();
-        node1Channel = node1Connection.createChannel();
-        node2Channel = node2Connection.createChannel();
-    }
+    @Nested
+    class ClusterSharing {
+
+        private ConnectionFactory node1ConnectionFactory;
+        private ConnectionFactory node2ConnectionFactory;
+        private Connection node1Connection;
+        private Connection node2Connection;
+        private Channel node1Channel;
+        private Channel node2Channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, 
TimeoutException {
+            node1ConnectionFactory = 
cluster.getRabbitMQ1().connectionFactory();
+            node2ConnectionFactory = 
cluster.getRabbitMQ2().connectionFactory();
+            node1Connection = node1ConnectionFactory.newConnection();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node1Channel = node1Connection.createChannel();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        @AfterEach
+        void tearDown() {
+            closeQuietly(node1Channel, node2Channel, node1Connection, 
node2Connection);
+        }
+
+        @Test
+        void 
rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster 
cluster) throws Exception {
+            String stdout = cluster.getRabbitMQ1().container()
+                .execInContainer("rabbitmqctl", "cluster_status")
+                .getStdout();
+
+            assertThat(stdout)
+                .contains(
+                    DockerClusterRabbitMQExtension.RABBIT_1,
+                    DockerClusterRabbitMQExtension.RABBIT_2,
+                    DockerClusterRabbitMQExtension.RABBIT_3);
+        }
+
+        @Test
+        void queuesShouldBeShared() throws Exception {
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, 
!AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> node1Channel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+            
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        @Test
+        void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, 
!AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> node1Channel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+            
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
 
-    @AfterEach
-    void tearDown() {
-        closeQuietly(node1Channel, node2Channel, node1Connection, 
node2Connection);
     }
 
-    private void closeQuietly(AutoCloseable... closeables) {
-        for (AutoCloseable closeable : closeables) {
+    @Nested
+    class ClusterNodesFailure {
+
+        private ConnectionFactory node1ConnectionFactory;
+        private Connection resilientConnection;
+        private Channel resilientChannel;
+        private Connection node2Connection;
+        private Channel node2Channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, 
TimeoutException {
+            node1ConnectionFactory = 
cluster.getRabbitMQ1().connectionFactory();
+            resilientConnection = 
node1ConnectionFactory.newConnection(cluster.getAddresses());
+            resilientChannel = resilientConnection.createChannel();
+            ConnectionFactory node2ConnectionFactory = 
cluster.getRabbitMQ2().connectionFactory();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        @AfterEach
+        void tearDown() {
+            closeQuietly(resilientConnection, resilientChannel);
+        }
+
+        @Disabled("For some reason, we are unable to recover topology when 
reconnecting")
+        @Test
+        void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws 
Exception {
+            resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, 
!AUTO_DELETE, ImmutableMap.of()).getQueue();
+            resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 20;
+            int firstBatchSize = nbMessages / 2;
+            IntStream.range(0, firstBatchSize)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer);
+            awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == firstBatchSize);
+
+            cluster.getRabbitMQ1().stop();
+
+            IntStream.range(firstBatchSize, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(this::tryPublishWithRetry);
+
+            awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+            
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        private void tryPublishWithRetry(byte[] bytes) {
+            
Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(()
 -> tryPublish(bytes));
+        }
+
+        private boolean tryPublish(byte[] bytes) {
             try {
-                closeable.close();
+                resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
NO_PROPERTIES, bytes);
+                return true;
             } catch (Exception e) {
-                //ignoring exception
+                LOGGER.error("failed publish", e);
+                return false;
             }
         }
-    }
 
-    @Test
-    void 
rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster 
cluster) throws Exception {
-        String stdout = cluster.getRabbitMQ1().container()
-            .execInContainer("rabbitmqctl", "cluster_status")
-            .getStdout();
-
-        assertThat(stdout)
-            .contains(
-                DockerClusterRabbitMQExtention.RABBIT_1,
-                DockerClusterRabbitMQExtention.RABBIT_2,
-                DockerClusterRabbitMQExtention.RABBIT_3);
-    }
+        @Test
+        void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster 
cluster) throws Exception {
+            ConnectionFactory node3ConnectionFactory = 
cluster.getRabbitMQ3().connectionFactory();
+            cluster.getRabbitMQ3().stop();
 
-    @Test
-    void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
-        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-        node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, 
ImmutableMap.of()).getQueue();
-        node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+            try (Connection connection = 
node3ConnectionFactory.newConnection(cluster.getAddresses());
+                 Channel channel = connection.createChannel()) {
 
-        int nbMessages = 10;
-        IntStream.range(0, nbMessages)
-            .mapToObj(i -> asBytes(String.valueOf(i)))
-            .forEach(Throwing.consumer(
-                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
NO_PROPERTIES, bytes)));
+                channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+                channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, 
ImmutableMap.of()).getQueue();
+                channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
+                int nbMessages = 10;
+                IntStream.range(0, nbMessages)
+                    .mapToObj(i -> asBytes(String.valueOf(i)))
+                    .forEach(Throwing.consumer(
+                        bytes -> channel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
-        node2Channel.basicConsume(QUEUE, consumer2);
+                InMemoryConsumer consumer = new InMemoryConsumer(channel);
+                channel.basicConsume(QUEUE, consumer);
 
-        awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == nbMessages);
+                awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == nbMessages);
 
-        List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Collectors.toList());
-        
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
-    }
+                List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+                
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+            }
+        }
 
-    @Test
-    void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) 
throws Exception {
-        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-        node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, 
ImmutableMap.of()).getQueue();
-        node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+        @Test
+        void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws 
Exception {
+            node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, 
!AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        int nbMessages = 10;
-        IntStream.range(0, nbMessages)
-            .mapToObj(i -> asBytes(String.valueOf(i)))
-            .forEach(Throwing.consumer(
-                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, 
NO_PROPERTIES, bytes)));
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, 
ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
-        node2Channel.basicConsume(QUEUE, consumer2);
+            AtomicInteger counter = new AtomicInteger(0);
+            InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
+                () -> {
+                    if (counter.incrementAndGet() == nbMessages / 2) {
+                        cluster.getRabbitMQ1().stop();
+                    }
+                });
+            resilientChannel.basicConsume(QUEUE, consumer);
 
-        awaitAtMostOneMinute.until(() -> 
consumer2.getConsumedMessages().size() == nbMessages);
+            awaitAtMostOneMinute.until(() -> 
consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Guavate.toImmutableList());
+            
assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
 
-        List<Integer> expectedResult = IntStream.range(0, 
nbMessages).boxed().collect(Collectors.toList());
-        
assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            //ignore error
+        }
     }
 
     private byte[] asBytes(String message) {
         return message.getBytes(StandardCharsets.UTF_8);
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to