Author: norman
Date: Sun Sep 19 16:42:26 2010
New Revision: 998694
URL: http://svn.apache.org/viewvc?rev=998694&view=rev
Log:
Add support for BlobMessage for out-of-band transfer of big messages. This can
help when you need to handle many very big messages (JAMES-1046)
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java?rev=998694&r1=998693&r2=998694&view=diff
==============================================================================
---
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
Sun Sep 19 16:42:26 2010
@@ -20,6 +20,7 @@ package org.apache.james.queue;
import java.io.IOException;
import java.io.Serializable;
+import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
@@ -38,10 +39,15 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
+import javax.mail.internet.MimeMessage;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
+import org.apache.activemq.pool.PooledSession;
+import org.apache.commons.logging.Log;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
+import org.apache.james.core.MimeMessageInputStream;
import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.mailbox.MailboxException;
import org.apache.mailet.Mail;
@@ -55,7 +61,9 @@ import org.apache.mailet.MailAddress;
* When a {@link Mail} attribute is found and is not one of the supported primitives, then the
* toString() method is called on the attribute value to convert it
*
- * TODO: Make it possible to use {...@link BlobMessage} for large messages
+ * The implementation support the usage of {@link BlobMessage} for out-of-band transfer of the {@link MimeMessage}
+ *
+ * See http://activemq.apache.org/blob-messages.html for more details
*
*
*/
@@ -64,6 +72,7 @@ public class ActiveMQMailQueue implement
private final String queuename;
private final ConnectionFactory connectionFactory;
private long messageTreshold = -1;
+ private Log logger;
private final static String JAMES_MAIL_RECIPIENTS =
"JAMES_MAIL_RECIPIENTS";
private final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
@@ -76,14 +85,40 @@ public class ActiveMQMailQueue implement
private final static String JAMES_MAIL_REMOTEADDR =
"JAMES_MAIL_REMOTEADDR";
private final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
private final static String JAMES_MAIL_ATTRIBUTE_NAMES =
"JAMES_MAIL_ATTRIBUTE_NAMES";
- private final static int NO_DELAY = -1;
+ private final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, long messageTreshold) {
+ private final static int NO_DELAY = -1;
+ public final static int DISABLE_TRESHOLD = -1;
+ public final static int BLOBMESSAGE_ONLY = 0;
+
+ /**
+ * Construct a new ActiveMQ based {@link MailQueue}.
+ * The messageTreshold is used to calculate if a {@link BytesMessage} or a {@link BlobMessage} should be used when queuing the mail in
+ * ActiveMQ. A {@link BlobMessage} is used If the message size is bigger then the messageTreshold. The size if in bytes.
+ *
+ * If you want to disable the usage of {@link BlobMessage} just use {@link #DISABLE_TRESHOLD} as value. If you want to use {@link BlobMessage}
+ * for every message (not depending of the size) just use {@link #BLOBMESSAGE_ONLY} as value.
+ *
+ * @param connectionFactory
+ * @param queuename
+ * @param messageTreshold
+ * @param logger
+ */
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final long messageTreshold, final Log logger) {
this.connectionFactory = connectionFactory;
this.queuename = queuename;
this.messageTreshold = messageTreshold;
+ this.logger = logger;
}
+ /**
+ * ActiveMQ based {@link MailQueue} which just use {@link BytesMessage} for all messages
+ *
+ * @see #ActiveMQMailQueue(ConnectionFactory, String, long, Log)
+ */
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
+ this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
+ }
/*
* (non-Javadoc)
@@ -194,13 +229,30 @@ public class ActiveMQMailQueue implement
return "MailQueue:" + queuename;
}
- private Mail createMail(BytesMessage message) throws MailQueueException,
JMSException {
+ private Mail createMail(Message message) throws MailQueueException,
JMSException {
MailImpl mail = new MailImpl();
populateMail(message, mail);
try {
-
- mail.setMessage(new MimeMessageCopyOnWriteProxy(new
MimeMessageInputStreamSource(mail.getName(), new
BytesMessageInputStream(message))));
+ if (message instanceof BytesMessage) {
+ mail.setMessage(new MimeMessageCopyOnWriteProxy(new
MimeMessageInputStreamSource(mail.getName(), new
BytesMessageInputStream((BytesMessage)message))));
+ } else if (message instanceof BlobMessage) {
+ BlobMessage blobMessage = (BlobMessage) message;
+ try {
+ // store url for later usage. Maybe we can do something
smart for RemoteDelivery here
+ // TODO: Check if this makes sense at all
+ mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
+ } catch (MalformedURLException e) {
+ // Ignore on error
+ logger.debug("Unable to get url from blobmessage for mail
" + mail.getName());
+ }
+ mail.setMessage(new MimeMessageCopyOnWriteProxy(new
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
+
+ } else {
+ throw new MailQueueException("Not supported JMS Message
received " + message);
+ }
+ } catch (IOException e) {
+ throw new MailQueueException("Unable to prepare Mail for dequeue",
e);
} catch (MessagingException e) {
throw new MailQueueException("Unable to prepare Mail for dequeue",
e);
}
@@ -262,12 +314,36 @@ public class ActiveMQMailQueue implement
}
private Message createMessage(Session session, Mail mail, long
delayInMillis) throws MailQueueException{
try {
- BytesMessage message = session.createBytesMessage();
-
- populateJMSProperties(message, mail, delayInMillis);
-
- mail.getMessage().writeTo(new BytesMessageOutputStream(message));;
- return message;
+ boolean useBlob = false;
+ if (messageTreshold != -1) {
+ try {
+ if (messageTreshold == 0 || mail.getMessageSize() >
messageTreshold) {
+ useBlob = true;
+ }
+ } catch (MessagingException e) {
+ logger.info("Unable to calculate message size for mail " +
mail.getName() + ". Use BytesMessage for JMS");
+ useBlob = false;
+ }
+ }
+ if (useBlob == false) {
+ BytesMessage message = session.createBytesMessage();
+
+ populateJMSProperties(message, mail, delayInMillis);
+
+ mail.getMessage().writeTo(new
BytesMessageOutputStream(message));;
+ return message;
+ } else {
+ ActiveMQSession amqSession;
+ if (session instanceof PooledSession) {
+ amqSession = ((PooledSession)
session).getInternalSession();
+ } else {
+ amqSession = (ActiveMQSession) session;
+ }
+ BlobMessage message = amqSession.createBlobMessage(new
MimeMessageInputStream(mail.getMessage()));
+ populateJMSProperties(message, mail, delayInMillis);
+ return message;
+ }
+
} catch (MessagingException e) {
throw new MailQueueException("Unable to prepare Mail for enqueue"
, e);
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java?rev=998694&r1=998693&r2=998694&view=diff
==============================================================================
---
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
(original)
+++
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
Sun Sep 19 16:42:26 2010
@@ -22,25 +22,47 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource;
+import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.LogEnabled;
/**
* {@link MailQueueFactory} implementations which return {@link ActiveMQMailQueue} instances
*
+ *
+ *
*
*/
-public class ActiveMQMailQueueFactory implements MailQueueFactory{
+public class ActiveMQMailQueueFactory implements MailQueueFactory, LogEnabled{
+
private final Map<String, MailQueue> queues = new HashMap<String,
MailQueue>();
private ConnectionFactory connectionFactory;
+ private long sizeTreshold = ActiveMQMailQueue.DISABLE_TRESHOLD;
+ private Log log;
@Resource(name="jmsConnectionFactory")
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
+ /**
+ * The size treshold which will be used for setting if a {@link BlobMessage} or {@link BytesMessage} will be used
+ * as {@link Message} type. See {@link ActiveMQMailQueue} for more details
+ *
+ * @param sizeTreshold
+ */
+ public void setSizeTreshold(long sizeTreshold) {
+ this.sizeTreshold = sizeTreshold;
+ }
+
+
/*
*
@@ -50,11 +72,21 @@ public class ActiveMQMailQueueFactory im
public synchronized MailQueue getQueue(String name) {
MailQueue queue = queues.get(name);
if (queue == null) {
- queue = new ActiveMQMailQueue(connectionFactory, name, -1);
+ queue = new ActiveMQMailQueue(connectionFactory, name,
sizeTreshold, log);
queues.put(name, queue);
}
return queue;
}
+
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
+ */
+ public void setLog(Log log) {
+ this.log = log;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]