MAILET-149 Use a test rule for AMQP queue

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6d1f913e
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6d1f913e
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6d1f913e

Branch: refs/heads/master
Commit: 6d1f913e9865c0f26f73bfff9f5c9cf8b37fad40
Parents: 28eedd5
Author: Benoit Tellier <[email protected]>
Authored: Fri Jan 20 10:03:30 2017 +0700
Committer: Antoine Duprat <[email protected]>
Committed: Mon Jan 23 16:19:24 2017 +0100

----------------------------------------------------------------------
 server/mailet/integration-testing/pom.xml       |   2 +-
 .../mailets/AmqpForwardAttachmentTest.java      |  66 ++---------
 .../mailets/ICSAttachmentWorkflowTest.java      |  76 +++---------
 .../james/transport/mailets/amqp/AmqpRule.java  | 115 +++++++++++++++++++
 4 files changed, 142 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/pom.xml
----------------------------------------------------------------------
diff --git a/server/mailet/integration-testing/pom.xml 
b/server/mailet/integration-testing/pom.xml
index 31eb20c..be3cd62 100644
--- a/server/mailet/integration-testing/pom.xml
+++ b/server/mailet/integration-testing/pom.xml
@@ -220,7 +220,7 @@
               <dependency>
                   <groupId>org.assertj</groupId>
                   <artifactId>assertj-core</artifactId>
-                  <version>${assertj-1.version}</version>
+                  <version>${assertj-3.version}</version>
                   <scope>test</scope>
               </dependency>
               <dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java
----------------------------------------------------------------------
diff --git 
a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java
 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java
index 67f63cd..400f965 100644
--- 
a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java
+++ 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttachmentTest.java
@@ -22,12 +22,8 @@ package org.apache.james.transport.mailets;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.mail.MessagingException;
 import javax.mail.Session;
@@ -43,6 +39,7 @@ import org.apache.james.mailets.configuration.MailetContainer;
 import org.apache.james.mailets.configuration.ProcessorConfiguration;
 import org.apache.james.mailets.utils.IMAPMessageReader;
 import org.apache.james.mailets.utils.SMTPMessageSender;
+import org.apache.james.transport.mailets.amqp.AmqpRule;
 import org.apache.james.util.streams.SwarmGenericContainer;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
@@ -55,16 +52,10 @@ import org.junit.rules.RuleChain;
 import org.junit.rules.TemporaryFolder;
 
 import com.google.common.base.Charsets;
-import com.google.common.net.InetAddresses;
 import com.google.common.primitives.Bytes;
 import com.jayway.awaitility.Awaitility;
 import com.jayway.awaitility.Duration;
 import com.jayway.awaitility.core.ConditionFactory;
-import com.rabbitmq.client.BuiltinExchangeType;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.GetResponse;
 
 public class AmqpForwardAttachmentTest {
 
@@ -88,22 +79,16 @@ public class AmqpForwardAttachmentTest {
             .withAffinityToContainer();
 
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    public AmqpRule amqpRule = new AmqpRule(rabbitMqContainer, EXCHANGE_NAME, 
ROUTING_KEY);
 
     @Rule
-    public final RuleChain chain = 
RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer);
+    public final RuleChain chain = 
RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer).around(amqpRule);
     
     private TemporaryJamesServer jamesServer;
     private ConditionFactory calmlyAwait;
-    private Channel channel;
-    private String queueName;
-    private Connection connection;
 
     @Before
     public void setup() throws Exception {
-        @SuppressWarnings("deprecation")
-        InetAddress containerIp = 
InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress());
-        String amqpUri = "amqp://" + containerIp.getHostAddress();
-
         MailetContainer mailetContainer = MailetContainer.builder()
             .postmaster("postmaster@" + JAMES_APACHE_ORG)
             .threads(5)
