MAILET-149 Use a test rule for AMQP queue
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6d1f913e Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6d1f913e Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6d1f913e Branch: refs/heads/master Commit: 6d1f913e9865c0f26f73bfff9f5c9cf8b37fad40 Parents: 28eedd5 Author: Benoit Tellier <[email protected]> Authored: Fri Jan 20 10:03:30 2017 +0700 Committer: Antoine Duprat <[email protected]> Committed: Mon Jan 23 16:19:24 2017 +0100 ---------------------------------------------------------------------- server/mailet/integration-testing/pom.xml | 2 +- .../mailets/AmqpForwardAttachmentTest.java | 66 ++--------- .../mailets/ICSAttachmentWorkflowTest.java | 76 +++--------- .../james/transport/mailets/amqp/AmqpRule.java | 115 +++++++++++++++++++ 4 files changed, 142 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/pom.xml ---------------------------------------------------------------------- diff --git a/server/mailet/integration-testing/pom.xml b/server/mailet/integration-testing/pom.xml index 31eb20c..be3cd62 100644 --- a/server/mailet/integration-testing/pom.xml +++ b/server/mailet/integration-testing/pom.xml @@ -220,7 +220,7 @@ <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> - <version>${assertj-1.version}</version> + <version>${assertj-3.version}</version> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java ---------------------------------------------------------------------- diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java index 67f63cd..400f965 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java @@ -22,12 +22,8 @@ package org.apache.james.transport.mailets; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.InetAddress; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.mail.MessagingException; import javax.mail.Session; @@ -43,6 +39,7 @@ import org.apache.james.mailets.configuration.MailetContainer; import org.apache.james.mailets.configuration.ProcessorConfiguration; import org.apache.james.mailets.utils.IMAPMessageReader; import org.apache.james.mailets.utils.SMTPMessageSender; +import org.apache.james.transport.mailets.amqp.AmqpRule; import org.apache.james.util.streams.SwarmGenericContainer; import org.apache.mailet.Mail; import org.apache.mailet.MailAddress; @@ -55,16 +52,10 @@ import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; import com.google.common.base.Charsets; -import com.google.common.net.InetAddresses; import com.google.common.primitives.Bytes; import com.jayway.awaitility.Awaitility; import com.jayway.awaitility.Duration; import com.jayway.awaitility.core.ConditionFactory; -import com.rabbitmq.client.BuiltinExchangeType; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.GetResponse; public class AmqpForwardAttachmentTest { @@ -88,22 +79,16 @@ public class AmqpForwardAttachmentTest { .withAffinityToContainer(); public TemporaryFolder temporaryFolder = new TemporaryFolder(); + public AmqpRule amqpRule = new AmqpRule(rabbitMqContainer, EXCHANGE_NAME, ROUTING_KEY); @Rule - public final RuleChain chain = RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer); + public final RuleChain chain = RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer).around(amqpRule); private TemporaryJamesServer jamesServer; private ConditionFactory calmlyAwait; - private Channel channel; - private String queueName; - private Connection connection; @Before public void setup() throws Exception { - @SuppressWarnings("deprecation") - InetAddress containerIp = InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress()); - String amqpUri = "amqp://" + containerIp.getHostAddress(); - MailetContainer mailetContainer = MailetContainer.builder() .postmaster("postmaster@" + JAMES_APACHE_ORG) .threads(5) @@ -131,7 +116,7 @@ public class AmqpForwardAttachmentTest { .addMailet(MailetConfiguration.builder() .match("All") .clazz("AmqpForwardAttribute") - .addProperty("uri", amqpUri) + .addProperty("uri", amqpRule.getAmqpUri()) .addProperty("exchange", EXCHANGE_NAME) .addProperty("attribute", MAIL_ATTRIBUTE) .addProperty("routing_key", ROUTING_KEY) @@ -149,27 +134,21 @@ public class AmqpForwardAttachmentTest { jamesServer = new TemporaryJamesServer(temporaryFolder, mailetContainer); Duration slowPacedPollInterval = Duration.FIVE_HUNDRED_MILLISECONDS; - calmlyAwait = Awaitility.with().pollInterval(slowPacedPollInterval).and().with().pollDelay(slowPacedPollInterval).await(); + calmlyAwait = Awaitility.with() + .pollInterval(slowPacedPollInterval) + .and() + .with() + .pollDelay(slowPacedPollInterval) + .await(); jamesServer.getServerProbe().addDomain(JAMES_APACHE_ORG); jamesServer.getServerProbe().addUser(FROM, PASSWORD); jamesServer.getServerProbe().addUser(RECIPIENT, PASSWORD); jamesServer.getServerProbe().createMailbox(MailboxConstants.USER_NAMESPACE, RECIPIENT, "INBOX"); - - ConnectionFactory factory = new ConnectionFactory(); - factory.setUri(amqpUri); - waitingForRabbitToBeReady(factory); - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); - queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); } @After public void tearDown() throws Exception { - channel.close(); - connection.close(); jamesServer.shutdown(); } @@ -200,10 +179,8 @@ public class AmqpForwardAttachmentTest { calmlyAwait.atMost(Duration.ONE_MINUTE).until(messageSender::messageHasBeenSent); calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD)); } - - boolean autoAck = true; - GetResponse basicGet = channel.basicGet(queueName, autoAck); - assertThat(basicGet.getBody()).isEqualTo(TEST_ATTACHMENT_CONTENT); + + assertThat(amqpRule.readContentAsBytes()).contains(TEST_ATTACHMENT_CONTENT); } private MimeBodyPart createAttachmentBodyPart(byte[] body, String fileName) throws MessagingException, UnsupportedEncodingException { @@ -219,23 +196,4 @@ public class AmqpForwardAttachmentTest { body))); } - private void waitingForRabbitToBeReady(ConnectionFactory factory) { - Awaitility - .await() - .atMost(30, TimeUnit.SECONDS) - .with() - .pollInterval(10, TimeUnit.MILLISECONDS) - .until(() -> isReady(factory)); - } - - private boolean isReady(ConnectionFactory factory) { - try (Connection connection = factory.newConnection()) { - return true; - } catch (IOException e) { - return false; - } catch (TimeoutException e) { - return false; - } - } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java ---------------------------------------------------------------------- diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java index 51a23e2..1fb694d 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java @@ -22,12 +22,9 @@ package org.apache.james.transport.mailets; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.InetAddress; +import java.util.Optional; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.mail.MessagingException; import javax.mail.Session; @@ -43,6 +40,7 @@ import org.apache.james.mailets.configuration.MailetContainer; import org.apache.james.mailets.configuration.ProcessorConfiguration; import org.apache.james.mailets.utils.IMAPMessageReader; import org.apache.james.mailets.utils.SMTPMessageSender; +import org.apache.james.transport.mailets.amqp.AmqpRule; import org.apache.james.util.streams.SwarmGenericContainer; import org.apache.mailet.Mail; import org.apache.mailet.MailAddress; @@ -56,7 +54,6 @@ import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; import com.google.common.base.Charsets; -import com.google.common.net.InetAddresses; import com.google.common.primitives.Bytes; import com.jayway.awaitility.Awaitility; import com.jayway.awaitility.Duration; @@ -65,11 +62,6 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; -import com.rabbitmq.client.BuiltinExchangeType; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.GetResponse; public class ICSAttachmentWorkflowTest { @@ -82,7 +74,7 @@ public class ICSAttachmentWorkflowTest { private static final String FROM = "fromUser@" + JAMES_APACHE_ORG; private static final String RECIPIENT = "touser@" + JAMES_APACHE_ORG; - + private static final String MAIL_ATTRIBUTE = "my.attribute"; private static final String EXCHANGE_NAME = "myExchange"; private static final String ROUTING_KEY = "myRoutingKey"; @@ -219,24 +211,17 @@ public class ICSAttachmentWorkflowTest { public SwarmGenericContainer rabbitMqContainer = new SwarmGenericContainer("rabbitmq:3") .withAffinityToContainer(); - public TemporaryFolder temporaryFolder = new TemporaryFolder(); + public AmqpRule amqpRule = new AmqpRule(rabbitMqContainer, EXCHANGE_NAME, ROUTING_KEY); @Rule - public final RuleChain chain = RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer); - - private TemporaryJamesServer jamesServer; + public final RuleChain chain = RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer).around(amqpRule); + private ConditionFactory calmlyAwait; - private Channel channel; - private String queueName; - private Connection connection; + private TemporaryJamesServer jamesServer; @Before public void setup() throws Exception { - @SuppressWarnings("deprecation") - InetAddress containerIp = InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress()); - String amqpUri = "amqp://" + containerIp.getHostAddress(); - MailetContainer mailetContainer = MailetContainer.builder() .postmaster("postmaster@" + JAMES_APACHE_ORG) .threads(5) @@ -281,7 +266,7 @@ public class ICSAttachmentWorkflowTest { .addMailet(MailetConfiguration.builder() .match("All") .clazz("AmqpForwardAttribute") - .addProperty("uri", amqpUri) + .addProperty("uri", amqpRule.getAmqpUri()) .addProperty("exchange", EXCHANGE_NAME) .addProperty("attribute", MAIL_ATTRIBUTE) .addProperty("routing_key", ROUTING_KEY) @@ -305,21 +290,10 @@ public class ICSAttachmentWorkflowTest { jamesServer.getServerProbe().addUser(FROM, PASSWORD); jamesServer.getServerProbe().addUser(RECIPIENT, PASSWORD); jamesServer.getServerProbe().createMailbox(MailboxConstants.USER_NAMESPACE, RECIPIENT, "INBOX"); - - ConnectionFactory factory = new ConnectionFactory(); - factory.setUri(amqpUri); - waitingForRabbitToBeReady(factory); - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); - queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); } @After public void tearDown() throws Exception { - channel.close(); - connection.close(); jamesServer.shutdown(); } @@ -350,9 +324,7 @@ public class ICSAttachmentWorkflowTest { calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD)); } - boolean autoAck = true; - GetResponse basicGet = channel.basicGet(queueName, autoAck); - assertThat(basicGet).isNull(); + assertThat(amqpRule.readContent()).isEmpty(); } @Test @@ -382,10 +354,9 @@ public class ICSAttachmentWorkflowTest { calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD)); } - boolean autoAck = true; - GetResponse basicGet = channel.basicGet(queueName, autoAck); - - DocumentContext jsonPath = toJsonPath(basicGet); + Optional<String> content = amqpRule.readContent(); + assertThat(content).isPresent(); + DocumentContext jsonPath = toJsonPath(content.get()); assertThat(jsonPath.<String> read("ical")).isEqualTo(ICS_1); assertThat(jsonPath.<String> read("sender")).isEqualTo(FROM); assertThat(jsonPath.<String> read("recipient")).isEqualTo(RECIPIENT); @@ -396,10 +367,10 @@ public class ICSAttachmentWorkflowTest { assertThat(jsonPath.<String> read("recurrence-id")).isNull(); } - private DocumentContext toJsonPath(GetResponse basicGet) { + private DocumentContext toJsonPath(String content) { return JsonPath.using(Configuration.defaultConfiguration() .addOptions(Option.SUPPRESS_EXCEPTIONS)) - .parse(new String(basicGet.getBody(), Charsets.UTF_8)); + .parse(content); } @Test @@ -528,23 +499,4 @@ public class ICSAttachmentWorkflowTest { body))); } - private void waitingForRabbitToBeReady(ConnectionFactory factory) { - Awaitility - .await() - .atMost(30, TimeUnit.SECONDS) - .with() - .pollInterval(10, TimeUnit.MILLISECONDS) - .until(() -> isReady(factory)); - } - - private boolean isReady(ConnectionFactory factory) { - try (Connection connection = factory.newConnection()) { - return true; - } catch (IOException e) { - return false; - } catch (TimeoutException e) { - return false; - } - } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java ---------------------------------------------------------------------- diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java new file mode 100644 index 0000000..c59f2c3 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java @@ -0,0 +1,115 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.transport.mailets.amqp; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.james.util.streams.SwarmGenericContainer; +import org.junit.rules.ExternalResource; + +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.net.InetAddresses; +import com.jayway.awaitility.Awaitility; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.GetResponse; + +public class AmqpRule extends ExternalResource { + + private final SwarmGenericContainer rabbitMqContainer; + private final String exchangeName; + private final String routingKey; + private Channel channel; + private String queueName; + private Connection connection; + private String amqpUri; + + public AmqpRule(SwarmGenericContainer rabbitMqContainer, String exchangeName, String routingKey) { + this.rabbitMqContainer = rabbitMqContainer; + this.exchangeName = exchangeName; + this.routingKey = routingKey; + } + + @Override + protected void before() throws Throwable { + @SuppressWarnings("deprecation") + InetAddress containerIp = InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress()); + amqpUri = "amqp://" + containerIp.getHostAddress(); + ConnectionFactory factory = new ConnectionFactory(); + factory.setUri(amqpUri); + waitingForRabbitToBeReady(factory); + connection = factory.newConnection(); + channel = connection.createChannel(); + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); + queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, exchangeName, routingKey); + } + + public String getAmqpUri() { + return amqpUri; + } + + public Optional<String> readContent() throws IOException { + return readContentAsBytes() + .map(value -> new String(value, Charsets.UTF_8)); + } + + public Optional<byte[]> readContentAsBytes() throws IOException { + boolean autoAck = true; + return Optional.ofNullable(channel.basicGet(queueName, autoAck)) + .map(GetResponse::getBody); + } + + @Override + protected void after() { + try { + channel.close(); + connection.close(); + } catch (Exception e) { + Throwables.propagate(e); + } + } + + private void waitingForRabbitToBeReady(ConnectionFactory factory) { + Awaitility + .await() + .atMost(30, TimeUnit.SECONDS) + .with() + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> isReady(factory)); + } + + private boolean isReady(ConnectionFactory factory) { + try (Connection connection = factory.newConnection()) { + return true; + } catch (IOException e) { + return false; + } catch (TimeoutException e) { + return false; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
