JAMES-2159 Support List in AmqpForwardAttribute

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c803800d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c803800d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c803800d

Branch: refs/heads/master
Commit: c803800dd43439316e64a635271f6dcbe1e93fcd
Parents: 605ab57
Author: Matthieu Baechler <matth...@apache.org>
Authored: Tue Sep 12 18:16:45 2017 +0200
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Wed Sep 27 16:07:08 2017 +0200

----------------------------------------------------------------------
 .../transport/mailets/AmqpForwardAttribute.java | 36 ++++++++++++--------
 .../mailets/AmqpForwardAttributeTest.java       | 26 ++++++++++++--
 2 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c803800d/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
----------------------------------------------------------------------
diff --git a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
index 7d1b104..e0cb8f1 100644
--- a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
+++ b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -21,8 +21,10 @@ package org.apache.james.transport.mailets;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailetException;
@@ -30,6 +32,7 @@ import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.rabbitmq.client.AMQP;
@@ -104,7 +107,7 @@ public class AmqpForwardAttribute extends GenericMailet {
         if (mail.getAttribute(attribute) == null) {
             return;
         }
-        Map<String, byte[]> content = getAttributeContent(mail);
+        Stream<byte[]> content = getAttributeContent(mail);
         try {
             sendContent(content);
         } catch (IOException e) {
@@ -115,17 +118,20 @@ public class AmqpForwardAttribute extends GenericMailet {
     }
 
     @SuppressWarnings("unchecked")
-    private Map<String, byte[]> getAttributeContent(Mail mail) throws MailetException {
+    private Stream<byte[]> getAttributeContent(Mail mail) throws MailetException {
         Serializable attributeContent = mail.getAttribute(attribute);
-        if (! (attributeContent instanceof Map)) {
-            throw new MailetException("Invalid attribute found into attribute "
-                    + attribute + "class Map expected but "
-                    + attributeContent.getClass() + " found.");
+        if (attributeContent instanceof Map) {
+            return ((Map<String, byte[]>) attributeContent).values().stream();
         }
-        return (Map<String, byte[]>) attributeContent;
+        if (attributeContent instanceof List) {
+            return ((List<byte[]>) attributeContent).stream();
+        }
+        throw new MailetException("Invalid attribute found into attribute "
+                + attribute + "class Map or List expected but "
+                + attributeContent.getClass() + " found.");
     }
 
-    private void sendContent(Map<String, byte[]> content) throws IOException, TimeoutException {
+    private void sendContent(Stream<byte[]> content) throws IOException, TimeoutException {
         Connection connection = null;
         Channel channel = null;
         try {
@@ -143,13 +149,13 @@ public class AmqpForwardAttribute extends GenericMailet {
         }
     }
 
-    private void sendContentOnChannel(Channel channel, Map<String, byte[]> content) throws IOException {
-        for (byte[] body: content.values()) {
-            channel.basicPublish(exchange, 
-                    routingKey, 
-                    new AMQP.BasicProperties(), 
-                    body);
-        }
+    private void sendContentOnChannel(Channel channel, Stream<byte[]> content) throws IOException {
+        content.forEach(
+            Throwing.consumer(message ->
+                channel.basicPublish(exchange,
+                        routingKey,
+                        new AMQP.BasicProperties(),
+                        message)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/c803800d/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
----------------------------------------------------------------------
diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
index 0af1e67..c8f201e 100644
--- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
+++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
@@ -172,10 +172,10 @@ public class AmqpForwardAttributeTest {
     }
 
     @Test
-    public void serviceShouldThrowWhenAttributeContentIsNotAMap() throws MessagingException {
+    public void serviceShouldThrowWhenAttributeContentIsNotAMapAListOrAString() throws MessagingException {
         mailet.init(mailetConfig);
         Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ImmutableList.of());
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(2);
 
         expectedException.expect(MailetException.class);
 
@@ -207,7 +207,7 @@ public class AmqpForwardAttributeTest {
     }
 
     @Test
-    public void serviceShouldPublishAttributeContentWhenAttributeInMail() throws Exception {
+    public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAMap() throws Exception {
         mailet.init(mailetConfig);
         Channel channel = mock(Channel.class);
         Connection connection = mock(Connection.class);
@@ -225,4 +225,24 @@ public class AmqpForwardAttributeTest {
         verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
         assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
     }
+
+    @Test
+    public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAList() throws Exception {
+        mailet.init(mailetConfig);
+        Channel channel = mock(Channel.class);
+        Connection connection = mock(Connection.class);
+        when(connection.createChannel()).thenReturn(channel);
+        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+        when(connectionFactory.newConnection()).thenReturn(connection);
+        mailet.setConnectionFactory(connectionFactory);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ImmutableList.of(ATTACHMENT_CONTENT));
+        BasicProperties expectedProperties = new AMQP.BasicProperties();
+
+        mailet.service(mail);
+
+        ArgumentCaptor<BasicProperties> basicPropertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
+        verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
+        assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to