http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java deleted file mode 100644 index 2f397ce..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java +++ /dev/null @@ -1,67 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.backend.mailqueue; - -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.rnorth.ducttape.unreliables.Unreliables; -import org.testcontainers.containers.wait.strategy.WaitStrategy; -import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; - -import com.google.common.primitives.Ints; -import com.rabbitmq.client.Connection; - -public class RabbitMQWaitStrategy implements WaitStrategy { - - private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); - - public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) { - return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT); - } - - private final DockerRabbitMQ rabbitMQ; - private final Duration timeout; - - public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) { - this.rabbitMQ = rabbitMQ; - this.timeout = timeout; - } - - @Override - public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { - int seconds = Ints.checkedCast(this.timeout.getSeconds()); - - Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected); - } - - private Boolean isConnected() throws IOException, TimeoutException { - try (Connection connection = rabbitMQ.connectionFactory().newConnection()) { - return connection.isOpen(); - } - } - - @Override - public WaitStrategy withStartupTimeout(Duration startupTimeout) { - return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout); - } -}
http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java deleted file mode 100644 index 12c385b..0000000 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java +++ /dev/null @@ -1,59 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ -package org.apache.james.backend.mailqueue; - -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.ParameterContext; -import org.junit.jupiter.api.extension.ParameterResolutionException; -import org.junit.jupiter.api.extension.ParameterResolver; - -public class ReusableDockerRabbitMQExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback, ParameterResolver { - - private DockerRabbitMQ rabbitMQ; - - @Override - public void beforeAll(ExtensionContext context) { - rabbitMQ = DockerRabbitMQ.withoutCookie(); - rabbitMQ.start(); - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - rabbitMQ.reset(); - } - - @Override - public void afterAll(ExtensionContext extensionContext) { - rabbitMQ.stop(); - } - - @Override - public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); - } - - @Override - public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return rabbitMQ; - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerClusterRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerClusterRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerClusterRabbitMQExtension.java new file mode 100644 index 0000000..99e95da --- 
/dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerClusterRabbitMQExtension.java @@ -0,0 +1,130 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import java.nio.charset.StandardCharsets; + +import org.apache.james.util.Runnables; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.testcontainers.containers.Network; + +import com.github.fge.lambdas.Throwing; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; +import com.rabbitmq.client.Address; + +public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + public static final String RABBIT_1 = "rabbit1"; + public static final String RABBIT_2 = "rabbit2"; + public static final String RABBIT_3 = "rabbit3"; + private DockerRabbitMQCluster cluster; + private Network network; + + @Override + public void beforeEach(ExtensionContext context) { + String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString(); + + network = Network.NetworkImpl.builder() + .enableIpv6(false) + .createNetworkCmdModifiers(ImmutableList.of()) + .build(); + + DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network); + DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network); + DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network); + + Runnables.runParallel( + rabbitMQ1::start, + rabbitMQ2::start, + rabbitMQ3::start); + + Runnables.runParallel( + Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)), + Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1))); + + + + 
Runnables.runParallel( + Throwing.runnable(rabbitMQ2::startApp), + Throwing.runnable(rabbitMQ3::startApp)); + + cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + cluster.stop(); + network.close(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return cluster; + } + + public static class DockerRabbitMQCluster { + + private final DockerRabbitMQ rabbitMQ1; + private final DockerRabbitMQ rabbitMQ2; + private final DockerRabbitMQ rabbitMQ3; + + public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) { + this.rabbitMQ1 = rabbitMQ1; + this.rabbitMQ2 = rabbitMQ2; + this.rabbitMQ3 = rabbitMQ3; + } + + public void stop() { + Runnables.runParallel( + Throwing.runnable(rabbitMQ1::stop).orDoNothing(), + Throwing.runnable(rabbitMQ2::stop).orDoNothing(), + Throwing.runnable(rabbitMQ3::stop).orDoNothing()); + } + + public DockerRabbitMQ getRabbitMQ1() { + return rabbitMQ1; + } + + public DockerRabbitMQ getRabbitMQ2() { + return rabbitMQ2; + } + + public DockerRabbitMQ getRabbitMQ3() { + return rabbitMQ3; + } + + public ImmutableList<Address> getAddresses() { + return ImmutableList.of( + new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()), + new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()), + new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort())); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java 
---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java new file mode 100644 index 0000000..d98313b --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java @@ -0,0 +1,167 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.james.util.docker.Images; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitAllStrategy; + +import com.google.common.collect.ImmutableMap; +import com.rabbitmq.client.ConnectionFactory; + +public class DockerRabbitMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class); + + private static final String DEFAULT_RABBIT_NODE = "my-rabbit"; + private static final int DEFAULT_RABBITMQ_PORT = 5672; + private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672; + private static final String DEFAULT_RABBITMQ_USERNAME = "guest"; + private static final String DEFAULT_RABBITMQ_PASSWORD = "guest"; + private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE"; + private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME"; + + private final GenericContainer<?> container; + private final Optional<String> nodeName; + + public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) { + return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName), + Optional.of(network)); + } + + public static DockerRabbitMQ withoutCookie() { + return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + } + + @SuppressWarnings("resource") + private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) { + container = new GenericContainer<>(Images.RABBITMQ) + 
.withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse(randomName()))) + .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE))) + .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT) + .waitingFor(new WaitAllStrategy() + .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT)) + .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this))) + .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String())) + .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig() + .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m"))); + net.ifPresent(container::withNetwork); + erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie)); + nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name)); + this.nodeName = nodeName; + } + + private String randomName() { + return UUID.randomUUID().toString(); + } + + public String getHostIp() { + return container.getContainerIpAddress(); + } + + public Integer getPort() { + return container.getMappedPort(DEFAULT_RABBITMQ_PORT); + } + + public Integer getAdminPort() { + return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT); + } + + public String getUsername() { + return DEFAULT_RABBITMQ_USERNAME; + } + + public String getPassword() { + return DEFAULT_RABBITMQ_PASSWORD; + } + + public ConnectionFactory connectionFactory() { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(getHostIp()); + connectionFactory.setPort(getPort()); + connectionFactory.setUsername(getUsername()); + connectionFactory.setPassword(getPassword()); + return connectionFactory; + } + + public void start() { + container.start(); + } + + public void stop() { + container.stop(); + } + + public void restart() { + DockerClientFactory.instance().client() + .restartContainerCmd(container.getContainerId()); + } + + public GenericContainer<?> container() { + return container; + } + + 
public String node() { + return nodeName.get(); + } + + public void join(DockerRabbitMQ rabbitMQ) throws Exception { + stopApp(); + joinCluster(rabbitMQ); + } + + public void stopApp() throws java.io.IOException, InterruptedException { + String stdout = container() + .execInContainer("rabbitmqctl", "stop_app") + .getStdout(); + LOGGER.debug("stop_app: {}", stdout); + } + + private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException { + String stdout = container() + .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node()) + .getStdout(); + LOGGER.debug("join_cluster: {}", stdout); + } + + public void startApp() throws Exception { + String stdout = container() + .execInContainer("rabbitmqctl", "start_app") + .getStdout(); + LOGGER.debug("start_app: {}", stdout); + } + + public void reset() throws Exception { + stopApp(); + + String stdout = container() + .execInContainer("rabbitmqctl", "reset") + .getStdout(); + LOGGER.debug("reset: {}", stdout); + + startApp(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtension.java new file mode 100644 index 0000000..9c0a00e --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtension.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + private DockerRabbitMQ rabbitMQ; + + @Override + public void beforeEach(ExtensionContext context) { + rabbitMQ = DockerRabbitMQ.withoutCookie(); + rabbitMQ.start(); + } + + @Override + public void afterEach(ExtensionContext context) { + rabbitMQ.stop(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return rabbitMQ; + } + +} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtensionTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtensionTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtensionTest.java new file mode 100644 index 0000000..687ac31 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQExtensionTest.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerRabbitMQExtension.class) +public class DockerRabbitMQExtensionTest { + + private ConnectionFactory connectionFactory; + + @BeforeEach + public void setup(DockerRabbitMQ rabbitMQ) { + connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(rabbitMQ.getHostIp()); + connectionFactory.setPort(rabbitMQ.getPort()); + connectionFactory.setUsername(rabbitMQ.getUsername()); + connectionFactory.setPassword(rabbitMQ.getPassword()); + } + + @Test + public void containerShouldBeUp() throws Exception { + try (Connection connection = connectionFactory.newConnection()) { + assertThat(connection.isOpen()).isTrue(); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java new file mode 100644 index 0000000..712c70d --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/InMemoryConsumer.java @@ -0,0 +1,61 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +public class InMemoryConsumer extends DefaultConsumer { + + @FunctionalInterface + interface Operation { + void perform(); + } + + private final ConcurrentLinkedQueue<Integer> messages; + private final Operation operation; + + public InMemoryConsumer(Channel channel) { + this(channel, () -> { }); + } + + public InMemoryConsumer(Channel channel, Operation operation) { + super(channel); + this.operation = operation; + this.messages = new ConcurrentLinkedQueue<>(); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + operation.perform(); + Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8)); + messages.add(payload); + } + + public Queue<Integer> getConsumedMessages() { + return messages; + } +} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java new file mode 100644 index 0000000..6486a84 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQClusterTest.java @@ -0,0 +1,294 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.AUTO_DELETE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DIRECT; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DURABLE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCLUSIVE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.NO_PROPERTIES; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.james.backend.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerClusterRabbitMQExtension.class) +class RabbitMQClusterTest { + + private static final Logger LOGGER 
= LoggerFactory.getLogger(RabbitMQClusterTest.class); + + private static final String QUEUE = "queue"; + + @Nested + class ClusterSharing { + + private ConnectionFactory node1ConnectionFactory; + private ConnectionFactory node2ConnectionFactory; + private Connection node1Connection; + private Connection node2Connection; + private Channel node1Channel; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node1Connection = node1ConnectionFactory.newConnection(); + node2Connection = node2ConnectionFactory.newConnection(); + node1Channel = node1Connection.createChannel(); + node2Channel = node2Connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection); + } + + @Test + void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception { + String stdout = cluster.getRabbitMQ1().container() + .execInContainer("rabbitmqctl", "cluster_status") + .getStdout(); + + assertThat(stdout) + .contains( + DockerClusterRabbitMQExtension.RABBIT_1, + DockerClusterRabbitMQExtension.RABBIT_2, + DockerClusterRabbitMQExtension.RABBIT_3); + } + + @Test + void queuesShouldBeShared() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, 
consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + @Test + void queuesShouldBeDeclarableOnAnotherNode() throws Exception { + node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer2); + + awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + } + + @Nested + class ClusterNodesFailure { + + private ConnectionFactory node1ConnectionFactory; + private Connection resilientConnection; + private Channel resilientChannel; + private Connection node2Connection; + private Channel node2Channel; + + @BeforeEach + void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException { + node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory(); + resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses()); + resilientChannel = resilientConnection.createChannel(); + ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory(); + node2Connection = node2ConnectionFactory.newConnection(); + node2Channel = node2Connection.createChannel(); 
+ } + + @AfterEach + void tearDown() { + closeQuietly(resilientConnection, resilientChannel); + } + + @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting" + + "See https://github.com/rabbitmq/rabbitmq-server/issues/959") + @Test + void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception { + resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 20; + int firstBatchSize = nbMessages / 2; + IntStream.range(0, firstBatchSize) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(node2Channel); + node2Channel.basicConsume(QUEUE, consumer); + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize); + + cluster.getRabbitMQ1().stop(); + + IntStream.range(firstBatchSize, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(this::tryPublishWithRetry); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void tryPublishWithRetry(byte[] bytes) { + Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes)); + } + + private boolean tryPublish(byte[] bytes) { + try { + resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes); + return true; + } catch (Exception e) { + LOGGER.error("failed publish", e); + return false; + } + } + + @Test + void 
connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception { + ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory(); + ImmutableList<Address> clusterAddresses = cluster.getAddresses(); + cluster.getRabbitMQ3().stop(); + + try (Connection connection = node3ConnectionFactory.newConnection(clusterAddresses); + Channel channel = connection.createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer = new InMemoryConsumer(channel); + channel.basicConsume(QUEUE, consumer); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } + + @Test + void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception { + node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue(); + node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + int nbMessages = 10; + IntStream.range(0, nbMessages) + .mapToObj(i -> asBytes(String.valueOf(i))) + .forEach(Throwing.consumer( + bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + AtomicInteger counter = new AtomicInteger(0); + InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel, + () -> stopWhenHalfProcessed(cluster, nbMessages, counter)); + 
resilientChannel.basicConsume(QUEUE, consumer); + + awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages); + + List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + + private void stopWhenHalfProcessed(DockerRabbitMQCluster cluster, int nbMessages, AtomicInteger counter) { + if (counter.incrementAndGet() == nbMessages / 2) { + cluster.getRabbitMQ1().stop(); + } + } + + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } + } + + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConfigurationTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConfigurationTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConfigurationTest.java new file mode 100644 index 0000000..3066e48 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConfigurationTest.java @@ -0,0 +1,134 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.net.URI; + +import org.apache.commons.configuration.PropertiesConfiguration; +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class RabbitMQConfigurationTest { + + @Test + void shouldRespectBeanContract() { + EqualsVerifier.forClass(RabbitMQConfiguration.class).verify(); + } + + @Test + void fromShouldThrowWhenURIIsNotInTheConfiguration() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenURIIsNull() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("uri", null); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenURIIsEmpty() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + 
configuration.addProperty("uri", ""); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenURIIsInvalid() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("uri", ":invalid"); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify a valid URI"); + } + + @Test + void fromShouldThrowWhenManagementURIIsNotInTheConfiguration() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the management URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenManagementURIIsNull() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); + configuration.addProperty("management.uri", null); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the management URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenManagementURIIsEmpty() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); + configuration.addProperty("management.uri", ""); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify the management URI of RabbitMQ"); + } + + @Test + void fromShouldThrowWhenManagementURIIsInvalid() { + PropertiesConfiguration configuration = new 
PropertiesConfiguration(); + configuration.addProperty("uri", "amqp://james:james@rabbitmq_host:5672"); + configuration.addProperty("management.uri", ":invalid"); + + assertThatThrownBy(() -> RabbitMQConfiguration.from(configuration)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("You need to specify a valid URI"); + } + + @Test + void fromShouldReturnTheConfigurationWhenRequiredParametersAreGiven() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + String amqpUri = "amqp://james:james@rabbitmq_host:5672"; + configuration.addProperty("uri", amqpUri); + String managementUri = "http://james:james@rabbitmq_host:15672/api/"; + configuration.addProperty("management.uri", managementUri); + + assertThat(RabbitMQConfiguration.from(configuration)) + .isEqualTo(RabbitMQConfiguration.builder() + .amqpUri(URI.create(amqpUri)) + .managementUri(URI.create(managementUri)) + .build()); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQFixture.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQFixture.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQFixture.java new file mode 100644 index 0000000..f8d62c9 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQFixture.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Duration.ONE_MINUTE; + +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.core.ConditionFactory; + +import com.rabbitmq.client.AMQP; + +/** + * Constants and Awaitility helpers shared by the RabbitMQ tests. + */ +public class RabbitMQFixture { + public static final boolean DURABLE = true; + public static final boolean AUTO_ACK = true; + // null: the tests publish without AMQP message properties. + public static final AMQP.BasicProperties NO_PROPERTIES = null; + public static final String EXCHANGE_NAME = "exchangeName"; + public static final String ROUTING_KEY = "routingKey"; + public static final String DIRECT = "direct"; + public static final boolean EXCLUSIVE = true; + public static final boolean AUTO_DELETE = true; + public static final String WORK_QUEUE = "workQueue"; + + // The helpers below are read by several test classes; final so none of them can reassign a shared one. + public static final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; + // Polls every 100 ms, after an initial delay of 100 ms. + public static final ConditionFactory calmlyAwait = Awaitility.with() + .pollInterval(slowPacedPollInterval) + .and() + .with() + .pollDelay(slowPacedPollInterval) + .await(); + // Same polling as calmlyAwait, giving up after one minute. + public static final ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE); +}
http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java new file mode 100644 index 0000000..4ba13fe --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -0,0 +1,83 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; + +import org.apache.james.core.healthcheck.Result; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DockerRabbitMQExtension.class) +class RabbitMQHealthCheckTest { + private RabbitMQHealthCheck healthCheck; + + @BeforeEach + void setUp(DockerRabbitMQ rabbitMQ) throws Exception { + URI amqpUri = URI.create("amqp://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getPort()); + URI managementUri = URI.create("http://" + rabbitMQ.getHostIp() + ":" + rabbitMQ.getAdminPort()); + + healthCheck = new RabbitMQHealthCheck( + RabbitMQConfiguration.builder() + .amqpUri(amqpUri) + .managementUri(managementUri) + .build()); + } + + @Test + void checkShouldReturnHealthyWhenRabbitMQIsRunning() { + Result check = healthCheck.check(); + + assertThat(check.isHealthy()).isTrue(); + } + + @Test + void checkShouldReturnUnhealthyWhenRabbitMQIsNotRunning(DockerRabbitMQ rabbitMQ) throws Exception { + rabbitMQ.stopApp(); + + Result check = healthCheck.check(); + + assertThat(check.isHealthy()).isFalse(); + } + + @Test + void checkShouldDetectWhenRabbitMQRecovered(DockerRabbitMQ rabbitMQ) throws Exception { + rabbitMQ.stopApp(); + healthCheck.check(); + + rabbitMQ.startApp(); + + Result check = healthCheck.check(); + assertThat(check.isHealthy()).isTrue(); + } + + @Test + void checkShouldDetectWhenRabbitMQFail(DockerRabbitMQ rabbitMQ) throws Exception { + healthCheck.check(); + + rabbitMQ.stopApp(); + + Result check = healthCheck.check(); + assertThat(check.isHealthy()).isFalse(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java 
---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java new file mode 100644 index 0000000..b2fb28e --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQTest.java @@ -0,0 +1,330 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.AUTO_ACK; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.AUTO_DELETE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DIRECT; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DURABLE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCHANGE_NAME; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.EXCLUSIVE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.NO_PROPERTIES; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.ROUTING_KEY; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.WORK_QUEUE; +import static org.apache.james.backend.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@ExtendWith(DockerRabbitMQExtension.class) +class RabbitMQTest { + + @Nested + class SingleConsumerTest { + + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + + @BeforeEach + void setup(DockerRabbitMQ rabbitMQ) 
throws IOException, TimeoutException { + connectionFactory = rabbitMQ.connectionFactory(); + connection = connectionFactory.newConnection(); + channel = connection.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly(connection, channel); + } + + @Test + void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception { + String queueName = createQueue(channel); + publishAMessage(channel); + awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName)); + } + + @Test + void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception { + String queueName = createQueue(channel); + publishAMessage(channel); + + rabbitMQ.restart(); + + awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ)); + assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull(); + } + + // Returns true once a fresh connection to the broker can be opened, false on any failure. + // The probe connection is closed by try-with-resources so that polling does not leak one connection per attempt. + private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) { + try (Connection probe = rabbitMQ.connectionFactory().newConnection()) { + return true; + } catch (Exception e) { + return false; + } + } + + private String createQueue(Channel channel) throws IOException { + channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); + return queueName; + } + + private void publishAMessage(Channel channel) throws IOException { + channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!")); + } + + private Boolean messageReceived(Channel channel, String queueName) { + try { + return channel.basicGet(queueName, !AUTO_ACK) != null; + } catch (Exception e) { + return false; + } + } + } + + @Nested + class FourConnections { + + private ConnectionFactory connectionFactory1; + private ConnectionFactory connectionFactory2; + private ConnectionFactory connectionFactory3; + private ConnectionFactory connectionFactory4; + private Connection connection1; + private Connection connection2; + private Connection connection3; + private Connection connection4; +
private Channel channel1; + private Channel channel2; + private Channel channel3; + private Channel channel4; + + @BeforeEach + void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException { + connectionFactory1 = rabbitMQ.connectionFactory(); + connectionFactory2 = rabbitMQ.connectionFactory(); + connectionFactory3 = rabbitMQ.connectionFactory(); + connectionFactory4 = rabbitMQ.connectionFactory(); + connection1 = connectionFactory1.newConnection(); + connection2 = connectionFactory2.newConnection(); + connection3 = connectionFactory3.newConnection(); + connection4 = connectionFactory4.newConnection(); + channel1 = connection1.createChannel(); + channel2 = connection2.createChannel(); + channel3 = connection3.createChannel(); + channel4 = connection4.createChannel(); + } + + @AfterEach + void tearDown() { + closeQuietly( + channel1, channel2, channel3, channel4, + connection1, connection2, connection3, connection4); + } + + @Nested + class BroadCast { + + // In the following case, each consumer will receive all the messages produced by the + // producer. + // To do so, each consumer will bind its own queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheBroadcastCase() throws Exception { + // Declare a single exchange and three queues attached to it.
+ channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue2 = channel2.queueDeclare().getQueue(); + channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY); + String queue3 = channel3.queueDeclare().getQueue(); + channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY); + String queue4 = channel4.queueDeclare().getQueue(); + channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); + + // The publisher will produce 10 messages. + IntStream.range(0, 10) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + // 30 = 3 consumers x 10 messages: each bound queue gets its own copy. + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30); + + ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList()); + // Check that every subscriber has received all the messages. + assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult); + } + } + + @Nested + class WorkQueue { + + // In the following case, consumers will receive the messages produced by the + // producer but will share them. + // To do so, we will bind a single queue to the producer exchange. + @Test + void rabbitMQShouldSupportTheWorkQueueCase() throws Exception { + int nbMessages = 100; + + // Declare the exchange and a single queue attached to it.
+ channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY); + + // The publisher will produce nbMessages messages. + IntStream.range(0, nbMessages) + .mapToObj(String::valueOf) + .map(RabbitMQTest.this::asBytes) + .forEach(Throwing.consumer( + bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes))); + + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel2.basicConsume(WORK_QUEUE, consumer2); + channel3.basicConsume(WORK_QUEUE, consumer3); + channel4.basicConsume(WORK_QUEUE, consumer4); + + // The three consumers share one queue, so together they must account for exactly nbMessages. + awaitAtMostOneMinute.until( + () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages); + + ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList()); + + assertThat( + Iterables.concat( + consumer2.getConsumedMessages(), + consumer3.getConsumedMessages(), + consumer4.getConsumedMessages())) + .containsOnlyElementsOf(expectedResult); + } + + } + + @Nested + class Routing { + @Test + void rabbitMQShouldSupportRouting() throws Exception { + String conversation1 = "c1"; + String conversation2 = "c2"; + String conversation3 = "c3"; + String conversation4 = "c4"; + + // Declare the exchange and one queue per channel, each bound to two conversations.
+ channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE); + + String queue1 = channel1.queueDeclare().getQueue(); + // 1 will follow conversation 1 and 2 + channel1.queueBind(queue1, EXCHANGE_NAME, conversation1); + channel1.queueBind(queue1, EXCHANGE_NAME, conversation2); + + String queue2 = channel2.queueDeclare().getQueue(); + // 2 will follow conversation 2 and 3 + channel2.queueBind(queue2, EXCHANGE_NAME, conversation2); + channel2.queueBind(queue2, EXCHANGE_NAME, conversation3); + + String queue3 = channel3.queueDeclare().getQueue(); + // 3 will follow conversation 3 and 4 + channel3.queueBind(queue3, EXCHANGE_NAME, conversation3); + channel3.queueBind(queue3, EXCHANGE_NAME, conversation4); + + String queue4 = channel4.queueDeclare().getQueue(); + // 4 will follow conversation 1 and 4 + channel4.queueBind(queue4, EXCHANGE_NAME, conversation1); + channel4.queueBind(queue4, EXCHANGE_NAME, conversation4); + + channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1")); + channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2")); + channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3")); + channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4")); + + InMemoryConsumer consumer1 = new InMemoryConsumer(channel1); + InMemoryConsumer consumer2 = new InMemoryConsumer(channel2); + InMemoryConsumer consumer3 = new InMemoryConsumer(channel3); + InMemoryConsumer consumer4 = new InMemoryConsumer(channel4); + channel1.basicConsume(queue1, consumer1); + channel2.basicConsume(queue2, consumer2); + channel3.basicConsume(queue3, consumer3); + channel4.basicConsume(queue4, consumer4); + + awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8); + + assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2); + assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3); + assertThat(consumer3.getConsumedMessages()).containsOnly(3, 
4); + assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4); + } + } + + private long countReceivedMessages(InMemoryConsumer... consumers) { + return Arrays.stream(consumers) + .map(InMemoryConsumer::getConsumedMessages) + .mapToLong(Queue::size) + .sum(); + } + + } + + private void closeQuietly(AutoCloseable... closeables) { + Arrays.stream(closeables).forEach(this::closeQuietly); + } + + private void closeQuietly(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + //ignore error + } + } + + private byte[] asBytes(String message) { + return message.getBytes(StandardCharsets.UTF_8); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQWaitStrategy.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQWaitStrategy.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQWaitStrategy.java new file mode 100644 index 0000000..29b5193 --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQWaitStrategy.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; + +import com.google.common.primitives.Ints; +import com.rabbitmq.client.Connection; + +public class RabbitMQWaitStrategy implements WaitStrategy { + + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1); + + public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) { + return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT); + } + + private final DockerRabbitMQ rabbitMQ; + private final Duration timeout; + + public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) { + this.rabbitMQ = rabbitMQ; + this.timeout = timeout; + } + + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { + int seconds = Ints.checkedCast(this.timeout.getSeconds()); + + Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected); + } + + private Boolean isConnected() throws IOException, TimeoutException { + try (Connection connection = rabbitMQ.connectionFactory().newConnection()) { + return connection.isOpen(); + } + } + + @Override + public WaitStrategy withStartupTimeout(Duration startupTimeout) { + return new 
RabbitMQWaitStrategy(rabbitMQ, startupTimeout); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/ReusableDockerRabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/ReusableDockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/ReusableDockerRabbitMQExtension.java new file mode 100644 index 0000000..fb98c2a --- /dev/null +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/ReusableDockerRabbitMQExtension.java @@ -0,0 +1,59 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.backend.rabbitmq; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class ReusableDockerRabbitMQExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback, ParameterResolver { + + private DockerRabbitMQ rabbitMQ; + + @Override + public void beforeAll(ExtensionContext context) { + rabbitMQ = DockerRabbitMQ.withoutCookie(); + rabbitMQ.start(); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + rabbitMQ.reset(); + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + rabbitMQ.stop(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerRabbitMQ.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return rabbitMQ; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java index eb61313..cd40fac 100644 --- 
a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java +++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java @@ -24,7 +24,7 @@ import javax.inject.Singleton; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.james.backend.mailqueue.RabbitMQConfiguration; +import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; import org.apache.james.utils.PropertiesProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index be48674..aaf57f7 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -27,8 +27,8 @@ import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; -import org.apache.james.backend.mailqueue.DockerRabbitMQ; -import org.apache.james.backend.mailqueue.ReusableDockerRabbitMQExtension; +import org.apache.james.backend.rabbitmq.DockerRabbitMQ; +import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; 
http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index c5ffc27..cf1cb88 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -27,8 +27,8 @@ import java.util.concurrent.TimeoutException; import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; -import org.apache.james.backend.mailqueue.DockerRabbitMQ; -import org.apache.james.backend.mailqueue.ReusableDockerRabbitMQExtension; +import org.apache.james.backend.rabbitmq.DockerRabbitMQ; +import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; http://git-wip-us.apache.org/repos/asf/james-project/blob/74114e93/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml index 57f07d9..a82f039 100644 --- a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml +++ b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml @@ -9,7 +9,7 @@ <logger name="org.testcontainers" level="ERROR"/> <logger name="org.apache.james" level="WARN"/> 
- <logger name="org.apache.james.backend.mailqueue.DockerRabbitMQ" level="WARN"/> + <logger name="org.apache.james.backend.rabbitmq.DockerRabbitMQ" level="WARN"/> <root level="ERROR"> <appender-ref ref="CONSOLE" /> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
