Author: norman
Date: Sun Sep 19 16:42:26 2010
New Revision: 998694

URL: http://svn.apache.org/viewvc?rev=998694&view=rev
Log:
Add support for BlobMessage for out-of-band transfer of big messages. This can 
help when you need to handle many very big messages (JAMES-1046)

Modified:
    
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
    
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java

Modified: 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java?rev=998694&r1=998693&r2=998694&view=diff
==============================================================================
--- 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
 Sun Sep 19 16:42:26 2010
@@ -20,6 +20,7 @@ package org.apache.james.queue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
@@ -38,10 +39,15 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
+import javax.mail.internet.MimeMessage;
 
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
+import org.apache.activemq.pool.PooledSession;
+import org.apache.commons.logging.Log;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
+import org.apache.james.core.MimeMessageInputStream;
 import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.mailbox.MailboxException;
 import org.apache.mailet.Mail;
@@ -55,7 +61,9 @@ import org.apache.mailet.MailAddress;
  * When a {@link Mail} attribute is found and is not one of the supported 
primitives, then the 
  * toString() method is called on the attribute value to convert it 
  * 
- * TODO: Make it possible to use {...@link BlobMessage} for large messages
+ * The implementation supports the usage of {@link BlobMessage} for 
out-of-band transfer of the {@link MimeMessage}
+ * 
+ * See http://activemq.apache.org/blob-messages.html for more details
  * 
  *
  */
@@ -64,6 +72,7 @@ public class ActiveMQMailQueue implement
     private final String queuename;
     private final ConnectionFactory connectionFactory;
     private long messageTreshold = -1;
+    private Log logger;
 
     private final static String JAMES_MAIL_RECIPIENTS = 
"JAMES_MAIL_RECIPIENTS";
     private final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
@@ -76,14 +85,40 @@ public class ActiveMQMailQueue implement
     private final static String JAMES_MAIL_REMOTEADDR = 
"JAMES_MAIL_REMOTEADDR";
     private final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
     private final static String JAMES_MAIL_ATTRIBUTE_NAMES = 
"JAMES_MAIL_ATTRIBUTE_NAMES";
-    private final static int NO_DELAY = -1;
+    private final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
     
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, long messageTreshold) {
+    private final static int NO_DELAY = -1;
+    public final static int DISABLE_TRESHOLD = -1;
+    public final static int BLOBMESSAGE_ONLY = 0;
+
+    /**
+     * Construct a new ActiveMQ based {@link MailQueue}. 
+     * The messageTreshold is used to decide whether a {@link BytesMessage} 
or a {@link BlobMessage} should be used when queuing the mail in
+     * ActiveMQ. A {@link BlobMessage} is used if the message size is 
bigger than the messageTreshold. The size is in bytes.
+     * 
+     * If you want to disable the usage of {@link BlobMessage} just use 
{@link #DISABLE_TRESHOLD} as value. If you want to use {@link 
BlobMessage} 
+     * for every message (independent of the size) just use {@link 
#BLOBMESSAGE_ONLY} as value.
+     * 
+     * @param connectionFactory
+     * @param queuename
+     * @param messageTreshold
+     * @param logger
+     */
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final long messageTreshold, final Log logger) {
         this.connectionFactory = connectionFactory;     
         this.queuename = queuename;
         this.messageTreshold  = messageTreshold;
+        this.logger = logger;
     }
     
