Author: norman
Date: Wed Nov 3 15:44:18 2010
New Revision: 1030502
URL: http://svn.apache.org/viewvc?rev=1030502&view=rev
Log:
Rewrite ActiveMQQueue to use BlobMessages and a custom BlobTransferPolicy which
just uses the filesystem to store the MimeMessage of the Mail (JAMES-1108)
Added:
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
Modified:
james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
james/server/trunk/queue-activemq/pom.xml
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
Modified:
james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
URL:
http://svn.apache.org/viewvc/james/server/trunk/container-spring/src/main/config/james/spring-beans.xml?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
(original)
+++ james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
Wed Nov 3 15:44:18 2010
@@ -25,7 +25,7 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring-2.5.0.xsd
- http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd">
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.4.1.xsd">
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter"
lazy-init="false">
<property name="beans">
@@ -133,7 +133,7 @@
</camel:camelContext>
<!-- lets create an embedded ActiveMQ Broker -->
- <amq:broker useJmx="true" persistent="true" brokerName="james"
dataDirectory="filesystem=file://var/activemq-data/" useShutdownHook="false"
id="broker">
+ <amq:broker useJmx="true" persistent="true" brokerName="james"
dataDirectory="filesystem=file://var/activemq-data/" useShutdownHook="false"
schedulerSupport="true" id="broker">
<amq:destinationPolicy>
<amq:policyMap>
<amq:policyEntries>
@@ -150,11 +150,21 @@
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>
+
+ <amq:connectionFactory id="amqConnectionFactory"
brokerURL="vm://james?create=false">
+ <property name="blobTransferPolicy" ref="blobTransferPolicy"/>
+ </amq:connectionFactory>
+
+ <bean id="blobTransferPolicy"
class="org.apache.james.queue.activemq.FileSystemBlobTransferPolicy">
+ <property name="defaultUploadUrl" value="file://var/queue"/>
+ </bean>
- <amq:connectionFactory id="amqConnectionFactory"
brokerURL="vm://james?create=false" />
-
- <bean id="jmsConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
- <property name="connectionFactory" ref="amqConnectionFactory"/>
+ <!-- IMPORTANT: if you do not use the ActiveMQMailQueueFactory you probably
need to set cacheConsumers to false -->
+ <bean id="jmsConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
+ <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
+ <property name="sessionCacheSize" value="10"/>
+ <property name="cacheConsumers" value="true"/>
+ <property name="cacheProducers" value="true"/>
</bean>
<!-- setup spring jms TX manager -->
@@ -162,8 +172,7 @@
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
- <!-- Use JMSMailQueueFactory till activemq 5.5.0 is released -->
- <bean id="mailQueueFactory"
class="org.apache.james.queue.jms.JMSMailQueueFactory" depends-on="broker"/>
+ <bean id="mailQueueFactory"
class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory"
depends-on="broker"/>
<!-- Build the camelroute from the spoolmanager.xml -->
<bean id="mailProcessor" name="processorRoute"
class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>
Added:
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
(added)
+++
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.core;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.mail.internet.SharedInputStream;
+
+import org.apache.james.lifecycle.Disposable;
+
+
+/**
+ * A wrapper around classes which implement both {@link SharedInputStream} and
+ * {@link InputStream}. This class will not close the underlying stream on a
+ * {@link #close()} call. It will only close the stream when {@link #dispose()}
+ * is called.
+ */
+public class NonClosingSharedInputStream<E extends InputStream &
SharedInputStream> extends FilterInputStream implements Disposable,
SharedInputStream{
+
+ public NonClosingSharedInputStream(E in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // intentionally a no-op: the wrapped stream is only closed by dispose()
+ }
+
+ /**
+ * Close the stream and so all streams which share the same file.
+ * An IOException raised while closing is ignored.
+ */
+ public void dispose() {
+ try {
+ super.close();
+ } catch (IOException e) {
+ // ignore on close
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see javax.mail.internet.SharedInputStream#getPosition()
+ */
+ public long getPosition() {
+ return ((SharedInputStream)in).getPosition();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see javax.mail.internet.SharedInputStream#newStream(long, long)
+ */
+ public InputStream newStream(long arg0, long arg1) {
+ return ((SharedInputStream)in).newStream(arg0, arg1);
+ }
+
+}
Modified: james/server/trunk/queue-activemq/pom.xml
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/pom.xml?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/pom.xml (original)
+++ james/server/trunk/queue-activemq/pom.xml Wed Nov 3 15:44:18 2010
@@ -40,6 +40,10 @@
<groupId>org.apache.james</groupId>
<artifactId>james-server-core-library</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-server-core-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.james</groupId>
<artifactId>apache-mailet</artifactId>
@@ -48,10 +52,33 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-pool</artifactId>
- </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-aop</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-tx</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-annotation_1.0_spec</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
@@ -60,5 +87,9 @@
<groupId>${javax.mail.groupId}</groupId>
<artifactId>${javax.mail.artifactId}</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ </dependency>
</dependencies>
</project>
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
Wed Nov 3 15:44:18 2010
@@ -19,7 +19,9 @@
package org.apache.james.queue.activemq;
import java.io.IOException;
+import java.io.InputStream;
import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Iterator;
import java.util.Map;
@@ -37,14 +39,18 @@ import javax.mail.internet.MimeMessage;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
-import org.apache.activemq.pool.PooledSession;
+import org.apache.activemq.ScheduledMessage;
import org.apache.commons.logging.Log;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStream;
import org.apache.james.core.MimeMessageInputStreamSource;
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.core.MimeMessageWrapper;
+import org.apache.james.core.NonClosingSharedInputStream;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
+import org.springframework.jms.connection.SessionProxy;
/**
* {@link MailQueue} implementation which uses an ActiveMQ Queue.
@@ -55,8 +61,7 @@ import org.apache.mailet.Mail;
* primitives, then the toString() method is called on the attribute value to
* convert it
*
- * The implementation support the usage of {...@link BlobMessage} for
out-of-band
- * transfer of the {...@link MimeMessage}
+ * The implementation use {...@link BlobMessage}
*
* See http://activemq.apache.org/blob-messages.html for more details
*
@@ -69,17 +74,12 @@ import org.apache.mailet.Mail;
* to it. It should use one of the following value {...@link #LOW_PRIORITY},
* {...@link #NORMAL_PRIORITY}, {...@link #HIGH_PRIORITY}
*
+ * To have a good throughput you should use a caching connection factory.
+ *
*
*/
-public class ActiveMQMailQueue extends JMSMailQueue {
-
- private long messageTreshold = -1;
+public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
- private final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
-
- public final static int NO_DELAY = -1;
- public final static int DISABLE_TRESHOLD = -1;
- public final static int BLOBMESSAGE_ONLY = 0;
/**
* Construct a new ActiveMQ based {...@link MailQueue}. The
messageTreshold is
@@ -88,10 +88,6 @@ public class ActiveMQMailQueue extends J
* is used If the message size is bigger then the messageTreshold. The size
* if in bytes.
*
- * If you want to disable the usage of {...@link BlobMessage} just use
- * {...@link #DISABLE_TRESHOLD} as value. If you want to use
- * {...@link BlobMessage} for every message (not depending of the size)
just
- * use {...@link #BLOBMESSAGE_ONLY} as value.
*
* For enabling the priority feature in AMQ see:
*
@@ -99,22 +95,64 @@ public class ActiveMQMailQueue extends J
*
* @param connectionFactory
* @param queuename
- * @param messageTreshold
* @param logger
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final long messageTreshold, final Log logger) {
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
super(connectionFactory, queuename, logger);
- this.messageTreshold = messageTreshold;
}
-
- /**
- * ActiveMQ based {...@link MailQueue} which just use {...@link
BytesMessage} for
- * all messages
- *
- * @see #ActiveMQMailQueue(ConnectionFactory, String, long, Log)
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.jms.JMSMailQueue#deQueue()
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
- this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
+ public MailQueueItem deQueue() throws MailQueueException {
+ // Takes the next message from the queue inside a transacted session and
+ // hands connection, session and consumer over to the returned item.
+ // On any failure all acquired JMS resources are released and the cause
+ // is rethrown wrapped in a MailQueueException.
+ Connection connection = null;
+ Session session = null;
+ Message message = null;
+ MessageConsumer consumer = null;
+
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queuename);
+ consumer = session.createConsumer(queue);
+
+ message = consumer.receive();
+ return createMailQueueItem(connection, session, consumer, message);
+
+ } catch (Exception e) {
+ // session is still null when the connection or the session could
+ // not be created; guard the rollback so the original failure is
+ // reported (not a NullPointerException) and the connection below
+ // still gets closed
+ if (session != null) {
+ try {
+ session.rollback();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+
+ if (consumer != null) {
+
+ try {
+ consumer.close();
+ } catch (JMSException e1) {
+ // ignore on close
+ }
+ }
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+ throw new MailQueueException("Unable to dequeue next message", e);
+ }
+
}
/*
@@ -124,21 +162,28 @@ public class ActiveMQMailQueue extends J
* org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax
* .jms.Message, org.apache.mailet.Mail)
*/
+ @SuppressWarnings("unchecked")
protected void populateMailMimeMessage(Message message, Mail mail) throws
MessagingException {
if (message instanceof BlobMessage) {
try {
BlobMessage blobMessage = (BlobMessage) message;
try {
- // store url for later usage. Maybe we can do something
- // smart for RemoteDelivery here
- // TODO: Check if this makes sense at all
+ // store URL and queue name for later usage
mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
+ mail.setAttribute(JAMES_QUEUE_NAME, queuename);
} catch (MalformedURLException e) {
// Ignore on error
logger.debug("Unable to get url from blobmessage for mail
" + mail.getName());
}
- mail.setMessage(new MimeMessageCopyOnWriteProxy(new
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
-
+ InputStream in = blobMessage.getInputStream();
+ MimeMessageSource source;
+
+ if (in instanceof NonClosingSharedInputStream) {
+ source = new MimeMessageBlobMessageSource(blobMessage);
+ } else {
+ source = new MimeMessageInputStreamSource(mail.getName(),
blobMessage.getInputStream());
+ }
+ mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
} catch (IOException e) {
throw new MailQueueException("Unable to populate MimeMessage
for mail " + mail.getName(), e);
} catch (JMSException e) {
@@ -157,50 +202,95 @@ public class ActiveMQMailQueue extends J
* org.apache.mailet.Mail, long)
*/
protected void produceMail(Session session, Map<String,Object> props, int
msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
- boolean useBlob = false;
- if (messageTreshold != -1) {
- try {
- if (messageTreshold == 0 || mail.getMessageSize() >
messageTreshold) {
- useBlob = true;
- }
- } catch (MessagingException e) {
- logger.info("Unable to calculate message size for mail " +
mail.getName() + ". Use BytesMessage for JMS");
- useBlob = false;
+ MessageProducer producer = null;
+ try {
+
+ BlobMessage blobMessage = null;
+ MimeMessage mm = mail.getMessage();
+ MimeMessage wrapper = mm;
+
+ ActiveMQSession amqSession = getAMQSession(session);
+
+ if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
+ wrapper =
((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
}
- }
- if (useBlob) {
- MessageProducer producer = null;
- try {
- ActiveMQSession amqSession;
- if (session instanceof PooledSession) {
- amqSession = ((PooledSession)
session).getInternalSession();
- } else {
- amqSession = (ActiveMQSession) session;
- }
- BlobMessage message = amqSession.createBlobMessage(new
MimeMessageInputStream(mail.getMessage()));
- Queue queue = session.createQueue(queuename);
-
- producer = session.createProducer(queue);
- Iterator<String> keys = props.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- message.setObjectProperty(key, props.get(key));
+
+ if (wrapper instanceof MimeMessageWrapper) {
+ URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
+ String fromQueue = (String)
mail.getAttribute(JAMES_QUEUE_NAME);
+ MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
+
+ if (blobUrl != null && fromQueue != null &&
fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
+ // the message content was not changed so don't need to
upload it again and can just point to the url
+ blobMessage = amqSession.createBlobMessage(blobUrl);
+
+ // thats important so we don't delete the blob file after
complete the processing!
+ mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
+
}
- producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio,
Message.DEFAULT_TIME_TO_LIVE);
- } finally {
- try {
- if (producer != null)
- producer.close();
- } catch (JMSException e) {
- // ignore here
- }
+ }
+ if (blobMessage == null) {
+ // just use the MimeMessageInputStream which can read every
MimeMessage implementation
+ blobMessage = amqSession.createBlobMessage(new
MimeMessageInputStream(wrapper));
}
+ // store the queue name in the props
+ props.put(JAMES_QUEUE_NAME, queuename);
+
+
+ Queue queue = session.createQueue(queuename);
+
+ producer = session.createProducer(queue);
+ Iterator<String> keys = props.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ blobMessage.setObjectProperty(key, props.get(key));
+ }
+ producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio,
Message.DEFAULT_TIME_TO_LIVE);
+ } finally {
+
+ try {
+ if (producer != null)
+ producer.close();
+ } catch (JMSException e) {
+ // ignore here
+ }
+ }
+
+ }
+
+ /**
+ * Cast the given {...@link Session} to an {...@link ActiveMQSession}
+ *
+ * @param session
+ * @return amqSession
+ * @throws JMSException
+ */
+ protected ActiveMQSession getAMQSession(Session session) throws
JMSException {
+ ActiveMQSession amqSession;
+
+ if (session instanceof SessionProxy) {
+ // handle Springs CachingConnectionFactory
+ amqSession = (ActiveMQSession) ((SessionProxy)
session).getTargetSession();
} else {
- super.produceMail(session, props, msgPrio, mail);
+ // just cast as we have no other idea
+ amqSession = (ActiveMQSession) session;
}
+ return amqSession;
+ }
+
+ @Override
+ protected Map<String, Object> getJMSProperties(Mail mail, long
delayInMillis) throws JMSException, MessagingException {
+ Map<String, Object> props = super.getJMSProperties(mail,
delayInMillis);
+
+ // only for a positive delay: add the ActiveMQ scheduled-delay property, see
+ // http://activemq.apache.org/delay-and-schedule-message-delivery.html
+ if (delayInMillis > 0) {
+ props.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
+ }
+ return props;
}
@Override
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
Wed Nov 3 15:44:18 2010
@@ -18,10 +18,7 @@
****************************************************************/
package org.apache.james.queue.activemq;
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-import org.apache.activemq.BlobMessage;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -35,25 +32,13 @@ import org.apache.james.queue.jms.JMSMai
*
*/
public class ActiveMQMailQueueFactory extends JMSMailQueueFactory{
-
- private long sizeTreshold = ActiveMQMailQueue.DISABLE_TRESHOLD;
- /**
- * The size treshold which will be used for setting if a {...@link
BlobMessage} or {...@link BytesMessage} will be used
- * as {...@link Message} type. See {...@link ActiveMQMailQueue} for more
details
- *
- * @param sizeTreshold
- */
- public void setSizeTreshold(long sizeTreshold) {
- this.sizeTreshold = sizeTreshold;
- }
-
@Override
protected MailQueue createMailQueue(String name) {
- return new ActiveMQMailQueue(connectionFactory, name, sizeTreshold,
log);
+ return new ActiveMQMailQueue(connectionFactory, name, log);
}
}
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
Wed Nov 3 15:44:18 2010
@@ -39,7 +39,7 @@ import org.apache.mailet.Mail;
* well
*
*/
-public class ActiveMQMailQueueItem extends JMSMailQueueItem {
+public class ActiveMQMailQueueItem extends JMSMailQueueItem implements
ActiveMQSupport{
private final Message message;
private final Log logger;
@@ -50,14 +50,15 @@ public class ActiveMQMailQueueItem exten
this.logger = logger;
}
- @Override
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.queue.jms.JMSMailQueueItem#done(boolean)
+ */
public void done(boolean success) throws MailQueueException {
super.done(success);
if (success) {
- if (message instanceof ActiveMQBlobMessage) {
- /*
- * TODO: Enable this once activemq 5.4.2 was released
- // delete the file
+ if (message instanceof ActiveMQBlobMessage &&
getMail().getAttribute(JAMES_REUSE_BLOB_URL) == null) {
+
// This should get removed once this jira issue was fixed
// https://issues.apache.org/activemq/browse/AMQ-1529
try {
@@ -67,9 +68,11 @@ public class ActiveMQMailQueueItem exten
} catch (JMSException e) {
logger.info("Unable to delete blob message file for mail "
+ getMail().getName());
}
- */
}
+ getMail().removeAttribute(JAMES_REUSE_BLOB_URL);
+
}
+
}
-}
+}
\ No newline at end of file
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
(added)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,44 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import org.apache.james.queue.jms.JMSSupport;
+
+/**
+ * Interface which should get implemented by ActiveMQ depending implementations.
+ * It only declares the attribute / property names shared between them.
+ */
+public interface ActiveMQSupport extends JMSSupport{
+
+ /**
+ * The name of the Queue the mail is stored in
+ */
+ public final static String JAMES_QUEUE_NAME = "JAMES_QUEUE_NAME";
+
+ /**
+ * The URL of the Blobmessage content
+ */
+ public final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
+
+ /**
+ * Indicate that the Blobmessage content is reused for a new message
+ */
+ public final static String JAMES_REUSE_BLOB_URL = "JAMES_REUSE_BLOB_URL";
+
+}
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
(added)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+import javax.mail.util.SharedFileInputStream;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobDownloadStrategy;
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploadStrategy;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.james.core.NonClosingSharedInputStream;
+import org.apache.james.services.FileSystem;
+
+/**
+ * {@link BlobUploadStrategy} and {@link BlobDownloadStrategy} implementation
+ * which uses the {@link FileSystem} to look up the {@link File} for the
+ * {@link BlobMessage}
+ *
+ */
+public class FileSystemBlobStrategy implements BlobUploadStrategy,
BlobDownloadStrategy, ActiveMQSupport{
+
+
+ private final FileSystem fs;
+ private final BlobTransferPolicy policy;
+
+ public FileSystemBlobStrategy(final BlobTransferPolicy policy, final
FileSystem fs) {
+ this.fs = fs;
+ this.policy = policy;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.activemq.blob.BlobUploadStrategy#uploadFile(org.apache.activemq.command.ActiveMQBlobMessage,
java.io.File)
+ */
+ public URL uploadFile(ActiveMQBlobMessage message, File file) throws
JMSException, IOException {
+ InputStream in = new FileInputStream(file);
+ try {
+ return uploadStream(message, in);
+ } finally {
+ // the stream was opened here, so it has to be closed here
+ in.close();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.activemq.blob.BlobUploadStrategy#uploadStream(org.apache.activemq.command.ActiveMQBlobMessage,
java.io.InputStream)
+ */
+ public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
throws JMSException, IOException {
+ File f = getFile(message);
+ FileOutputStream out = new FileOutputStream(f);
+ try {
+ byte[] buffer = new byte[policy.getBufferSize()];
+ for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+ out.write(buffer, 0, c);
+ }
+ out.flush();
+ } finally {
+ // always release the file handle, even when reading or writing fails;
+ // the given InputStream is left open as it was not opened here
+ out.close();
+ }
+ return f.toURL();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
org.apache.activemq.blob.BlobDownloadStrategy#deleteFile(org.apache.activemq.command.ActiveMQBlobMessage)
+ */
+ public void deleteFile(ActiveMQBlobMessage message) throws IOException,
JMSException {
+ File f = getFile(message);
+ if (f.exists()) {
+ f.delete();
+ }
+ }
+
+ /**
+ * Returns a {@link NonClosingSharedInputStream} wrapping a
+ * {@link SharedFileInputStream} for the given {@link BlobMessage}
+ */
+ public InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException {
+ // return a NonClosingSharedInputStream to make sure the stream will
only get closed on dispose call later
+ return new NonClosingSharedInputStream<SharedFileInputStream>(new
SharedFileInputStream(getFile(message)));
+ }
+
+
+ /**
+ * Return the {@link File} for the {@link ActiveMQBlobMessage}.
+ * The {@link File} is looked up via the {@link FileSystem} service,
+ * using the queue name and mail name stored as properties of the message.
+ * The queue folder is created if it does not exist yet.
+ *
+ * @param message
+ * @return file
+ * @throws JMSException
+ * @throws FileNotFoundException
+ */
+ protected File getFile(ActiveMQBlobMessage message) throws JMSException,
FileNotFoundException {
+ String queueName = message.getStringProperty(JAMES_QUEUE_NAME);
+ String mailname = message.getStringProperty(JAMES_MAIL_NAME);
+ String queueUrl = policy.getUploadUrl() + "/" + queueName + "/";
+ File queueF = fs.getFile(queueUrl);
+
+ // check if we need to create the queue folder
+ if (queueF.exists() == false) {
+ queueF.mkdirs();
+ }
+ return fs.getFile(queueUrl+ "/" + mailname);
+
+ }
+}
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
(added)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import javax.annotation.Resource;
+
+import org.apache.activemq.blob.BlobDownloadStrategy;
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploadStrategy;
+import org.apache.james.services.FileSystem;
+
+/**
+ * {@link BlobTransferPolicy} whose upload and download strategies are both
+ * {@link FileSystemBlobStrategy} instances working on the injected {@link FileSystem}.
+ * So this implementation is only useful when using a non-clustered ActiveMQ Broker
+ * or when using a shared Storage for the files.
+ */
+public class FileSystemBlobTransferPolicy extends BlobTransferPolicy{
+
+    private FileSystem fs;
+
+    /**
+     * Set the {@link FileSystem} which gets handed to the strategies created by this policy.
+     *
+     * @param fs the filesystem service
+     */
+    @Resource(name="filesystem")
+    public void setFileSystem(FileSystem fs) {
+        this.fs = fs;
+    }
+
+    /**
+     * Create a new {@link FileSystemBlobTransferPolicy} which gets the {@link FileSystem},
+     * the default upload url, the broker upload url, the upload url and the upload
+     * strategy of this instance.
+     *
+     * @return the new policy
+     */
+    @Override
+    public BlobTransferPolicy copy() {
+        final FileSystemBlobTransferPolicy duplicate = new FileSystemBlobTransferPolicy();
+        duplicate.setFileSystem(this.fs);
+        duplicate.setDefaultUploadUrl(this.getDefaultUploadUrl());
+        duplicate.setBrokerUploadUrl(this.getBrokerUploadUrl());
+        duplicate.setUploadUrl(this.getUploadUrl());
+        duplicate.setUploadStrategy(this.getUploadStrategy());
+        return duplicate;
+    }
+
+    /**
+     * @return a new {@link FileSystemBlobStrategy} used for downloads
+     */
+    @Override
+    protected BlobDownloadStrategy createDownloadStrategy() {
+        return newFileSystemStrategy();
+    }
+
+    /**
+     * @return a new {@link FileSystemBlobStrategy} used for uploads
+     */
+    @Override
+    protected BlobUploadStrategy createUploadStrategy() {
+        return newFileSystemStrategy();
+    }
+
+    /**
+     * Create a fresh strategy bound to this policy and the configured {@link FileSystem}.
+     */
+    private FileSystemBlobStrategy newFileSystemStrategy() {
+        return new FileSystemBlobStrategy(this, fs);
+    }
+
+}
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
(added)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,82 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.Disposable;
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.core.NonClosingSharedInputStream;
+import org.apache.james.lifecycle.LifecycleUtil;
+
+/**
+ * {@link MimeMessageSource} which use a {@link BlobMessage} as input. Be aware that
+ * {@link BlobMessage} must contain a {@link NonClosingSharedInputStream} for this
+ * implementation!
+ */
+@SuppressWarnings("unchecked")
+public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport, Disposable{
+
+    private final NonClosingSharedInputStream in;
+    private final String sourceId;
+    private final long size;
+
+    /**
+     * Create the source from the given {@link BlobMessage}. The JMS message id is used
+     * as source id, the size is read from the JAMES_MAIL_MESSAGE_SIZE property and the
+     * {@link InputStream} of the message is cast to {@link NonClosingSharedInputStream}.
+     *
+     * @param message the message to read from
+     * @throws JMSException if the message id or the size property can not be read
+     * @throws IOException if the {@link InputStream} can not be obtained
+     */
+    public MimeMessageBlobMessageSource(BlobMessage message) throws JMSException, IOException {
+        this.sourceId = message.getJMSMessageID();
+        this.size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+        this.in = (NonClosingSharedInputStream)message.getInputStream();
+    }
+
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.core.MimeMessageSource#getInputStream()
+     */
+    public InputStream getInputStream() throws IOException {
+        // every caller gets its own stream on the shared data, starting at offset 0;
+        // -1 as end stands for "up to the end of the data" in SharedInputStream#newStream
+        return in.newStream(0, -1);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.core.MimeMessageSource#getSourceId()
+     */
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    /**
+     * Return the size which was stored in the message property, or the size calculated
+     * by the super implementation if the stored size is -1.
+     *
+     * @return the message size
+     * @throws IOException if the super implementation fails to calculate the size
+     */
+    @Override
+    public long getMessageSize() throws IOException {
+        // if the size is -1 we seem to not have it stored in the property, so fall back
+        // to the super implementation and return what it calculated
+        if (size == -1) {
+            return super.getMessageSize();
+        }
+        return size;
+    }
+
+    /**
+     * Call dispose on the {@link InputStream}
+     */
+    public void dispose() {
+        LifecycleUtil.dispose(in);
+    }
+}
Modified:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
(original)
+++
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
Wed Nov 3 15:44:18 2010
@@ -46,6 +46,11 @@ import org.apache.mailet.Mail;
public interface MailQueue {
/**
+ * No delay for queued {@link MailQueueItem}
+ */
+ public final static int NO_DELAY = -1;
+
+ /**
* Enqueue the Mail to the queue. The given delay and unit are used to
calculate the time when the
* Mail will be avaible for dequeue
*
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Wed Nov 3 15:44:18 2010
@@ -57,23 +57,11 @@ import org.apache.mailet.MailAddress;
*
*
*/
-public class JMSMailQueue implements MailQueue {
+public class JMSMailQueue implements MailQueue, JMSSupport {
protected final String queuename;
protected final ConnectionFactory connectionFactory;
protected final Log logger;
- protected final static String JAMES_MAIL_RECIPIENTS =
"JAMES_MAIL_RECIPIENTS";
- protected final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
- protected final static String JAMES_MAIL_ERROR_MESSAGE =
"JAMES_MAIL_ERROR_MESSAGE";
- protected final static String JAMES_MAIL_LAST_UPDATED =
"JAMES_MAIL_LAST_UPDATED";
- protected final static String JAMES_MAIL_MESSAGE_SIZE =
"JAMES_MAIL_MESSAGE_SIZE";
- protected final static String JAMES_MAIL_NAME = "JAMES_MAIL_NAME";
- protected final static String JAMES_MAIL_SEPERATOR = ";";
- protected final static String JAMES_MAIL_REMOTEHOST =
"JAMES_MAIL_REMOTEHOST";
- protected final static String JAMES_MAIL_REMOTEADDR =
"JAMES_MAIL_REMOTEADDR";
- protected final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
- protected final static String JAMES_MAIL_ATTRIBUTE_NAMES =
"JAMES_MAIL_ATTRIBUTE_NAMES";
- protected final static String JAMES_NEXT_DELIVERY = "JAMES_NEXT_DELIVERY";
/**
* Handle mail with lowest priority
@@ -106,7 +94,7 @@ public class JMSMailQueue implements Mai
}
/**
- * Execute the given {@link DequeueOperation} when a mail is ready to
precoess. As JMS does not support delay scheduling out-of-the box, we use
+ * Execute the given {@link DequeueOperation} when a mail is ready to
process. As JMS does not support delay scheduling out-of-the box, we use
* a messageselector to check if a mail is ready. For this a {@link
MessageConsumer#receive(long) is used with a timeout of 10 seconds.
*
* Many JMS implementations support better solutions for this, so this
should get overridden by these implementations
@@ -126,7 +114,7 @@ public class JMSMailQueue implements Mai
session = connection.createSession(true,
Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queuename);
- consumer = session.createConsumer(queue, JAMES_NEXT_DELIVERY +
" <= " + System.currentTimeMillis());
+ consumer = session.createConsumer(queue, getMessageSelector());
message = consumer.receive(10000);
@@ -192,6 +180,15 @@ public class JMSMailQueue implements Mai
}
+ /**
+ * Return message selector to use for consuming
+ *
+ * @return selector
+ */
+ protected String getMessageSelector() {
+ return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis();
+ }
+
/*
* (non-Javadoc)
*
@@ -257,7 +254,7 @@ public class JMSMailQueue implements Mai
* @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
*/
public void enQueue(Mail mail) throws MailQueueException {
- enQueue(mail, -1, TimeUnit.MILLISECONDS);
+ enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS);
}
/**
@@ -325,7 +322,7 @@ public class JMSMailQueue implements Mai
String recipient = recipients.next().toString();
recipientsBuilder.append(recipient.trim());
if (recipients.hasNext()) {
- recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
+ recipientsBuilder.append(JAMES_MAIL_SEPARATOR);
}
}
props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
@@ -350,7 +347,7 @@ public class JMSMailQueue implements Mai
props.put(attrName, value);
if (attrs.hasNext()) {
- attrsBuilder.append(JAMES_MAIL_SEPERATOR);
+ attrsBuilder.append(JAMES_MAIL_SEPARATOR);
}
}
props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
@@ -409,7 +406,7 @@ public class JMSMailQueue implements Mai
List<MailAddress> rcpts = new ArrayList<MailAddress>();
String recipients = message.getStringProperty(JAMES_MAIL_RECIPIENTS);
- StringTokenizer recipientTokenizer = new StringTokenizer(recipients,
JAMES_MAIL_SEPERATOR);
+ StringTokenizer recipientTokenizer = new StringTokenizer(recipients,
JAMES_MAIL_SEPARATOR);
while (recipientTokenizer.hasMoreTokens()) {
try {
MailAddress rcpt = new
MailAddress(recipientTokenizer.nextToken());
@@ -425,7 +422,7 @@ public class JMSMailQueue implements Mai
mail.setRemoteHost(message.getStringProperty(JAMES_MAIL_REMOTEHOST));
String attributeNames =
message.getStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES);
- StringTokenizer namesTokenizer = new StringTokenizer(attributeNames,
JAMES_MAIL_SEPERATOR);
+ StringTokenizer namesTokenizer = new StringTokenizer(attributeNames,
JAMES_MAIL_SEPARATOR);
while (namesTokenizer.hasMoreTokens()) {
String name = namesTokenizer.nextToken();
Serializable attrValue = message.getStringProperty(name);
@@ -468,6 +465,17 @@ public class JMSMailQueue implements Mai
return "MailQueue:" + queuename;
}
+ /**
+ * Create a {@link MailQueueItem} for the given parameters
+ *
+ * @param connection
+ * @param session
+ * @param consumer
+ * @param message
+ * @return item
+ * @throws JMSException
+ * @throws MessagingException
+ */
protected MailQueueItem createMailQueueItem(Connection connection, Session
session, MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
final Mail mail = createMail(message);
return new JMSMailQueueItem(mail, connection, session, consumer);
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
Wed Nov 3 15:44:18 2010
@@ -34,10 +34,10 @@ import org.apache.mailet.Mail;
*/
public class JMSMailQueueItem implements MailQueueItem {
- private final Mail mail;
- private final Connection connection;
- private final Session session;
- private final MessageConsumer consumer;
+ protected final Mail mail;
+ protected final Connection connection;
+ protected final Session session;
+ protected final MessageConsumer consumer;
public JMSMailQueueItem(Mail mail, Connection connection, Session session,
MessageConsumer consumer) {
this.mail = mail;
Added:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java?rev=1030502&view=auto
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
(added)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
Wed Nov 3 15:44:18 2010
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+/**
+ * Defines the names of the JMS message properties (and the value separator) which
+ * are used to store the state of a Mail in a JMS message. It is meant to get
+ * implemented by JMS depending implementations, which then can refer to the
+ * constants directly.
+ */
+public interface JMSSupport {
+
+    /**
+     * JMS Property which holds the recipients as a single String, with the
+     * recipients separated by {@link #JAMES_MAIL_SEPARATOR}
+     */
+    public final static String JAMES_MAIL_RECIPIENTS = "JAMES_MAIL_RECIPIENTS";
+
+    /**
+     * JMS Property which holds the sender as String
+     */
+    public final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
+
+    /**
+     * JMS Property which holds the error message as String
+     */
+    public final static String JAMES_MAIL_ERROR_MESSAGE = "JAMES_MAIL_ERROR_MESSAGE";
+
+    /**
+     * JMS Property which holds the last updated time as long (ms)
+     */
+    public final static String JAMES_MAIL_LAST_UPDATED = "JAMES_MAIL_LAST_UPDATED";
+
+    /**
+     * JMS Property which holds the mail size as long (bytes)
+     */
+    public final static String JAMES_MAIL_MESSAGE_SIZE = "JAMES_MAIL_MESSAGE_SIZE";
+
+    /**
+     * JMS Property which holds the mail name as String
+     */
+    public final static String JAMES_MAIL_NAME = "JAMES_MAIL_NAME";
+
+    /**
+     * Separator which is used to separate the single String values when a list of
+     * values is stored in one JMS Property value
+     */
+    public final static String JAMES_MAIL_SEPARATOR = ";";
+
+    /**
+     * JMS Property which holds the remote hostname as String
+     */
+    public final static String JAMES_MAIL_REMOTEHOST = "JAMES_MAIL_REMOTEHOST";
+
+    /**
+     * JMS Property which holds the remote ipaddress as String
+     */
+    public final static String JAMES_MAIL_REMOTEADDR = "JAMES_MAIL_REMOTEADDR";
+
+    /**
+     * JMS Property which holds the mail state as String
+     */
+    public final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
+
+    /**
+     * JMS Property which holds the mail attribute names as a single String, with the
+     * names separated by {@link #JAMES_MAIL_SEPARATOR}
+     */
+    public final static String JAMES_MAIL_ATTRIBUTE_NAMES = "JAMES_MAIL_ATTRIBUTE_NAMES";
+
+    /**
+     * JMS Property which holds next delivery time as long (ms). The default message
+     * selector compares it against the current time in milliseconds.
+     */
+    public final static String JAMES_NEXT_DELIVERY = "JAMES_NEXT_DELIVERY";
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]