JAMES-2545 Move RabbitMQ tests and utils to backend package
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/cc72f881 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/cc72f881 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/cc72f881 Branch: refs/heads/master Commit: cc72f8810ffcf4af6de7ee6260ad853389e490ef Parents: bf95a14 Author: Benoit Tellier <[email protected]> Authored: Mon Sep 10 11:12:51 2018 +0700 Committer: Antoine Duprat <[email protected]> Committed: Wed Sep 12 10:01:12 2018 +0200 ---------------------------------------------------------------------- backends-common/rabbitmq/pom.xml | 32 ++ .../DockerClusterRabbitMQExtension.java | 130 ++++++++ .../james/backend/mailqueue/DockerRabbitMQ.java | 167 ++++++++++ .../mailqueue/DockerRabbitMQExtension.java | 53 +++ .../mailqueue/DockerRabbitMQExtensionTest.java | 50 +++ .../backend/mailqueue/InMemoryConsumer.java | 61 ++++ .../backend/mailqueue/RabbitMQClusterTest.java | 294 +++++++++++++++++ .../backend/mailqueue/RabbitMQFixture.java | 50 +++ .../james/backend/mailqueue/RabbitMQTest.java | 330 +++++++++++++++++++ .../backend/mailqueue/RabbitMQWaitStrategy.java | 67 ++++ .../ReusableDockerRabbitMQExtension.java | 59 ++++ .../src/test/resources/logback-test.xml | 17 + pom.xml | 6 + server/queue/queue-rabbitmq/pom.xml | 10 + .../DockerClusterRabbitMQExtension.java | 130 -------- .../james/queue/rabbitmq/DockerRabbitMQ.java | 167 ---------- .../queue/rabbitmq/DockerRabbitMQExtension.java | 53 --- .../rabbitmq/DockerRabbitMQExtensionTest.java | 50 --- .../james/queue/rabbitmq/InMemoryConsumer.java | 61 ---- .../queue/rabbitmq/RabbitMQClusterTest.java | 294 ----------------- .../james/queue/rabbitmq/RabbitMQFixture.java | 50 --- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 2 + .../james/queue/rabbitmq/RabbitMQTest.java | 330 ------------------- .../queue/rabbitmq/RabbitMQWaitStrategy.java | 67 ---- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 2 + .../ReusableDockerRabbitMQExtension.java | 59 ---- .../src/test/resources/logback-test.xml | 2 +- 27 files changed, 1331 insertions(+), 1262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml index ee842f2..d518753 100644 --- a/backends-common/rabbitmq/pom.xml +++ b/backends-common/rabbitmq/pom.xml @@ -32,14 +32,41 @@ <dependencies> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-core</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-util</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>com.jayway.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + </dependency> + <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> </dependency> <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + </dependency> + <dependency> <groupId>nl.jqno.equalsverifier</groupId> <artifactId>equalsverifier</artifactId> <scope>test</scope> @@ -64,6 +91,11 @@ <artifactId>jcl-over-slf4j</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java new file mode 100644 index 0000000..dd382d0 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java @@ -0,0 +1,130 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import java.nio.charset.StandardCharsets; + +import org.apache.james.util.Runnables; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.testcontainers.containers.Network; + +import com.github.fge.lambdas.Throwing; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; +import com.rabbitmq.client.Address; + +public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + public static final String RABBIT_1 = "rabbit1"; + public static final String RABBIT_2 = "rabbit2"; + public static final String RABBIT_3 = "rabbit3"; + private DockerRabbitMQCluster cluster; + private Network network; + + @Override + public void beforeEach(ExtensionContext context) { + String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString(); + + network = Network.NetworkImpl.builder() + .enableIpv6(false) + .createNetworkCmdModifiers(ImmutableList.of()) + .build(); + + DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); + DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); + DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); + + Runnables.runParallel( + rabbitMQ1::start, + rabbitMQ2::start, + rabbitMQ3::start); + + Runnables.runParallel( + Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)), + Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1))); + + + + Runnables.runParallel( + Throwing.runnable(rabbitMQ2::startApp), + Throwing.runnable(rabbitMQ3::startApp)); + + cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + cluster.stop(); + network.close(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return cluster; + } + + public static class DockerRabbitMQCluster { + + private final DockerRabbitMQ rabbitMQ1; + private final DockerRabbitMQ rabbitMQ2; + private final DockerRabbitMQ rabbitMQ3; + + public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { + this.rabbitMQ1 = rabbitMQ1; + this.rabbitMQ2 = rabbitMQ2; + this.rabbitMQ3 = rabbitMQ3; + } + + public void stop() { + Runnables.runParallel( + Throwing.runnable(rabbitMQ1::stop).orDoNothing(), + Throwing.runnable(rabbitMQ2::stop).orDoNothing(), + Throwing.runnable(rabbitMQ3::stop).orDoNothing()); + } + + public DockerRabbitMQ getRabbitMQ1() { + return rabbitMQ1; + } + + public DockerRabbitMQ getRabbitMQ2() { + return rabbitMQ2; + } + + public DockerRabbitMQ getRabbitMQ3() { + return rabbitMQ3; + } + + public ImmutableList<Address> getAddresses() { + return ImmutableList.of( + new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), + new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), + new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java new file mode 100644 index 0000000..6c817cf --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java @@ -0,0 +1,167 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.james.util.docker.Images; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitAllStrategy; + +import com.google.common.collect.ImmutableMap; +import com.rabbitmq.client.ConnectionFactory; + +public class DockerRabbitMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class); + + private static final String DEFAULT_RABBIT_NODE = "my-rabbit"; + private static final int DEFAULT_RABBITMQ_PORT = 5672; + private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672; + private static final String DEFAULT_RABBITMQ_USERNAME = "guest"; + private static final String DEFAULT_RABBITMQ_PASSWORD = "guest"; + private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE"; + private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME"; + + private final GenericContainer<?> container; + private final Optional<String> nodeName; + + public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) { + return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName), + Optional.of(network)); + } + + public static DockerRabbitMQ withoutCookie() { + return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + } + + @SuppressWarnings("resource") + private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) { + container = new GenericContainer<>(Images.RABBITMQ) + .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse(randomName()))) + .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE))) + .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT) + .waitingFor(new WaitAllStrategy() + .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT)) + .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this))) + .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String())) + .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig() + .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m"))); + net.ifPresent(container::withNetwork); + erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie)); + nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name)); + this.nodeName = nodeName; + } + + private String randomName() { + return UUID.randomUUID().toString(); + } + + public String getHostIp() { + return container.getContainerIpAddress(); + } + + public Integer getPort() { + return container.getMappedPort(DEFAULT_RABBITMQ_PORT); + } + + public Integer getAdminPort() { + return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT); + } + + public String getUsername() { + return DEFAULT_RABBITMQ_USERNAME; + } + + public String getPassword() { + return DEFAULT_RABBITMQ_PASSWORD; + } + + public ConnectionFactory connectionFactory() { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(getHostIp()); + connectionFactory.setPort(getPort()); + connectionFactory.setUsername(getUsername()); + connectionFactory.setPassword(getPassword()); + return connectionFactory; + } + + public void start() { + container.start(); + } + + public void stop() { + container.stop(); + } + + public void restart() { + DockerClientFactory.instance().client() + .restartContainerCmd(container.getContainerId()); + } + + public GenericContainer<?> container() { + return container; + } + + public String node() { + return nodeName.get(); + } + + public void join(DockerRabbitMQ rabbitMQ) throws Exception { + stopApp(); + joinCluster(rabbitMQ); + } + + private void stopApp() throws java.io.IOException, InterruptedException { + String stdout = container() + .execInContainer("rabbitmqctl", "stop_app") + .getStdout(); + LOGGER.debug("stop_app: {}", stdout); + } + + private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException { + String stdout = container() + .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node()) + .getStdout(); + LOGGER.debug("join_cluster: {}", stdout); + } + + public void startApp() throws Exception { + String stdout = container() + .execInContainer("rabbitmqctl", "start_app") + .getStdout(); + LOGGER.debug("start_app: {}", stdout); + } + + public void reset() throws Exception { + stopApp(); + + String stdout = container() + .execInContainer("rabbitmqctl", "reset") + .getStdout(); + LOGGER.debug("reset: {}", stdout); + + startApp(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java new file mode 100644 index 0000000..ea11f81 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + private DockerRabbitMQ rabbitMQ; + + @Override + public void beforeEach(ExtensionContext context) { + rabbitMQ = DockerRabbitMQ.withoutCookie(); + rabbitMQ.start(); + } + + @Override + public void afterEach(ExtensionContext context) { + rabbitMQ.stop(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return rabbitMQ; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java new file mode 100644 index 0000000..aaabc5b --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerRabbitMQExtension.class) +public class DockerRabbitMQExtensionTest { + + private ConnectionFactory connectionFactory; + + @BeforeEach + public void setup(DockerRabbitMQ rabbitMQ) { + connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(rabbitMQ.getHostIp()); + connectionFactory.setPort(rabbitMQ.getPort()); + connectionFactory.setUsername(rabbitMQ.getUsername()); + connectionFactory.setPassword(rabbitMQ.getPassword()); + } + + @Test + public void containerShouldBeUp() throws Exception { + try (Connection connection = connectionFactory.newConnection()) { + assertThat(connection.isOpen()).isTrue(); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java new file mode 100644 index 0000000..e6fe021 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java @@ -0,0 +1,61 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class InMemoryConsumer extends DefaultConsumer { + + @FunctionalInterface + interface Operation { + void perform(); + } + + private final ConcurrentLinkedQueue<Integer> messages; + private final Operation operation; + + public InMemoryConsumer(Channel channel) { + this(channel, () -> { }); + } + + public InMemoryConsumer(Channel channel, Operation operation) { + super(channel); + this.operation = operation; + this.messages = new ConcurrentLinkedQueue<>(); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + operation.perform(); + Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); + messages.add(payload); + } + + public Queue<Integer> getConsumedMessages() { + return messages; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java new file mode 100644 index 0000000..5467840 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java @@ -0,0 +1,294 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.james.backend.mailqueue.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerClusterRabbitMQExtension.class) +class RabbitMQClusterTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class); + + private static final String QUEUE = "queue"; + + @Nested + class ClusterSharing { + + private ConnectionFactory node1ConnectionFactory; + private ConnectionFactory node2ConnectionFactory; + private Connection node1Connection; + private Connection node2Connection; + private Channel node1Channel; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node1Connection = node1ConnectionFactory.newConnection(); + node2Connection = node2ConnectionFactory.newConnection(); + node1Channel = node1Connection.createChannel(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); + } + + @Test + void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { + String stdout = cluster.getRabbitMQ1().container() + .execInContainer("rabbitmqctl", "cluster_status") + .getStdout(); + + assertThat(stdout) + .contains( + DockerClusterRabbitMQExtension.RABBIT_1, + DockerClusterRabbitMQExtension.RABBIT_2, + DockerClusterRabbitMQExtension.RABBIT_3); + } + + @Test + void queuesShouldBeShared() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + @Test + void queuesShouldBeDeclarableOnAnotherNode() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + } + + @Nested + class ClusterNodesFailure { + + private ConnectionFactory node1ConnectionFactory; + private Connection resilientConnection; + private Channel resilientChannel; + private Connection node2Connection; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); + resilientChannel = resilientConnection.createChannel(); + ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node2Connection = node2ConnectionFactory.newConnection(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(resilientConnection, resilientChannel); + } + + @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting" + + "See https://github.com/rabbitmq/rabbitmq-server/issues/959") + @Test + void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { + resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 20; + int firstBatchSize = nbMessages / 2; + IntStream.range(0, firstBatchSize) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); + + cluster.getRabbitMQ1().stop(); + + IntStream.range(firstBatchSize, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(this::tryPublishWithRetry); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void tryPublishWithRetry(byte[] bytes) { + Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); + } + + private boolean tryPublish(byte[] bytes) { + try { + resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); + return true; + } catch (Exception e) { + LOGGER.error("failed publish", e); + return false; + } + } + + @Test + void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { + ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); + ImmutableList<Address> clusterAddresses = cluster.getAddresses(); + cluster.getRabbitMQ3().stop(); + + try (Connection connection = node3ConnectionFactory.newConnection(clusterAddresses); + Channel channel = connection.createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(channel); + channel.basicConsume(QUEUE, consumer); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } + + @Test + void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { + node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + AtomicInteger counter = new AtomicInteger(0); + InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, + () -> stopWhenHalfProcessed(cluster, nbMessages, counter)); + resilientChannel.basicConsume(QUEUE, consumer); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void stopWhenHalfProcessed(DockerRabbitMQCluster cluster, int nbMessages, AtomicInteger counter) { + if (counter.incrementAndGet() == nbMessages / 2) { + cluster.getRabbitMQ1().stop(); + } + } + + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } + } + + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java new file mode 100644 index 0000000..8b83a96 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.mailqueue; + +import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Duration.ONE_MINUTE; + +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.core.ConditionFactory; + +import com.rabbitmq.client.AMQP; + +public class RabbitMQFixture { + public static final boolean DURABLE = true; + public static final boolean AUTO_ACK = true; + public static final AMQP.BasicProperties NO_PROPERTIES = null; + public static final String EXCHANGE_NAME = "exchangeName"; + public static final String ROUTING_KEY = "routingKey"; + public static final String DIRECT = "direct"; + public static final boolean EXCLUSIVE = true; + public static final boolean AUTO_DELETE = true; + public static final String WORK_QUEUE = "workQueue"; + + public static Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; + public static ConditionFactory calmlyAwait = Awaitility.with() + .pollInterval(slowPacedPollInterval) + .and() + .with() + .pollDelay(slowPacedPollInterval) + .await(); + public static ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java new file mode 100644 index 0000000..f54ba25 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java @@ -0,0 +1,330 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_ACK; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.WORK_QUEUE; +import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerRabbitMQExtension.class) +class RabbitMQTest { + + @Nested + class SingleConsumerTest { + + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + + @BeforeEach + void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { + connectionFactory = rabbitMQ.connectionFactory(); + connection = connectionFactory.newConnection(); + channel = connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(connection, channel); + } + + @Test + void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception { + String queueName = createQueue(channel); + publishAMessage(channel); + awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); + } + + @Test + void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception { + String queueName = createQueue(channel); + publishAMessage(channel); + + rabbitMQ.restart(); + + awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ)); + assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull(); + } + + private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) { + try { + rabbitMQ.connectionFactory().newConnection(); + return true; + } catch (Exception e) { + return false; + } + } + + private String createQueue(Channel channel) throws IOException { + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); + return queueName; + } + + private void publishAMessage(Channel channel) throws IOException { + channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); + } + + private Boolean messageReceived(Channel channel, String queueName) { + try { + return channel.basicGet(queueName, !AUTO_ACK) != null; + } catch (Exception e) { + return false; + } + } + } + + @Nested + class FourConnections { + + private ConnectionFactory connectionFactory1; + private ConnectionFactory connectionFactory2; + private ConnectionFactory connectionFactory3; + private ConnectionFactory connectionFactory4; + private Connection connection1; + private Connection connection2; + private Connection connection3; + private Connection connection4; + private Channel channel1; + private Channel channel2; + private Channel channel3; + private Channel channel4; + + @BeforeEach + void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { + connectionFactory1 = rabbitMQ.connectionFactory(); + connectionFactory2 = rabbitMQ.connectionFactory(); + connectionFactory3 = rabbitMQ.connectionFactory(); + connectionFactory4 = rabbitMQ.connectionFactory(); + connection1 = connectionFactory1.newConnection(); + connection2 = connectionFactory2.newConnection(); + connection3 = connectionFactory3.newConnection(); + connection4 = connectionFactory4.newConnection(); + channel1 = connection1.createChannel(); + channel2 = connection2.createChannel(); + channel3 = connection3.createChannel(); + channel4 = connection4.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly( + channel1, channel2, channel3, channel4, + connection1, connection2, connection3, connection4); + } + + @Nested + class BroadCast { + + // In the following case, each consumer will receive the messages produced by the + // producer + // To do so, each consumer will bind it's queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheBroadcastCase() throws Exception { + // Declare a single exchange and three queues attached to it. + channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue2 = channel2.queueDeclare().getQueue(); + channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); + String queue3 = channel3.queueDeclare().getQueue(); + channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); + String queue4 = channel4.queueDeclare().getQueue(); + channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); + + // the publisher will produce 10 messages + IntStream.range(0, 10) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); + + ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); + // Check every subscriber have receive all the messages. + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } + + @Nested + class WorkQueue { + + // In the following case, consumers will receive the messages produced by the + // producer but will share them. + // To do so, we will bind a single queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheWorkQueueCase() throws Exception { + int nbMessages = 100; + + // Declare the exchange and a single queue attached to it. + channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE); + channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + // Publisher will produce 100 messages + IntStream.range(0, nbMessages) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(WORK_QUEUE, consumer2); + channel3.basicConsume(WORK_QUEUE, consumer3); + channel4.basicConsume(WORK_QUEUE, consumer4); + + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); + + ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + + assertThat( + Iterables.concat( + consumer2.getConsumedMessages(), + consumer3.getConsumedMessages(), + consumer4.getConsumedMessages())) + .containsOnlyElementsOf(expectedResult); + } + + } + + @Nested + class Routing { + @Test + void rabbitMQShouldSupportRouting() throws Exception { + String conversation1 = "c1"; + String conversation2 = "c2"; + String conversation3 = "c3"; + String conversation4 = "c4"; + + // Declare the exchange and a single queue attached to it. + channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue1 = channel1.queueDeclare().getQueue(); + // 1 will follow conversation 1 and 2 + channel1.queueBind(queue1, EXCHANGE_NAME, conversation1); + channel1.queueBind(queue1, EXCHANGE_NAME, conversation2); + + String queue2 = channel2.queueDeclare().getQueue(); + // 2 will follow conversation 2 and 3 + channel2.queueBind(queue2, EXCHANGE_NAME, conversation2); + channel2.queueBind(queue2, EXCHANGE_NAME, conversation3); + + String queue3 = channel3.queueDeclare().getQueue(); + // 3 will follow conversation 3 and 4 + channel3.queueBind(queue3, EXCHANGE_NAME, conversation3); + channel3.queueBind(queue3, EXCHANGE_NAME, conversation4); + + String queue4 = channel4.queueDeclare().getQueue(); + // 4 will follow conversation 1 and 4 + channel4.queueBind(queue4, EXCHANGE_NAME, conversation1); + channel4.queueBind(queue4, EXCHANGE_NAME, conversation4); + + channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1")); + channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2")); + channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3")); + channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4")); + + InMemoryConsumer consumer1 = new InMemoryConsumer(channel1); + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel1.basicConsume(queue1, consumer1); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); + + awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8); + + assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2); + assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3); + assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4); + assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4); + } + } + + private long countReceivedMessages(InMemoryConsumer... consumers) { + return Arrays.stream(consumers) + .map(InMemoryConsumer::getConsumedMessages) + .mapToLong(Queue::size) + .sum(); + } + + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } + } + + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java new file mode 100644 index 0000000..2f397ce --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.mailqueue; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; + +import com.google.common.primitives.Ints; +import com.rabbitmq.client.Connection; + +public class RabbitMQWaitStrategy implements WaitStrategy { + + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); + + public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) { + return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT); + } + + private final DockerRabbitMQ rabbitMQ; + private final Duration timeout; + + public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) { + this.rabbitMQ = rabbitMQ; + this.timeout = timeout; + } + + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { + int seconds = Ints.checkedCast(this.timeout.getSeconds()); + + Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected); + } + + private Boolean isConnected() throws IOException, TimeoutException { + try (Connection connection = rabbitMQ.connectionFactory().newConnection()) { + return connection.isOpen(); + } + } + + @Override + public WaitStrategy withStartupTimeout(Duration startupTimeout) { + return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java new file mode 100644 index 0000000..12c385b --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java @@ -0,0 +1,59 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.mailqueue; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class ReusableDockerRabbitMQExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback, ParameterResolver { + + private DockerRabbitMQ rabbitMQ; + + @Override + public void beforeAll(ExtensionContext context) { + rabbitMQ = DockerRabbitMQ.withoutCookie(); + rabbitMQ.start(); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + rabbitMQ.reset(); + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + rabbitMQ.stop(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return rabbitMQ; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/resources/logback-test.xml b/backends-common/rabbitmq/src/test/resources/logback-test.xml new file mode 100644 index 0000000..d1b28c0 --- /dev/null +++ b/backends-common/rabbitmq/src/test/resources/logback-test.xml @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <immediateFlush>false</immediateFlush> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern> + </encoder> + </appender> + + <logger name="org.apache.james" level="WARN"/> + + <root level="ERROR"> + <appender-ref ref="CONSOLE" /> + </root> + + +</configuration> http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e552d7d..79669fc 100644 --- a/pom.xml +++ b/pom.xml @@ -707,6 +707,12 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>apache-james-backends-rabbitmq</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>apache-james-spamassassin</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 2884684..3a70738 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -38,6 +38,16 @@ <dependencies> <dependency> + <groupId>org.apache.james</groupId> + <artifactId>apache-james-backends-rabbitmq</artifactId> + </dependency> + <dependency> + <groupId>org.apache.james</groupId> + <artifactId>apache-james-backends-rabbitmq</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>${james.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java deleted file mode 100644 index 5e24840..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java +++ /dev/null @@ -1,130 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import java.nio.charset.StandardCharsets; - -import org.apache.james.util.Runnables; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; -import org.testcontainers.containers.Network; - -import com.github.fge.lambdas.Throwing; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; -import com.rabbitmq.client.Address; - -public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { - - public static final String RABBIT_1 = "rabbit1"; - public static final String RABBIT_2 = "rabbit2"; - public static final String RABBIT_3 = "rabbit3"; - private DockerRabbitMQCluster cluster; - private Network network; - - @Override - public void beforeEach(ExtensionContext context) { - String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString(); - - network = Network.NetworkImpl.builder() - .enableIpv6(false) - .createNetworkCmdModifiers(ImmutableList.of()) - .build(); - - DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); - DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); - DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); - - Runnables.runParallel( - rabbitMQ1::start, - rabbitMQ2::start, - rabbitMQ3::start); - - Runnables.runParallel( - Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)), - Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1))); - - - - Runnables.runParallel( - Throwing.runnable(rabbitMQ2::startApp), - Throwing.runnable(rabbitMQ3::startApp)); - - cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - cluster.stop(); - network.close(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return cluster; - } - - public static class DockerRabbitMQCluster { - - private final DockerRabbitMQ rabbitMQ1; - private final DockerRabbitMQ rabbitMQ2; - private final DockerRabbitMQ rabbitMQ3; - - public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { - this.rabbitMQ1 = rabbitMQ1; - this.rabbitMQ2 = rabbitMQ2; - this.rabbitMQ3 = rabbitMQ3; - } - - public void stop() { - Runnables.runParallel( - Throwing.runnable(rabbitMQ1::stop).orDoNothing(), - Throwing.runnable(rabbitMQ2::stop).orDoNothing(), - Throwing.runnable(rabbitMQ3::stop).orDoNothing()); - } - - public DockerRabbitMQ getRabbitMQ1() { - return rabbitMQ1; - } - - public DockerRabbitMQ getRabbitMQ2() { - return rabbitMQ2; - } - - public DockerRabbitMQ getRabbitMQ3() { - return rabbitMQ3; - } - - public ImmutableList<Address> getAddresses() { - return ImmutableList.of( - new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), - new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), - new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java deleted file mode 100644 index bba3315..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java +++ /dev/null @@ -1,167 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import java.util.Optional; -import java.util.UUID; - -import org.apache.james.util.docker.Images; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.containers.wait.strategy.WaitAllStrategy; - -import com.google.common.collect.ImmutableMap; -import com.rabbitmq.client.ConnectionFactory; - -public class DockerRabbitMQ { - private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class); - - private static final String DEFAULT_RABBIT_NODE = "my-rabbit"; - private static final int DEFAULT_RABBITMQ_PORT = 5672; - private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672; - private static final String DEFAULT_RABBITMQ_USERNAME = "guest"; - private static final String DEFAULT_RABBITMQ_PASSWORD = "guest"; - private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE"; - private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME"; - - private final GenericContainer<?> container; - private final Optional<String> nodeName; - - public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) { - return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName), - Optional.of(network)); - } - - public static DockerRabbitMQ withoutCookie() { - return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); - } - - @SuppressWarnings("resource") - private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) { - container = new GenericContainer<>(Images.RABBITMQ) - .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse(randomName()))) - .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE))) - .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT) - .waitingFor(new WaitAllStrategy() - .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT)) - .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this))) - .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String())) - .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig() - .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m"))); - net.ifPresent(container::withNetwork); - erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie)); - nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name)); - this.nodeName = nodeName; - } - - private String randomName() { - return UUID.randomUUID().toString(); - } - - public String getHostIp() { - return container.getContainerIpAddress(); - } - - public Integer getPort() { - return container.getMappedPort(DEFAULT_RABBITMQ_PORT); - } - - public Integer getAdminPort() { - return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT); - } - - public String getUsername() { - return DEFAULT_RABBITMQ_USERNAME; - } - - public String getPassword() { - return DEFAULT_RABBITMQ_PASSWORD; - } - - public ConnectionFactory connectionFactory() { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost(getHostIp()); - connectionFactory.setPort(getPort()); - connectionFactory.setUsername(getUsername()); - connectionFactory.setPassword(getPassword()); - return connectionFactory; - } - - public void start() { - container.start(); - } - - public void stop() { - container.stop(); - } - - public void restart() { - DockerClientFactory.instance().client() - .restartContainerCmd(container.getContainerId()); - } - - public GenericContainer<?> container() { - return container; - } - - public String node() { - return nodeName.get(); - } - - public void join(DockerRabbitMQ rabbitMQ) throws Exception { - stopApp(); - joinCluster(rabbitMQ); - } - - private void stopApp() throws java.io.IOException, InterruptedException { - String stdout = container() - .execInContainer("rabbitmqctl", "stop_app") - .getStdout(); - LOGGER.debug("stop_app: {}", stdout); - } - - private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException { - String stdout = container() - .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node()) - .getStdout(); - LOGGER.debug("join_cluster: {}", stdout); - } - - public void startApp() throws Exception { - String stdout = container() - .execInContainer("rabbitmqctl", "start_app") - .getStdout(); - LOGGER.debug("start_app: {}", stdout); - } - - public void reset() throws Exception { - stopApp(); - - String stdout = container() - .execInContainer("rabbitmqctl", "reset") - .getStdout(); - LOGGER.debug("reset: {}", stdout); - - startApp(); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java deleted file mode 100644 index b0cbfd7..0000000 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java +++ /dev/null @@ -1,53 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.queue.rabbitmq; - -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; - -public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { - - private DockerRabbitMQ rabbitMQ; - - @Override - public void beforeEach(ExtensionContext context) { - rabbitMQ = DockerRabbitMQ.withoutCookie(); - rabbitMQ.start(); - } - - @Override - public void afterEach(ExtensionContext context) { - rabbitMQ.stop(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return rabbitMQ; - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
