Author: norman
Date: Wed Nov  3 15:44:18 2010
New Revision: 1030502

URL: http://svn.apache.org/viewvc?rev=1030502&view=rev
Log:
Rewrite ActiveMQQueue to use BlobMessages and a custom BlobTransferPolicy which 
just uses the filesystem to store the MimeMessage of the Mail (JAMES-1108)

Added:
    
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
Modified:
    james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
    james/server/trunk/queue-activemq/pom.xml
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
    
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java

Modified: 
james/server/trunk/container-spring/src/main/config/james/spring-beans.xml
URL: 
http://svn.apache.org/viewvc/james/server/trunk/container-spring/src/main/config/james/spring-beans.xml?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- james/server/trunk/container-spring/src/main/config/james/spring-beans.xml 
(original)
+++ james/server/trunk/container-spring/src/main/config/james/spring-beans.xml 
Wed Nov  3 15:44:18 2010
@@ -25,7 +25,7 @@
        xsi:schemaLocation="
           http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring-2.5.0.xsd
-          http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd">
+          http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core-5.4.1.xsd">
           
     <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" 
lazy-init="false">
       <property name="beans">
@@ -133,7 +133,7 @@
     </camel:camelContext>
 
     <!--  lets create an embedded ActiveMQ Broker -->
-    <amq:broker useJmx="true" persistent="true" brokerName="james" 
dataDirectory="filesystem=file://var/activemq-data/" useShutdownHook="false" 
id="broker">
+    <amq:broker useJmx="true" persistent="true" brokerName="james" 
dataDirectory="filesystem=file://var/activemq-data/" useShutdownHook="false" 
schedulerSupport="true" id="broker">
         <amq:destinationPolicy>
             <amq:policyMap>
                 <amq:policyEntries>
@@ -150,11 +150,21 @@
             <amq:transportConnector uri="tcp://localhost:0" />
         </amq:transportConnectors>
     </amq:broker>
+    
+    <amq:connectionFactory id="amqConnectionFactory" 
brokerURL="vm://james?create=false">
+        <property name="blobTransferPolicy" ref="blobTransferPolicy"/>
+    </amq:connectionFactory>
+
+    <bean id="blobTransferPolicy" 
class="org.apache.james.queue.activemq.FileSystemBlobTransferPolicy">
+        <property name="defaultUploadUrl" value="file://var/queue"/>
+    </bean>
 
-    <amq:connectionFactory id="amqConnectionFactory" 
brokerURL="vm://james?create=false" />
-
-    <bean id="jmsConnectionFactory"  
class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
-        <property name="connectionFactory" ref="amqConnectionFactory"/>
+    <!-- IMPORTANT: if you do not use the ActiveMQMailQueueFactory you probably 
need to set cacheConsumers to false -->
+    <bean id="jmsConnectionFactory" 
class="org.springframework.jms.connection.CachingConnectionFactory">
+       <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
+       <property name="sessionCacheSize" value="10"/>
+       <property name="cacheConsumers" value="true"/>
+       <property name="cacheProducers" value="true"/>
     </bean>
 
     <!-- setup spring jms TX manager -->
@@ -162,8 +172,7 @@
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
     </bean>
 
-    <!-- Use JMSMailQueueFactory till activemq 5.5.0 is released -->
-    <bean id="mailQueueFactory" 
class="org.apache.james.queue.jms.JMSMailQueueFactory" depends-on="broker"/>
+    <bean id="mailQueueFactory" 
class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" 
depends-on="broker"/>
         
     <!-- Build the camelroute from the spoolmanager.xml -->
     <bean id="mailProcessor" name="processorRoute" 
class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>

Added: 
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
 (added)