@@ -131,7 +116,7 @@ public class AmqpForwardAttachmentTest {
                     .addMailet(MailetConfiguration.builder()
                             .match("All")
                             .clazz("AmqpForwardAttribute")
-                            .addProperty("uri", amqpUri)
+                            .addProperty("uri", amqpRule.getAmqpUri())
                             .addProperty("exchange", EXCHANGE_NAME)
                             .addProperty("attribute", MAIL_ATTRIBUTE)
                             .addProperty("routing_key", ROUTING_KEY)
@@ -149,27 +134,21 @@ public class AmqpForwardAttachmentTest {
 
         jamesServer = new TemporaryJamesServer(temporaryFolder, 
mailetContainer);
         Duration slowPacedPollInterval = Duration.FIVE_HUNDRED_MILLISECONDS;
-        calmlyAwait = 
Awaitility.with().pollInterval(slowPacedPollInterval).and().with().pollDelay(slowPacedPollInterval).await();
+        calmlyAwait = Awaitility.with()
+            .pollInterval(slowPacedPollInterval)
+            .and()
+            .with()
+            .pollDelay(slowPacedPollInterval)
+            .await();
 
         jamesServer.getServerProbe().addDomain(JAMES_APACHE_ORG);
         jamesServer.getServerProbe().addUser(FROM, PASSWORD);
         jamesServer.getServerProbe().addUser(RECIPIENT, PASSWORD);
         
jamesServer.getServerProbe().createMailbox(MailboxConstants.USER_NAMESPACE, 
RECIPIENT, "INBOX");
-        
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUri(amqpUri);
-        waitingForRabbitToBeReady(factory);
-        connection = factory.newConnection();
-        channel = connection.createChannel();
-        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-        queueName = channel.queueDeclare().getQueue();
-        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
     }
 
     @After
     public void tearDown() throws Exception {
-        channel.close();
-        connection.close();
         jamesServer.shutdown();
     }
 
@@ -200,10 +179,8 @@ public class AmqpForwardAttachmentTest {
             
calmlyAwait.atMost(Duration.ONE_MINUTE).until(messageSender::messageHasBeenSent);
             calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> 
imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD));
         }
-        
-        boolean autoAck = true;
-        GetResponse basicGet = channel.basicGet(queueName, autoAck);
-        assertThat(basicGet.getBody()).isEqualTo(TEST_ATTACHMENT_CONTENT);
+
+        
assertThat(amqpRule.readContentAsBytes()).contains(TEST_ATTACHMENT_CONTENT);
     }
 
     private MimeBodyPart createAttachmentBodyPart(byte[] body, String 
fileName) throws MessagingException, UnsupportedEncodingException {
@@ -219,23 +196,4 @@ public class AmqpForwardAttachmentTest {
                         body)));
     }
 