+    /**
+     * ActiveMQ based {@link MailQueue} which just uses {@link 
BytesMessage} for all messages
+     * 
+     * @see #ActiveMQMailQueue(ConnectionFactory, String, long, Log)
+     */
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
+        this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
+    }
     
     /*
      * (non-Javadoc)
@@ -194,13 +229,30 @@ public class ActiveMQMailQueue implement
         return "MailQueue:" + queuename;
     }
     
-    private Mail createMail(BytesMessage message) throws MailQueueException, 
JMSException {
+    private Mail createMail(Message message) throws MailQueueException, 
JMSException {
         MailImpl mail = new MailImpl();
         populateMail(message, mail);
         
         try {
-            
-            mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
MimeMessageInputStreamSource(mail.getName(), new 
BytesMessageInputStream(message))));
+            if (message instanceof BytesMessage) {
+                mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
MimeMessageInputStreamSource(mail.getName(), new 
BytesMessageInputStream((BytesMessage)message))));
+            } else if (message instanceof BlobMessage) {
+                BlobMessage blobMessage = (BlobMessage) message;
+                try {
+                    // store url for later usage. Maybe we can do something 
smart for RemoteDelivery here
+                    // TODO: Check if this makes sense at all
+                    mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
+                } catch (MalformedURLException e) {
+                    // Ignore on error
+                    logger.debug("Unable to get url from blobmessage for mail 
" + mail.getName());
+                }
+                mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
+                
+            } else {
+                throw new MailQueueException("Not supported JMS Message 
received " + message);
+            }
+        } catch (IOException e) {
+            throw new MailQueueException("Unable to prepare Mail for dequeue", 
e);
         } catch (MessagingException e) {
             throw new MailQueueException("Unable to prepare Mail for dequeue", 
e);
         }
@@ -262,12 +314,36 @@ public class ActiveMQMailQueue implement
     }
     private Message createMessage(Session session, Mail mail, long 
delayInMillis) throws MailQueueException{
         try {
-            BytesMessage message  = session.createBytesMessage();
-     
-            populateJMSProperties(message, mail, delayInMillis);
-                        
-            mail.getMessage().writeTo(new BytesMessageOutputStream(message));;
-            return message;
+            boolean useBlob = false;
+            if (messageTreshold != -1) {
+                try {
+                if (messageTreshold == 0 || mail.getMessageSize() > 
messageTreshold) {
+                    useBlob = true;
+                }
+                } catch (MessagingException e) {
+                    logger.info("Unable to calculate message size for mail " + 
mail.getName() + ". Use BytesMessage for JMS");
+                    useBlob = false;
+                }
+            }
+            if (useBlob == false) {
+                BytesMessage message  = session.createBytesMessage();
+                
+                populateJMSProperties(message, mail, delayInMillis);
+                            
+                mail.getMessage().writeTo(new 
BytesMessageOutputStream(message));;
+                return message;
+            } else {
+                ActiveMQSession amqSession;
+                if (session instanceof PooledSession) {
+                    amqSession = ((PooledSession) 
session).getInternalSession();
+                } else {
+                    amqSession = (ActiveMQSession) session;
+                }
+                BlobMessage message  = amqSession.createBlobMessage(new 
MimeMessageInputStream(mail.getMessage()));
+                populateJMSProperties(message, mail, delayInMillis);
+                return message;
+            }
+            
 
         } catch (MessagingException e) {
             throw new MailQueueException("Unable to prepare Mail for enqueue" 
, e);

Modified: 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java?rev=998694&r1=998693&r2=998694&view=diff
==============================================================================
--- 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
 (original)
+++ 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
 Sun Sep 19 16:42:26 2010
@@ -22,25 +22,47 @@ import java.util.HashMap;
 import java.util.Map;
 
 import javax.annotation.Resource;
+import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.LogEnabled;
 
 
 /**
  * {@link MailQueueFactory} implementations which return {@link 
ActiveMQMailQueue} instances
  * 
+ * 
+ * 
  *
  */
-public class ActiveMQMailQueueFactory implements MailQueueFactory{
+public class ActiveMQMailQueueFactory implements MailQueueFactory, LogEnabled{
 
     
+    
     private final Map<String, MailQueue> queues = new HashMap<String, 
MailQueue>();
     private ConnectionFactory connectionFactory;
+    private long sizeTreshold = ActiveMQMailQueue.DISABLE_TRESHOLD;
+    private Log log;
     
     @Resource(name="jmsConnectionFactory")
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
     }
     
+    /**
+     * The size threshold which will be used to decide whether a {@link 
BlobMessage} or {@link BytesMessage} will be used
+     * as {@link Message} type. See {@link ActiveMQMailQueue} for more 
details
+     * 
+     * @param sizeTreshold
+     */
+    public void setSizeTreshold(long sizeTreshold) {
+        this.sizeTreshold  = sizeTreshold;
+    }
+    
+    
     
     /*
      * 
@@ -50,11 +72,21 @@ public class ActiveMQMailQueueFactory im
     public synchronized MailQueue getQueue(String name) {
         MailQueue queue = queues.get(name);
         if (queue == null) {
-            queue = new ActiveMQMailQueue(connectionFactory, name, -1);
+            queue = new ActiveMQMailQueue(connectionFactory, name, 
sizeTreshold, log);
             queues.put(name, queue);
         }
 
         return queue;
     }
 
+
+
+    /*
+     * (non-Javadoc)
+     * @see 
org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
+     */
+    public void setLog(Log log) {
+        this.log = log;
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to