+++ 
james/server/trunk/core-library/src/main/java/org/apache/james/core/NonClosingSharedInputStream.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.core;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.mail.internet.SharedInputStream;
+
+import org.apache.james.lifecycle.Disposable;
+
+
+/**
+ * A wrapper around classes which implement {@link SharedInputStream} and 
{@link InputStream}. This class will not close the underlying
+ * stream on {@link #close()} call. It will only close the stream when 
{@link #dispose()} is called
+ *
+ */
+public class NonClosingSharedInputStream<E extends InputStream & 
SharedInputStream> extends FilterInputStream implements Disposable, 
SharedInputStream{
+
+    public NonClosingSharedInputStream(E in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+
+    /**
+     * Close the stream and so all streams which share the same file
+     */
+    public void dispose() {
+        try {
+            super.close();
+        } catch (IOException e) {
+            // ignore on close
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see javax.mail.internet.SharedInputStream#getPosition()
+     */
+    public long getPosition() {
+        return ((SharedInputStream)in).getPosition();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see javax.mail.internet.SharedInputStream#newStream(long, long)
+     */
+    public InputStream newStream(long arg0, long arg1) {
+        return ((SharedInputStream)in).newStream(arg0, arg1);
+    }
+
+}

Modified: james/server/trunk/queue-activemq/pom.xml
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/pom.xml?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/pom.xml (original)
+++ james/server/trunk/queue-activemq/pom.xml Wed Nov  3 15:44:18 2010
@@ -40,6 +40,10 @@
       <groupId>org.apache.james</groupId>
       <artifactId>james-server-core-library</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.james</groupId>
+      <artifactId>james-server-core-api</artifactId>
+    </dependency>
      <dependency>
       <groupId>org.apache.james</groupId>
       <artifactId>apache-mailet</artifactId>
@@ -48,10 +52,33 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
     </dependency>
-   <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-pool</artifactId>
-   </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-jms</artifactId>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>spring-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>spring-beans</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>spring-aop</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.springframework</groupId>
+          <artifactId>spring-tx</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-annotation_1.0_spec</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
@@ -60,5 +87,9 @@
       <groupId>${javax.mail.groupId}</groupId>
       <artifactId>${javax.mail.artifactId}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging-api</artifactId>
+    </dependency> 
   </dependencies>
 </project>

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 Wed Nov  3 15:44:18 2010
@@ -19,7 +19,9 @@
 package org.apache.james.queue.activemq;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -37,14 +39,18 @@ import javax.mail.internet.MimeMessage;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
-import org.apache.activemq.pool.PooledSession;
+import org.apache.activemq.ScheduledMessage;
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStream;
 import org.apache.james.core.MimeMessageInputStreamSource;
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.core.MimeMessageWrapper;
+import org.apache.james.core.NonClosingSharedInputStream;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
+import org.springframework.jms.connection.SessionProxy;
 
 /**
  * {@link MailQueue} implementation which uses an ActiveMQ Queue.
@@ -55,8 +61,7 @@ import org.apache.mailet.Mail;
  * primitives, then the toString() method is called on the attribute value to
  * convert it
  * 
- * The implementation support the usage of {@link BlobMessage} for 
out-of-band
- * transfer of the {@link MimeMessage}
+ * The implementation uses {@link BlobMessage} 
  * 
  * See http://activemq.apache.org/blob-messages.html for more details
  * 
@@ -69,17 +74,12 @@ import org.apache.mailet.Mail;
  * to it. It should use one of the following values {@link #LOW_PRIORITY},
  * {@link #NORMAL_PRIORITY}, {@link #HIGH_PRIORITY}
  * 
+ * To have a good throughput you should use a caching connection factory.
+ * 
  * 
  */
-public class ActiveMQMailQueue extends JMSMailQueue {
-
-    private long messageTreshold = -1;
+public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
 
-    private final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
-
-    public final static int NO_DELAY = -1;
-    public final static int DISABLE_TRESHOLD = -1;
-    public final static int BLOBMESSAGE_ONLY = 0;
 
     /**
      * Construct a new ActiveMQ based {@link MailQueue}. The 
messageTreshold is
@@ -88,10 +88,6 @@ public class ActiveMQMailQueue extends J
      * is used If the message size is bigger then the messageTreshold. The size
      * if in bytes.
      * 
-     * If you want to disable the usage of {@link BlobMessage} just use
-     * {@link #DISABLE_TRESHOLD} as value. If you want to use
-     * {@link BlobMessage} for every message (not depending of the size) 
just
-     * use {@link #BLOBMESSAGE_ONLY} as value.
      * 
      * For enabling the priority feature in AMQ see:
      * 
@@ -99,22 +95,64 @@ public class ActiveMQMailQueue extends J
      * 
      * @param connectionFactory
      * @param queuename
-     * @param messageTreshold
      * @param logger
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final long messageTreshold, final Log logger) {
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
         super(connectionFactory, queuename, logger);
-        this.messageTreshold = messageTreshold;
     }
-
-    /**
-     * ActiveMQ based {@link MailQueue} which just use {@link 
BytesMessage} for
-     * all messages
-     * 
-     * @see #ActiveMQMailQueue(ConnectionFactory, String, long, Log)
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.jms.JMSMailQueue#deQueue()
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
-        this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
+    public MailQueueItem deQueue() throws MailQueueException {
+        Connection connection = null;
+        Session session = null;
+        Message message = null;
+        MessageConsumer consumer = null;
+
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queuename);
+            consumer = session.createConsumer(queue);
+
+            message = consumer.receive();
+            return createMailQueueItem(connection, session, consumer, message);
+
+        } catch (Exception e) {
+            try {
+                session.rollback();
+            } catch (JMSException e1) {
+                // ignore on rollback
+            }
+
+            if (consumer != null) {
+
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on close
+                }
+            }
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e1) {
+                // ignore here
+            }
+            throw new MailQueueException("Unable to dequeue next message", e);
+        }
+
     }
 
     /*
@@ -124,21 +162,28 @@ public class ActiveMQMailQueue extends J
      * org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax
      * .jms.Message, org.apache.mailet.Mail)
      */
+    @SuppressWarnings("unchecked")
     protected void populateMailMimeMessage(Message message, Mail mail) throws 
MessagingException {
         if (message instanceof BlobMessage) {
             try {
                 BlobMessage blobMessage = (BlobMessage) message;
                 try {
-                    // store url for later usage. Maybe we can do something
-                    // smart for RemoteDelivery here
-                    // TODO: Check if this makes sense at all
+                    // store URL and queuename for later usage
                     mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
+                    mail.setAttribute(JAMES_QUEUE_NAME, queuename);
                 } catch (MalformedURLException e) {
                     // Ignore on error
                     logger.debug("Unable to get url from blobmessage for mail 
" + mail.getName());
                 }
-                mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
-
+                InputStream in = blobMessage.getInputStream();
+                MimeMessageSource source;
+  
+                if (in instanceof NonClosingSharedInputStream) {
+                    source = new MimeMessageBlobMessageSource(blobMessage);
+                } else {
+                    source = new MimeMessageInputStreamSource(mail.getName(), 
blobMessage.getInputStream());
+                }
+                mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
             } catch (IOException e) {
                 throw new MailQueueException("Unable to populate MimeMessage 
for mail " + mail.getName(), e);
             } catch (JMSException e) {
@@ -157,50 +202,95 @@ public class ActiveMQMailQueue extends J
      * org.apache.mailet.Mail, long)
      */
     protected void produceMail(Session session, Map<String,Object> props, int 
msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
-        boolean useBlob = false;
-        if (messageTreshold != -1) {
-            try {
-                if (messageTreshold == 0 || mail.getMessageSize() > 
messageTreshold) {
-                    useBlob = true;
-                }
-            } catch (MessagingException e) {
-                logger.info("Unable to calculate message size for mail " + 
mail.getName() + ". Use BytesMessage for JMS");
-                useBlob = false;
+        MessageProducer producer = null;
+        try {
+            
+            BlobMessage blobMessage = null;
+            MimeMessage mm = mail.getMessage();
+            MimeMessage wrapper = mm;
+            
+            ActiveMQSession amqSession = getAMQSession(session);
+            
+            if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
+                wrapper = 
((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
             }
-        }
-        if (useBlob) {
-            MessageProducer producer = null;
-            try {
-                ActiveMQSession amqSession;
-                if (session instanceof PooledSession) {
-                    amqSession = ((PooledSession) 
session).getInternalSession();
-                } else {
-                    amqSession = (ActiveMQSession) session;
-                }
-                BlobMessage message = amqSession.createBlobMessage(new 
MimeMessageInputStream(mail.getMessage()));
-                Queue queue = session.createQueue(queuename);
-
-                producer = session.createProducer(queue);
-                Iterator<String> keys = props.keySet().iterator();
-                while (keys.hasNext()) {
-                    String key = keys.next();
-                    message.setObjectProperty(key, props.get(key));
+            
+            if (wrapper instanceof MimeMessageWrapper) {
+                URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
+                String fromQueue = (String) 
mail.getAttribute(JAMES_QUEUE_NAME);
+                MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
+
+                if (blobUrl != null && fromQueue != null && 
fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
+                    // the message content was not changed so we don't need to 
upload it again and can just point to the url
+                    blobMessage = amqSession.createBlobMessage(blobUrl);
+                    
+                    // that's important so we don't delete the blob file after 
completing the processing!
+                    mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
+                    
                 }
-                producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, 
Message.DEFAULT_TIME_TO_LIVE);
-            } finally {
 
-                try {
-                    if (producer != null)
-                        producer.close();
-                } catch (JMSException e) {
-                    // ignore here
-                }
+            }
+            if (blobMessage == null) {
+                // just use the MimeMessageInputStream which can read every 
MimeMessage implementation
+                blobMessage = amqSession.createBlobMessage(new 
MimeMessageInputStream(wrapper));
             }
             
+            // store the queue name in the props
+            props.put(JAMES_QUEUE_NAME, queuename);
+
+              
+            Queue queue = session.createQueue(queuename);
+
+            producer = session.createProducer(queue);
+            Iterator<String> keys = props.keySet().iterator();
+            while (keys.hasNext()) {
+                String key = keys.next();
+                blobMessage.setObjectProperty(key, props.get(key));
+            }
+            producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, 
Message.DEFAULT_TIME_TO_LIVE);
+        } finally {
+
+            try {
+                if (producer != null)
+                    producer.close();
+            } catch (JMSException e) {
+                // ignore here
+            }
+        }
+      
+    }
+
+    /**
+     * Cast the given {@link Session} to an {@link ActiveMQSession}
+     * 
+     * @param session
+     * @return amqSession
+     * @throws JMSException
+     */
+    protected ActiveMQSession getAMQSession(Session session) throws 
JMSException {
+        ActiveMQSession amqSession;
+        
+        if (session instanceof SessionProxy) {
+            // handle Springs CachingConnectionFactory 
+            amqSession = (ActiveMQSession) ((SessionProxy) 
session).getTargetSession();
         } else {
-            super.produceMail(session, props, msgPrio, mail);
+            // just cast as we have no other idea
+            amqSession = (ActiveMQSession) session;
         }
+        return amqSession;
+    }
+    
 
+    @Override
+    protected Map<String, Object> getJMSProperties(Mail mail, long 
delayInMillis) throws JMSException, MessagingException {
+        Map<String, Object> props =  super.getJMSProperties(mail, 
delayInMillis);
+       
+        // add JMS Property for handling message scheduling
+        // http://activemq.apache.org/delay-and-schedule-message-delivery.html
+        if (delayInMillis > 0) {
+            props.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
+        }
+        return props;
     }
 
     @Override

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
 Wed Nov  3 15:44:18 2010
@@ -18,10 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.activemq;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
 
-import org.apache.activemq.BlobMessage;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.jms.JMSMailQueueFactory;
@@ -35,25 +32,13 @@ import org.apache.james.queue.jms.JMSMai
  *
  */
 public class ActiveMQMailQueueFactory extends JMSMailQueueFactory{
-
-    private long sizeTreshold = ActiveMQMailQueue.DISABLE_TRESHOLD;
   
-    /**
-     * The size treshold which will be used for setting if a {@link 
BlobMessage} or {@link BytesMessage} will be used
-     * as {@link Message} type. See {@link ActiveMQMailQueue} for more 
details
-     * 
-     * @param sizeTreshold
-     */
-    public void setSizeTreshold(long sizeTreshold) {
-        this.sizeTreshold  = sizeTreshold;
-    }
-    
     
     
 
     @Override
        protected MailQueue createMailQueue(String name) {
-        return new ActiveMQMailQueue(connectionFactory, name, sizeTreshold, 
log);
+        return new ActiveMQMailQueue(connectionFactory, name, log);
        }
 
 }

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
 Wed Nov  3 15:44:18 2010
@@ -39,7 +39,7 @@ import org.apache.mailet.Mail;
  * well
  * 
  */
-public class ActiveMQMailQueueItem extends JMSMailQueueItem {
+public class ActiveMQMailQueueItem extends JMSMailQueueItem implements 
ActiveMQSupport{
 
     private final Message message;
     private final Log logger;
@@ -50,14 +50,15 @@ public class ActiveMQMailQueueItem exten
         this.logger = logger;
     }
 
-    @Override
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.queue.jms.JMSMailQueueItem#done(boolean)
+     */
     public void done(boolean success) throws MailQueueException {
         super.done(success);
         if (success) {
-            if (message instanceof ActiveMQBlobMessage) {
-                /*
-                 * TODO: Enable this once activemq 5.4.2 was released
-                // delete the file
+            if (message instanceof ActiveMQBlobMessage && 
getMail().getAttribute(JAMES_REUSE_BLOB_URL) == null) {
+
                 // This should get removed once this jira issue was fixed
                 // https://issues.apache.org/activemq/browse/AMQ-1529
                 try {
@@ -67,9 +68,11 @@ public class ActiveMQMailQueueItem exten
                 } catch (JMSException e) {
                     logger.info("Unable to delete blob message file for mail " 
+ getMail().getName());
                 }
-                */
             }
+            getMail().removeAttribute(JAMES_REUSE_BLOB_URL);
+
         }
+
     }
 
-}
+}
\ No newline at end of file

Added: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
 (added)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQSupport.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,44 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import org.apache.james.queue.jms.JMSSupport;
+
+/**
+ * Interface which should get implemented by ActiveMQ depending implementations 
+ *
+ */
+public interface ActiveMQSupport extends JMSSupport{
+
+    /**
+     * The name of the Queue the mail is stored in
+     */
+    public final static String JAMES_QUEUE_NAME = "JAMES_QUEUE_NAME";
+    
+    /**
+     * The URL of the Blobmessage content
+     */
+    public final static String JAMES_BLOB_URL = "JAMES_BLOB_URL";
+    
+    /**
+     * Indicate that the Blobmessage content is reused for a new message
+     */
+    public final static String JAMES_REUSE_BLOB_URL =" JAMES_REUSE_BLOB_URL";
+
+}

Added: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
 (added)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+import javax.mail.util.SharedFileInputStream;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobDownloadStrategy;
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploadStrategy;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.james.core.NonClosingSharedInputStream;
+import org.apache.james.services.FileSystem;
+
+/**
+ * {@link BlobUploadStrategy} and {@link BlobDownloadStrategy} implementation which use the {@link FileSystem} to lookup the {@link File} for the {@link BlobMessage}
+ *
+ */
+public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadStrategy, ActiveMQSupport{
+
+   
+    private final FileSystem fs;
+    private final BlobTransferPolicy policy;
+
+    /**
+     * Create a new strategy
+     * 
+     * @param policy the {@link BlobTransferPolicy} which is asked for the upload url and the buffer size
+     * @param fs the {@link FileSystem} which is used to lookup the files
+     */
+    public FileSystemBlobStrategy(final BlobTransferPolicy policy, final FileSystem fs) {
+        this.fs = fs;
+        this.policy = policy;
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.blob.BlobUploadStrategy#uploadFile(org.apache.activemq.command.ActiveMQBlobMessage, java.io.File)
+     */
+    public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
+        // the stream is opened here, so it has to be closed here again once the copy is done
+        InputStream in = new FileInputStream(file);
+        try {
+            return uploadStream(message, in);
+        } finally {
+            in.close();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.blob.BlobUploadStrategy#uploadStream(org.apache.activemq.command.ActiveMQBlobMessage, java.io.InputStream)
+     */
+    public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException {
+        File f = getFile(message);
+        FileOutputStream out = new FileOutputStream(f);
+        try {
+            byte[] buffer = new byte[policy.getBufferSize()];
+            for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+                out.write(buffer, 0, c);
+            }
+            out.flush();
+        } finally {
+            // always release the file handle, even if reading or writing failed
+            out.close();
+        }
+        return f.toURL();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.blob.BlobDownloadStrategy#deleteFile(org.apache.activemq.command.ActiveMQBlobMessage)
+     */
+    public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
+        File f = getFile(message);
+        if (f.exists()) {
+            f.delete();
+        }
+    }
+
+    /**
+     * Returns a {@link NonClosingSharedInputStream} which wraps a {@link SharedFileInputStream} for the given {@link BlobMessage}
+     */
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
+        // return a NonClosingSharedInputStream to make sure the stream will only get closed on dispose call later
+        return new NonClosingSharedInputStream<SharedFileInputStream>(new SharedFileInputStream(getFile(message)));
+    }
+
+    
+    /**
+     * Return the {@link File} for the {@link ActiveMQBlobMessage}. 
+     * The {@link File} is lookup via the {@link FileSystem} service, using the upload url of the
+     * policy, the {@link #JAMES_QUEUE_NAME} property and the {@link #JAMES_MAIL_NAME} property of the message.
+     * The folder of the queue is created if it does not exist yet.
+     * 
+     * @param message
+     * @return file
+     * @throws JMSException
+     * @throws FileNotFoundException
+     */
+    protected File getFile(ActiveMQBlobMessage message) throws JMSException, FileNotFoundException {
+        String queueName = message.getStringProperty(JAMES_QUEUE_NAME);
+        String mailname = message.getStringProperty(JAMES_MAIL_NAME);
+        String queueUrl = policy.getUploadUrl() + "/" + queueName + "/";
+        File queueF = fs.getFile(queueUrl);
+        
+        // check if we need to create the queue folder
+        if (queueF.exists() == false) {
+            queueF.mkdirs();
+        }
+        // queueUrl already ends with a "/", so the mail name is appended directly
+        return fs.getFile(queueUrl + mailname);
+        
+    }
+}

Added: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
 (added)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobTransferPolicy.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import javax.annotation.Resource;
+
+import org.apache.activemq.blob.BlobDownloadStrategy;
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.blob.BlobUploadStrategy;
+import org.apache.james.services.FileSystem;
+
+/**
+ * {@link BlobTransferPolicy} which use the {@link FileSystem} to download and upload data.
+ * So this implementation is only useful when using a non-clustered ActiveMQ Broker or when using a shared Storage for the files.
+ *
+ */
+public class FileSystemBlobTransferPolicy extends BlobTransferPolicy{
+
+    private FileSystem fs;
+
+    /**
+     * Set the {@link FileSystem} which is handed over to the created {@link FileSystemBlobStrategy} instances
+     * 
+     * @param fs
+     */
+    @Resource(name="filesystem")
+    public void setFileSystem(FileSystem fs) {
+        this.fs = fs;
+    }
+
+    /**
+     * Return a new {@link FileSystemBlobTransferPolicy} which carries the same {@link FileSystem}, 
+     * upload urls, buffer size and upload strategy as this instance
+     * 
+     * @return copy
+     */
+    @Override
+    public BlobTransferPolicy copy() {
+        FileSystemBlobTransferPolicy that = new FileSystemBlobTransferPolicy();
+        that.setFileSystem(fs);
+        that.setDefaultUploadUrl(getDefaultUploadUrl());
+        that.setBrokerUploadUrl(getBrokerUploadUrl());
+        that.setUploadUrl(getUploadUrl());
+        // FileSystemBlobStrategy reads the buffer size from its policy, so a configured value must survive the copy
+        that.setBufferSize(getBufferSize());
+        that.setUploadStrategy(getUploadStrategy());
+        return that;
+    }
+
+    
+    /**
+     * Create a {@link FileSystemBlobStrategy} which is bound to this policy and the configured {@link FileSystem}
+     */
+    @Override
+    protected BlobDownloadStrategy createDownloadStrategy() {
+        return new FileSystemBlobStrategy(this, fs);
+    }
+
+    /**
+     * Create a {@link FileSystemBlobStrategy} which is bound to this policy and the configured {@link FileSystem}
+     */
+    @Override
+    protected BlobUploadStrategy createUploadStrategy() {
+        return new FileSystemBlobStrategy(this, fs);
+    }
+
+}

Added: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
 (added)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,82 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.Disposable;
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.core.NonClosingSharedInputStream;
+import org.apache.james.lifecycle.LifecycleUtil;
+
+/**
+ * {@link MimeMessageSource} which use a {@link BlobMessage} as input. Be aware that {@link BlobMessage} must contain
+ * a {@link NonClosingSharedInputStream} for this implementation!
+ *
+ */
+@SuppressWarnings("unchecked")
+public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport, Disposable{
+
+    private NonClosingSharedInputStream in;
+    private String sourceId;
+    private long size;
+
+    /**
+     * Create a new source. The JMS message id is used as source id, the size is taken from the
+     * {@link #JAMES_MAIL_MESSAGE_SIZE} property and the content from the stream of the message.
+     * 
+     * @param message
+     * @throws JMSException
+     * @throws IOException
+     */
+    public MimeMessageBlobMessageSource(BlobMessage message) throws JMSException, IOException {
+        this.sourceId = message.getJMSMessageID();
+        this.size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+        this.in = (NonClosingSharedInputStream)message.getInputStream();
+    }
+    
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.core.MimeMessageSource#getInputStream()
+     */
+    public InputStream getInputStream() throws IOException {
+        return in.newStream(0, -1);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.core.MimeMessageSource#getSourceId()
+     */
+    public String getSourceId() {
+        return sourceId;
+    }
+    
+    /**
+     * Return the size which was stored in the message property. If that size is -1 the size
+     * calculated by the super implementation is returned.
+     */
+    @Override
+    public long getMessageSize() throws IOException {
+        // a size of -1 is treated as "no size stored in the property", so fallback to super implementation
+        if (size == -1) {
+            return super.getMessageSize();
+        }
+        return size;
+    }
+    
+    /**
+     * Call dispose on the {@link InputStream}
+     */
+    public void dispose() {
+        LifecycleUtil.dispose(in);
+    }
+}

Modified: 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
 (original)
+++ 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
 Wed Nov  3 15:44:18 2010
@@ -46,6 +46,11 @@ import org.apache.mailet.Mail;
 public interface MailQueue {
 
     /**
+     * No delay for queued {@link MailQueueItem}
+     */
+    public final static int NO_DELAY = -1;
+
+    /**
      * Enqueue the Mail to the queue. The given delay and unit are used to 
calculate the time when the 
      * Mail will be avaible for dequeue
      * 

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 Wed Nov  3 15:44:18 2010
@@ -57,23 +57,11 @@ import org.apache.mailet.MailAddress;
  * 
  * 
  */
-public class JMSMailQueue implements MailQueue {
+public class JMSMailQueue implements MailQueue, JMSSupport {
 
     protected final String queuename;
     protected final ConnectionFactory connectionFactory;
     protected final Log logger;
-    protected final static String JAMES_MAIL_RECIPIENTS = 
"JAMES_MAIL_RECIPIENTS";
-    protected final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
-    protected final static String JAMES_MAIL_ERROR_MESSAGE = 
"JAMES_MAIL_ERROR_MESSAGE";
-    protected final static String JAMES_MAIL_LAST_UPDATED = 
"JAMES_MAIL_LAST_UPDATED";
-    protected final static String JAMES_MAIL_MESSAGE_SIZE = 
"JAMES_MAIL_MESSAGE_SIZE";
-    protected final static String JAMES_MAIL_NAME = "JAMES_MAIL_NAME";
-    protected final static String JAMES_MAIL_SEPERATOR = ";";
-    protected final static String JAMES_MAIL_REMOTEHOST = 
"JAMES_MAIL_REMOTEHOST";
-    protected final static String JAMES_MAIL_REMOTEADDR = 
"JAMES_MAIL_REMOTEADDR";
-    protected final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
-    protected final static String JAMES_MAIL_ATTRIBUTE_NAMES = 
"JAMES_MAIL_ATTRIBUTE_NAMES";
-    protected final static String JAMES_NEXT_DELIVERY = "JAMES_NEXT_DELIVERY";
 
     /**
      * Handle mail with lowest priority
@@ -106,7 +94,7 @@ public class JMSMailQueue implements Mai
     }
 
 /**
-     * Execute the given {...@link DequeueOperation} when a mail is ready to 
precoess. As JMS does not support delay scheduling out-of-the box, we use 
+     * Execute the given {@link DequeueOperation} when a mail is ready to process. As JMS does not support delay scheduling out-of-the box, we use 
      * a messageselector to check if a mail is ready. For this a {...@link 
MessageConsumer#receive(long) is used with a timeout of 10 seconds. 
      * 
      * Many JMS implementations support better solutions for this, so this 
should get overridden by these implementations
@@ -126,7 +114,7 @@ public class JMSMailQueue implements Mai
 
                 session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
                 Queue queue = session.createQueue(queuename);
-                consumer = session.createConsumer(queue, JAMES_NEXT_DELIVERY + 
" <= " + System.currentTimeMillis());
+                consumer = session.createConsumer(queue, getMessageSelector());
 
                 message = consumer.receive(10000);
 
@@ -192,6 +180,15 @@ public class JMSMailQueue implements Mai
 
     }
 
+    /**
+     * Return the selector used for consuming: JAMES_NEXT_DELIVERY <= current time in ms.
+     * 
+     * @return selector, built with System.currentTimeMillis() at the time of the call
+     */
+    protected String getMessageSelector() {
+        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis();
+    }
+    
     /*
      * (non-Javadoc)
      * 
@@ -257,7 +254,7 @@ public class JMSMailQueue implements Mai
      * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
      */
     public void enQueue(Mail mail) throws MailQueueException {
-        enQueue(mail, -1, TimeUnit.MILLISECONDS);
+        enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -325,7 +322,7 @@ public class JMSMailQueue implements Mai
             String recipient = recipients.next().toString();
             recipientsBuilder.append(recipient.trim());
             if (recipients.hasNext()) {
-                recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
+                recipientsBuilder.append(JAMES_MAIL_SEPARATOR);
             }
         }
         props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
@@ -350,7 +347,7 @@ public class JMSMailQueue implements Mai
             props.put(attrName, value);
 
             if (attrs.hasNext()) {
-                attrsBuilder.append(JAMES_MAIL_SEPERATOR);
+                attrsBuilder.append(JAMES_MAIL_SEPARATOR);
             }
         }
         props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
@@ -409,7 +406,7 @@ public class JMSMailQueue implements Mai
 
         List<MailAddress> rcpts = new ArrayList<MailAddress>();
         String recipients = message.getStringProperty(JAMES_MAIL_RECIPIENTS);
-        StringTokenizer recipientTokenizer = new StringTokenizer(recipients, 
JAMES_MAIL_SEPERATOR);
+        StringTokenizer recipientTokenizer = new StringTokenizer(recipients, 
JAMES_MAIL_SEPARATOR);
         while (recipientTokenizer.hasMoreTokens()) {
             try {
                 MailAddress rcpt = new 
MailAddress(recipientTokenizer.nextToken());
@@ -425,7 +422,7 @@ public class JMSMailQueue implements Mai
         mail.setRemoteHost(message.getStringProperty(JAMES_MAIL_REMOTEHOST));
 
         String attributeNames = 
message.getStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES);
-        StringTokenizer namesTokenizer = new StringTokenizer(attributeNames, 
JAMES_MAIL_SEPERATOR);
+        StringTokenizer namesTokenizer = new StringTokenizer(attributeNames, 
JAMES_MAIL_SEPARATOR);
         while (namesTokenizer.hasMoreTokens()) {
             String name = namesTokenizer.nextToken();
             Serializable attrValue = message.getStringProperty(name);
@@ -468,6 +465,17 @@ public class JMSMailQueue implements Mai
         return "MailQueue:" + queuename;
     }
 
+    /**
+     * Create a {@link MailQueueItem} for the given parameters
+     * 
+     * @param connection
+     * @param session
+     * @param consumer
+     * @param message
+     * @return item
+     * @throws JMSException
+     * @throws MessagingException
+     */
     protected MailQueueItem createMailQueueItem(Connection connection, Session 
session, MessageConsumer consumer, Message message) throws JMSException, 
MessagingException {
         final Mail mail = createMail(message);
         return new JMSMailQueueItem(mail, connection, session, consumer);

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java?rev=1030502&r1=1030501&r2=1030502&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
 Wed Nov  3 15:44:18 2010
@@ -34,10 +34,10 @@ import org.apache.mailet.Mail;
  */
 public class JMSMailQueueItem implements MailQueueItem {
 
-    private final Mail mail;
-    private final Connection connection;
-    private final Session session;
-    private final MessageConsumer consumer;
+    protected final Mail mail;
+    protected final Connection connection;
+    protected final Session session;
+    protected final MessageConsumer consumer;
 
     public JMSMailQueueItem(Mail mail, Connection connection, Session session, 
MessageConsumer consumer) {
         this.mail = mail;

Added: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java?rev=1030502&view=auto
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
 (added)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSupport.java
 Wed Nov  3 15:44:18 2010
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+/**
+ * Collection of the JMS property names (and the value separator) which are shared
+ * by the JMS depending implementations
+ *
+ */
+public interface JMSSupport {
+
+    /**
+     * Name of the JMS Property holding the recipients (String)
+     */
+    public static final String JAMES_MAIL_RECIPIENTS = "JAMES_MAIL_RECIPIENTS";
+    
+    /**
+     * Name of the JMS Property holding the sender (String)
+     */
+    public static final String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
+    
+    /**
+     * Name of the JMS Property holding the error message (String)
+     */
+    public static final String JAMES_MAIL_ERROR_MESSAGE = "JAMES_MAIL_ERROR_MESSAGE";
+    
+    /**
+     * Name of the JMS Property holding the last updated time (long, ms)
+     */
+    public static final String JAMES_MAIL_LAST_UPDATED = "JAMES_MAIL_LAST_UPDATED";
+    
+    /**
+     * Name of the JMS Property holding the mail size (long, bytes)
+     */
+    public static final String JAMES_MAIL_MESSAGE_SIZE = "JAMES_MAIL_MESSAGE_SIZE";
+    
+    /**
+     * Name of the JMS Property holding the mail name (String)
+     */
+    public static final String JAMES_MAIL_NAME = "JAMES_MAIL_NAME";
+    
+    /**
+     * Separator placed between the single values when an array of String values is stored in one JMS Property value
+     */
+    public static final String JAMES_MAIL_SEPARATOR = ";";
+    
+    /**
+     * Name of the JMS Property holding the remote hostname (String)
+     */
+    public static final String JAMES_MAIL_REMOTEHOST = "JAMES_MAIL_REMOTEHOST";
+    
+    /**
+     * Name of the JMS Property holding the remote ipaddress (String)
+     */
+    public static final String JAMES_MAIL_REMOTEADDR = "JAMES_MAIL_REMOTEADDR";
+    
+    /**
+     * Name of the JMS Property holding the mail state (String)
+     */
+    public static final String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
+    
+    /**
+     * Name of the JMS Property holding the mail attribute names (String)
+     */
+    public static final String JAMES_MAIL_ATTRIBUTE_NAMES = "JAMES_MAIL_ATTRIBUTE_NAMES";
+    
+    /**
+     * Name of the JMS Property holding the next delivery time (long, ms)
+     */
+    public static final String JAMES_NEXT_DELIVERY = "JAMES_NEXT_DELIVERY";
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to