JAMES-2334 Add node killing tests for RabbitMQ cluster
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/552e44d0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/552e44d0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/552e44d0 Branch: refs/heads/master Commit: 552e44d073580b2565d163e007b3a3144c671bcc Parents: 00adfa8 Author: benwa <[email protected]> Authored: Fri Feb 9 10:50:27 2018 +0700 Committer: Matthieu Baechler <[email protected]> Committed: Thu May 31 09:47:02 2018 +0200 ---------------------------------------------------------------------- .../DockerClusterRabbitMQExtension.java | 120 ++++++++ .../DockerClusterRabbitMQExtention.java | 112 -------- .../james/queue/rabbitmq/InMemoryConsumer.java | 14 +- .../queue/rabbitmq/RabbitMQClusterTest.java | 281 ++++++++++++++----- 4 files changed, 345 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java new file mode 100644 index 0000000..000fef6 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java @@ -0,0 +1,120 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.queue.rabbitmq; + +import org.apache.commons.codec.digest.DigestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.testcontainers.containers.Network; + +import com.google.common.collect.ImmutableList; +import com.rabbitmq.client.Address; + +public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + public static final String RABBIT_1 = "rabbit1"; + public static final String RABBIT_2 = "rabbit2"; + public static final String RABBIT_3 = "rabbit3"; + private DockerRabbitMQCluster cluster; + private Network network; + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + String cookie = DigestUtils.sha1Hex("secret cookie here"); + + network = Network.NetworkImpl.builder() + .enableIpv6(false) + .createNetworkCmdModifiers(ImmutableList.of()) + .build(); + + DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); + DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); + DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); + + rabbitMQ1.start(); + rabbitMQ2.start(); + rabbitMQ3.start(); + + rabbitMQ2.join(rabbitMQ1); + rabbitMQ3.join(rabbitMQ1); + + rabbitMQ2.startApp(); + rabbitMQ3.startApp(); + + cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + cluster.stop(); + network.close(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return cluster; + } + + public static class DockerRabbitMQCluster { + + private final DockerRabbitMQ rabbitMQ1; + private final DockerRabbitMQ rabbitMQ2; + private final DockerRabbitMQ rabbitMQ3; + + public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { + this.rabbitMQ1 = rabbitMQ1; + this.rabbitMQ2 = rabbitMQ2; + this.rabbitMQ3 = rabbitMQ3; + } + + public void stop() { + rabbitMQ1.stop(); + rabbitMQ2.stop(); + rabbitMQ3.stop(); + } + + public DockerRabbitMQ getRabbitMQ1() { + return rabbitMQ1; + } + + public DockerRabbitMQ getRabbitMQ2() { + return rabbitMQ2; + } + + public DockerRabbitMQ getRabbitMQ3() { + return rabbitMQ3; + } + + public ImmutableList<Address> getAddresses() { + return ImmutableList.of( + new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), + new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), + new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java deleted file mode 100644 index fae2016..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java +++ /dev/null @@ -1,112 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import org.apache.commons.codec.digest.DigestUtils; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; -import org.testcontainers.containers.Network; - -import com.google.common.collect.ImmutableList; - -public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver { - - public static final String RABBIT_1 = "rabbit1"; - public static final String RABBIT_2 = "rabbit2"; - public static final String RABBIT_3 = "rabbit3"; - private DockerRabbitMQCluster cluster; - private Network network; - - @Override - public void beforeEach(ExtensionContext context) throws Exception { - String cookie = DigestUtils.sha1Hex("secret cookie here"); - - network = Network.NetworkImpl.builder() - .enableIpv6(false) - .createNetworkCmdModifiers(ImmutableList.of()) - .build(); - - DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); - DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); - DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); - - rabbitMQ1.start(); - rabbitMQ2.start(); - rabbitMQ3.start(); - - rabbitMQ2.join(rabbitMQ1); - rabbitMQ3.join(rabbitMQ1); - - rabbitMQ2.startApp(); - rabbitMQ3.startApp(); - - cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - cluster.stop(); - network.close(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return cluster; - } - - public static class DockerRabbitMQCluster { - - private final DockerRabbitMQ rabbitMQ1; - private final DockerRabbitMQ rabbitMQ2; - private final DockerRabbitMQ rabbitMQ3; - - public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { - this.rabbitMQ1 = rabbitMQ1; - this.rabbitMQ2 = rabbitMQ2; - this.rabbitMQ3 = rabbitMQ3; - } - - public void stop() { - rabbitMQ1.stop(); - rabbitMQ2.stop(); - rabbitMQ3.stop(); - } - - public DockerRabbitMQ getRabbitMQ1() { - return rabbitMQ1; - } - - public DockerRabbitMQ getRabbitMQ2() { - return rabbitMQ2; - } - - public DockerRabbitMQ getRabbitMQ3() { - return rabbitMQ3; - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java index f5f8fa2..6dd29af 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java @@ -30,15 +30,27 @@ import com.rabbitmq.client.Envelope; public class InMemoryConsumer extends DefaultConsumer { + @FunctionalInterface + interface Operation { + void perform(); + } + private final ConcurrentLinkedQueue<Integer> messages; + private final Operation operation; public InMemoryConsumer(Channel channel) { + this(channel, () -> {}); + } + + public InMemoryConsumer(Channel channel, Operation operation) { super(channel); - messages = new ConcurrentLinkedQueue<>(); + this.operation = operation; + this.messages = new ConcurrentLinkedQueue<>(); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + operation.perform(); Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); messages.add(payload); } http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java index 4a7dd07..d3f2cc1 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java @@ -30,116 +30,259 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; -import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster; +import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.Duration; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -@ExtendWith(DockerClusterRabbitMQExtention.class) +@ExtendWith(DockerClusterRabbitMQExtension.class) class RabbitMQClusterTest { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class); + private static final String QUEUE = "queue"; - private Connection node1Connection; - private Channel node1Channel; - private Connection node2Connection; - private Channel node2Channel; - - @BeforeEach - void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { - ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); - ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); - node1Connection = node1ConnectionFactory.newConnection(); - node2Connection = node2ConnectionFactory.newConnection(); - node1Channel = node1Connection.createChannel(); - node2Channel = node2Connection.createChannel(); - } + @Nested + class ClusterSharing { + + private ConnectionFactory node1ConnectionFactory; + private ConnectionFactory node2ConnectionFactory; + private Connection node1Connection; + private Connection node2Connection; + private Channel node1Channel; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node1Connection = node1ConnectionFactory.newConnection(); + node2Connection = node2ConnectionFactory.newConnection(); + node1Channel = node1Connection.createChannel(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); + } + + @Test + void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { + String stdout = cluster.getRabbitMQ1().container() + .execInContainer("rabbitmqctl", "cluster_status") + .getStdout(); + + assertThat(stdout) + .contains( + DockerClusterRabbitMQExtension.RABBIT_1, + DockerClusterRabbitMQExtension.RABBIT_2, + DockerClusterRabbitMQExtension.RABBIT_3); + } + + @Test + void queuesShouldBeShared() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + @Test + void queuesShouldBeDeclarableOnAnotherNode() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } - @AfterEach - void tearDown() { - closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); } - private void closeQuietly(AutoCloseable... closeables) { - for (AutoCloseable closeable : closeables) { + @Nested + class ClusterNodesFailure { + + private ConnectionFactory node1ConnectionFactory; + private Connection resilientConnection; + private Channel resilientChannel; + private Connection node2Connection; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); + resilientChannel = resilientConnection.createChannel(); + ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node2Connection = node2ConnectionFactory.newConnection(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(resilientConnection, resilientChannel); + } + + @Disabled("For some reason, we are unable to recover topology when reconnecting") + @Test + void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { + resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 20; + int firstBatchSize = nbMessages / 2; + IntStream.range(0, firstBatchSize) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); + + cluster.getRabbitMQ1().stop(); + + IntStream.range(firstBatchSize, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(this::tryPublishWithRetry); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void tryPublishWithRetry(byte[] bytes) { + Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); + } + + private boolean tryPublish(byte[] bytes) { try { - closeable.close(); + resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); + return true; } catch (Exception e) { - //ignoring exception + LOGGER.error("failed publish", e); + return false; } } - } - @Test - void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { - String stdout = cluster.getRabbitMQ1().container() - .execInContainer("rabbitmqctl", "cluster_status") - .getStdout(); - - assertThat(stdout) - .contains( - DockerClusterRabbitMQExtention.RABBIT_1, - DockerClusterRabbitMQExtention.RABBIT_2, - DockerClusterRabbitMQExtention.RABBIT_3); - } + @Test + void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { + ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); + cluster.getRabbitMQ3().stop(); - @Test - void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses()); + Channel channel = connection.createChannel()) { - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); + InMemoryConsumer consumer = new InMemoryConsumer(channel); + channel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); - } + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } - @Test - void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception { - node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); - node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); - node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + @Test + void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { + node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); - int nbMessages = 10; - IntStream.range(0, nbMessages) - .mapToObj(i -> asBytes(String.valueOf(i))) - .forEach(Throwing.consumer( - bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); - InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); - node2Channel.basicConsume(QUEUE, consumer2); + AtomicInteger counter = new AtomicInteger(0); + InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, + () -> { + if (counter.incrementAndGet() == nbMessages / 2) { + cluster.getRabbitMQ1().stop(); + } + }); + resilientChannel.basicConsume(QUEUE, consumer); - awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } - List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList()); - assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } } private byte[] asBytes(String message) { return message.getBytes(StandardCharsets.UTF_8); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