-    private void waitingForRabbitToBeReady(ConnectionFactory factory) {
-        Awaitility
-            .await()
-            .atMost(30, TimeUnit.SECONDS)
-            .with()
-            .pollInterval(10, TimeUnit.MILLISECONDS)
-            .until(() -> isReady(factory));
-    }
-
-    private boolean isReady(ConnectionFactory factory) {
-        try (Connection connection = factory.newConnection()) {
-            return true;
-        } catch (IOException e) {
-            return false;
-        } catch (TimeoutException e) {
-            return false;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java
----------------------------------------------------------------------
diff --git 
a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java
 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java
index 51a23e2..1fb694d 100644
--- 
a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java
+++ 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/ICSAttachmentWorkflowTest.java
@@ -22,12 +22,9 @@ package org.apache.james.transport.mailets;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
+import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.mail.MessagingException;
 import javax.mail.Session;
@@ -43,6 +40,7 @@ import org.apache.james.mailets.configuration.MailetContainer;
 import org.apache.james.mailets.configuration.ProcessorConfiguration;
 import org.apache.james.mailets.utils.IMAPMessageReader;
 import org.apache.james.mailets.utils.SMTPMessageSender;
+import org.apache.james.transport.mailets.amqp.AmqpRule;
 import org.apache.james.util.streams.SwarmGenericContainer;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
@@ -56,7 +54,6 @@ import org.junit.rules.RuleChain;
 import org.junit.rules.TemporaryFolder;
 
 import com.google.common.base.Charsets;
-import com.google.common.net.InetAddresses;
 import com.google.common.primitives.Bytes;
 import com.jayway.awaitility.Awaitility;
 import com.jayway.awaitility.Duration;
@@ -65,11 +62,6 @@ import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
-import com.rabbitmq.client.BuiltinExchangeType;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.GetResponse;
 
 public class ICSAttachmentWorkflowTest {
 
@@ -82,7 +74,7 @@ public class ICSAttachmentWorkflowTest {
 
     private static final String FROM = "fromUser@" + JAMES_APACHE_ORG;
     private static final String RECIPIENT = "touser@" + JAMES_APACHE_ORG;
-    
+
     private static final String MAIL_ATTRIBUTE = "my.attribute";
     private static final String EXCHANGE_NAME = "myExchange";
     private static final String ROUTING_KEY = "myRoutingKey";
@@ -219,24 +211,17 @@ public class ICSAttachmentWorkflowTest {
 
     public SwarmGenericContainer rabbitMqContainer = new 
SwarmGenericContainer("rabbitmq:3")
             .withAffinityToContainer();
-
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    public AmqpRule amqpRule = new AmqpRule(rabbitMqContainer, EXCHANGE_NAME, 
ROUTING_KEY);
 
     @Rule
-    public final RuleChain chain = 
RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer);
-    
-    private TemporaryJamesServer jamesServer;
+    public final RuleChain chain = 
RuleChain.outerRule(temporaryFolder).around(rabbitMqContainer).around(amqpRule);
+
     private ConditionFactory calmlyAwait;
-    private Channel channel;
-    private String queueName;
-    private Connection connection;
+    private TemporaryJamesServer jamesServer;
 
     @Before
     public void setup() throws Exception {
-        @SuppressWarnings("deprecation")
-        InetAddress containerIp = 
InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress());
-        String amqpUri = "amqp://" + containerIp.getHostAddress();
-
         MailetContainer mailetContainer = MailetContainer.builder()
             .postmaster("postmaster@" + JAMES_APACHE_ORG)
             .threads(5)
@@ -281,7 +266,7 @@ public class ICSAttachmentWorkflowTest {
                     .addMailet(MailetConfiguration.builder()
                             .match("All")
                             .clazz("AmqpForwardAttribute")
-                            .addProperty("uri", amqpUri)
+                            .addProperty("uri", amqpRule.getAmqpUri())
                             .addProperty("exchange", EXCHANGE_NAME)
                             .addProperty("attribute", MAIL_ATTRIBUTE)
                             .addProperty("routing_key", ROUTING_KEY)
@@ -305,21 +290,10 @@ public class ICSAttachmentWorkflowTest {
         jamesServer.getServerProbe().addUser(FROM, PASSWORD);
         jamesServer.getServerProbe().addUser(RECIPIENT, PASSWORD);
         
jamesServer.getServerProbe().createMailbox(MailboxConstants.USER_NAMESPACE, 
RECIPIENT, "INBOX");
-        
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUri(amqpUri);
-        waitingForRabbitToBeReady(factory);
-        connection = factory.newConnection();
-        channel = connection.createChannel();
-        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-        queueName = channel.queueDeclare().getQueue();
-        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
     }
 
     @After
     public void tearDown() throws Exception {
-        channel.close();
-        connection.close();
         jamesServer.shutdown();
     }
 
@@ -350,9 +324,7 @@ public class ICSAttachmentWorkflowTest {
             calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> 
imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD));
         }
 
-        boolean autoAck = true;
-        GetResponse basicGet = channel.basicGet(queueName, autoAck);
-        assertThat(basicGet).isNull();
+        assertThat(amqpRule.readContent()).isEmpty();
     }
 
     @Test
@@ -382,10 +354,9 @@ public class ICSAttachmentWorkflowTest {
             calmlyAwait.atMost(Duration.ONE_MINUTE).until(() -> 
imapMessageReader.userReceivedMessage(RECIPIENT, PASSWORD));
         }
 
-        boolean autoAck = true;
-        GetResponse basicGet = channel.basicGet(queueName, autoAck);
-
-        DocumentContext jsonPath = toJsonPath(basicGet);
+        Optional<String> content = amqpRule.readContent();
+        assertThat(content).isPresent();
+        DocumentContext jsonPath = toJsonPath(content.get());
         assertThat(jsonPath.<String> read("ical")).isEqualTo(ICS_1);
         assertThat(jsonPath.<String> read("sender")).isEqualTo(FROM);
         assertThat(jsonPath.<String> read("recipient")).isEqualTo(RECIPIENT);
@@ -396,10 +367,10 @@ public class ICSAttachmentWorkflowTest {
         assertThat(jsonPath.<String> read("recurrence-id")).isNull();
     }
 
-    private DocumentContext toJsonPath(GetResponse basicGet) {
+    private DocumentContext toJsonPath(String content) {
         return JsonPath.using(Configuration.defaultConfiguration()
                 .addOptions(Option.SUPPRESS_EXCEPTIONS))
-            .parse(new String(basicGet.getBody(), Charsets.UTF_8));
+            .parse(content);
     }
 
     @Test
@@ -528,23 +499,4 @@ public class ICSAttachmentWorkflowTest {
                         body)));
     }
 
-    private void waitingForRabbitToBeReady(ConnectionFactory factory) {
-        Awaitility
-            .await()
-            .atMost(30, TimeUnit.SECONDS)
-            .with()
-            .pollInterval(10, TimeUnit.MILLISECONDS)
-            .until(() -> isReady(factory));
-    }
-
-    private boolean isReady(ConnectionFactory factory) {
-        try (Connection connection = factory.newConnection()) {
-            return true;
-        } catch (IOException e) {
-            return false;
-        } catch (TimeoutException e) {
-            return false;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/6d1f913e/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java
----------------------------------------------------------------------
diff --git 
a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java
 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java
new file mode 100644
index 0000000..c59f2c3
--- /dev/null
+++ 
b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpRule.java
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets.amqp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.util.streams.SwarmGenericContainer;
+import org.junit.rules.ExternalResource;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.net.InetAddresses;
+import com.jayway.awaitility.Awaitility;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.GetResponse;
+
+public class AmqpRule extends ExternalResource {
+
+    private final SwarmGenericContainer rabbitMqContainer;
+    private final String exchangeName;
+    private final String routingKey;
+    private Channel channel;
+    private String queueName;
+    private Connection connection;
+    private String amqpUri;
+
+    public AmqpRule(SwarmGenericContainer rabbitMqContainer, String 
exchangeName, String routingKey) {
+        this.rabbitMqContainer = rabbitMqContainer;
+        this.exchangeName = exchangeName;
+        this.routingKey = routingKey;
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        @SuppressWarnings("deprecation")
+        InetAddress containerIp = 
InetAddresses.forString(rabbitMqContainer.getContainerInfo().getNetworkSettings().getIpAddress());
+        amqpUri = "amqp://" + containerIp.getHostAddress();
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setUri(amqpUri);
+        waitingForRabbitToBeReady(factory);
+        connection = factory.newConnection();
+        channel = connection.createChannel();
+        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
+        queueName = channel.queueDeclare().getQueue();
+        channel.queueBind(queueName, exchangeName, routingKey);
+    }
+
+    public String getAmqpUri() {
+        return amqpUri;
+    }
+
+    public Optional<String> readContent() throws IOException {
+        return readContentAsBytes()
+            .map(value -> new String(value, Charsets.UTF_8));
+    }
+
+    public Optional<byte[]> readContentAsBytes() throws IOException {
+        boolean autoAck = true;
+        return Optional.ofNullable(channel.basicGet(queueName, autoAck))
+            .map(GetResponse::getBody);
+    }
+
+    @Override
+    protected void after() {
+        try {
+            channel.close();
+            connection.close();
+        } catch (Exception e) {
+            Throwables.propagate(e);
+        }
+    }
+
+    private void waitingForRabbitToBeReady(ConnectionFactory factory) {
+        Awaitility
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .with()
+            .pollInterval(10, TimeUnit.MILLISECONDS)
+            .until(() -> isReady(factory));
+    }
+
+    private boolean isReady(ConnectionFactory factory) {
+        try (Connection connection = factory.newConnection()) {
+            return true;
+        } catch (IOException e) {
+            return false;
+        } catch (TimeoutException e) {
+            return false;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to