Author: norman
Date: Mon Oct 18 11:24:34 2010
New Revision: 1023741
URL: http://svn.apache.org/viewvc?rev=1023741&view=rev
Log:
Refactor MailQueue interface to return a MailQueueItem. This is more elegant
then before and allows better integration with multithreading
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
Modified:
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
Modified:
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
(original)
+++
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
Mon Oct 18 11:24:34 2010
@@ -26,8 +26,8 @@ import org.apache.james.dnsservice.api.T
import org.apache.james.lifecycle.LifecycleUtil;
import org.apache.james.queue.MailQueue;
import org.apache.james.queue.MailQueueFactory;
-import org.apache.james.queue.MailQueue.DequeueOperation;
import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
import org.apache.james.services.MailServer;
import org.apache.james.util.TimeConverter;
import org.apache.mailet.base.GenericMailet;
@@ -710,65 +710,65 @@ public class RemoteDelivery extends Gene
// of time to block is determined by the 'getWaitTime'
// method of the
// MultipleDelayFilter.
- queue.deQueue(new DequeueOperation() {
+ MailQueueItem queueItem = queue.deQueue();
+ Mail mail = queueItem.getMail();
+
+ String key = mail.getName();
+ try {
+ if (isDebug) {
+ String message = Thread.currentThread().getName()
+ + " will process mail " + key;
+ log(message);
+ }
- public void process(Mail mail) throws
MailQueueException {
- String key = mail.getName();
+ // Deliver message
+ if (deliver(mail, session)) {
+ // Message was successfully delivered/fully
failed...
+ // delete it
+ LifecycleUtil.dispose(mail);
+ //workRepository.remove(key);
+ } else {
+ // Something happened that will delay delivery.
+ // Store it back in the retry repository.
+ //workRepository.store(mail);
+ int retries = 0;
try {
- if (isDebug) {
- String message =
Thread.currentThread().getName()
- + " will process mail " + key;
- log(message);
- }
-
- // Deliver message
- if (deliver(mail, session)) {
- // Message was successfully
delivered/fully failed...
- // delete it
- LifecycleUtil.dispose(mail);
- //workRepository.remove(key);
- } else {
- // Something happened that will delay
delivery.
- // Store it back in the retry repository.
- //workRepository.store(mail);
- int retries = 0;
- try {
- retries =
Integer.parseInt(mail.getErrorMessage());
- } catch (NumberFormatException e) {
- // Something strange was happen with
the errorMessage..
- }
-
- long delay = getNextDelay (retries);
- queue.enQueue(mail, delay,
TimeUnit.MILLISECONDS);
- LifecycleUtil.dispose(mail);
-
- // This is an update, so we have to unlock
and
- // notify or this mail is kept locked by
this thread.
- //workRepository.unlock(key);
-
- // Note: We do not notify because we
updated an
- // already existing mail and we are now
free to handle
- // more mails.
- // Furthermore this mail should not be
processed now
- // because we have a retry time scheduling.
- }
-
- // Clear the object handle to make sure it
recycles
- // this object.
- mail = null;
- } catch (Exception e) {
- // Prevent unexpected exceptions from causing
looping by
- // removing message from outgoing.
- // DO NOT CHANGE THIS to catch Error! For
example, if
- // there were an OutOfMemory condition caused
because
- // something else in the server was abusing
memory, we would
- // not want to start purging the retrying
spool!
- LifecycleUtil.dispose(mail);
- //workRepository.remove(key);
- throw new MailQueueException("Unable to
perform dequeue", e);
+ retries =
Integer.parseInt(mail.getErrorMessage());
+ } catch (NumberFormatException e) {
+ // Something strange was happen with the
errorMessage..
}
+
+ long delay = getNextDelay (retries);
+ queue.enQueue(mail, delay, TimeUnit.MILLISECONDS);
+ LifecycleUtil.dispose(mail);
+
+ // This is an update, so we have to unlock and
+ // notify or this mail is kept locked by this
thread.
+ //workRepository.unlock(key);
+
+ // Note: We do not notify because we updated an
+ // already existing mail and we are now free to
handle
+ // more mails.
+ // Furthermore this mail should not be processed
now
+ // because we have a retry time scheduling.
}
- });
+
+ // Clear the object handle to make sure it recycles
+ // this object.
+ mail = null;
+ queueItem.done(true);
+ } catch (Exception e) {
+ // Prevent unexpected exceptions from causing looping
by
+ // removing message from outgoing.
+ // DO NOT CHANGE THIS to catch Error! For example, if
+ // there were an OutOfMemory condition caused because
+ // something else in the server was abusing memory, we
would
+ // not want to start purging the retrying spool!
+ LifecycleUtil.dispose(mail);
+ //workRepository.remove(key);
+ queueItem.done(false);
+ throw new MailQueueException("Unable to perform
dequeue", e);
+ }
} catch (Throwable e) {
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
Mon Oct 18 11:24:34 2010
@@ -27,20 +27,17 @@ import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
-import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.pool.PooledSession;
import org.apache.commons.logging.Log;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStream;
import org.apache.james.core.MimeMessageInputStreamSource;
-import org.apache.james.core.MimeMessageWrapper;
import org.apache.james.queue.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
@@ -108,87 +105,15 @@ public class ActiveMQMailQueue extends J
public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final
String queuename, final Log logger) {
this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
}
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.queue.activemq.MailQueue#deQueue()
- */
- public void deQueue(DequeueOperation operation) throws MailQueueException,
MessagingException {
- Connection connection = null;
- Session session = null;
- Message message = null;
- MessageConsumer consumer = null;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
-
- session = connection.createSession(true,
Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(queuename);
- consumer = session.createConsumer(queue);
- message = consumer.receive();
-
- if (message == null){
- return;
- }
-
- Mail mail = createMail(message);
- operation.process(mail);
- session.commit();
- if (message instanceof ActiveMQBlobMessage) {
- // delete the file
- // This should get removed once this jira issue was fixed
- // https://issues.apache.org/activemq/browse/AMQ-1529
- try {
- ((ActiveMQBlobMessage) message).deleteFile();
- } catch (IOException e) {
- logger.info("Unable to delete blob message file for mail "
+ mail.getName());
- }
- }
- } catch (JMSException e) {
- throw new MailQueueException("Unable to dequeue next message", e);
- } catch (MessagingException e) {
-
- if (session != null) {
- try {
- session.rollback();
- } catch (JMSException e1) {
- // ignore on rollback
- }
- }
- } finally {
- if (consumer != null) {
-
- try {
- consumer.close();
- } catch (JMSException e1) {
- // ignore on rollback
- }
- }
- try {
- if (session != null) session.close();
- } catch (JMSException e) {
- // ignore here
- }
-
- try {
- if (connection != null) connection.close();
- } catch (JMSException e) {
- // ignore here
- }
- }
-
-
- }
/*
* (non-Javadoc)
* @see
org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax.jms.Message,
org.apache.mailet.Mail)
*/
- protected void populateMailMimeMessage(Message message, Mail mail)
- throws MessagingException {
- if (message instanceof BlobMessage) {
- try {
- BlobMessage blobMessage = (BlobMessage)
message;
+ protected void populateMailMimeMessage(Message message, Mail mail) throws
MessagingException {
+ if (message instanceof BlobMessage) {
+ try {
+ BlobMessage blobMessage = (BlobMessage) message;
try {
// store url for later usage. Maybe we can do something
smart for RemoteDelivery here
// TODO: Check if this makes sense at all
@@ -199,15 +124,15 @@ public class ActiveMQMailQueue extends J
}
mail.setMessage(new MimeMessageCopyOnWriteProxy(new
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
- } catch (IOException e) {
- throw new MailQueueException("Unable to
populate MimeMessage for mail " + mail.getName(), e);
- } catch (JMSException e) {
- throw new MailQueueException("Unable to
populate MimeMessage for mail " + mail.getName(), e);
- }
- } else {
- super.populateMailMimeMessage(message, mail);
- }
- }
+ } catch (IOException e) {
+ throw new MailQueueException("Unable to populate MimeMessage
for mail " + mail.getName(), e);
+ } catch (JMSException e) {
+ throw new MailQueueException("Unable to populate MimeMessage
for mail " + mail.getName(), e);
+ }
+ } else {
+ super.populateMailMimeMessage(message, mail);
+ }
+ }
/*
* (non-Javadoc)
@@ -242,20 +167,10 @@ public class ActiveMQMailQueue extends J
}
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.jms.JMSMailQueue#populateJMSProperties(javax.jms.Message,
org.apache.mailet.Mail, long)
- */
- protected void populateJMSProperties(Message message, Mail mail,
- long delayInMillis) throws JMSException,
MessagingException {
- if (delayInMillis > 0) {
- // This will get picked up by activemq for delay message
-
message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY,
delayInMillis);
- }
-
- super.populateJMSProperties(message, mail, delayInMillis);
- }
-
-
+ @Override
+ protected MailQueueItem createMailQueueItem(Connection connection, Session
session, MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
+ Mail mail = createMail(message);
+ return new ActiveMQMailQueueItem(mail, connection, session, consumer,
message, logger);
+ }
}
Added:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java?rev=1023741&view=auto
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
(added)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
Mon Oct 18 11:24:34 2010
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.logging.Log;
+import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
+import org.apache.james.queue.jms.JMSMailQueueItem;
+import org.apache.mailet.Mail;
+
+/**
+ * ActiveMQ {...@link MailQueueItem} implementation which handles
Blob-Messages as
+ * well
+ *
+ */
+public class ActiveMQMailQueueItem extends JMSMailQueueItem {
+
+ private final Message message;
+ private final Log logger;
+
+ public ActiveMQMailQueueItem(Mail mail, Connection connection, Session
session, MessageConsumer consumer, Message message, Log logger) {
+ super(mail, connection, session, consumer);
+ this.message = message;
+ this.logger = logger;
+ }
+
+ @Override
+ public void done(boolean success) throws MailQueueException {
+ super.done(success);
+ if (success) {
+ if (message instanceof ActiveMQBlobMessage) {
+ /*
+ * TODO: Enable this once activemq 5.4.2 was released
+ // delete the file
+ // This should get removed once this jira issue was fixed
+ // https://issues.apache.org/activemq/browse/AMQ-1529
+ try {
+ ((ActiveMQBlobMessage) message).deleteFile();
+ } catch (IOException e) {
+ logger.info("Unable to delete blob message file for mail "
+ getMail().getName());
+ } catch (JMSException e) {
+ logger.info("Unable to delete blob message file for mail "
+ getMail().getName());
+ }
+ */
+ }
+ }
+ }
+
+}
Modified:
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
(original)
+++
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
Mon Oct 18 11:24:34 2010
@@ -54,7 +54,7 @@ public interface MailQueue {
* @param unit
* @throws MailQueueException
*/
- public void enQueue(Mail mail, long delay, TimeUnit unit) throws
MailQueueException, MessagingException;
+ public void enQueue(Mail mail, long delay, TimeUnit unit) throws
MailQueueException;
/**
@@ -63,7 +63,7 @@ public interface MailQueue {
* @param mail
* @throws MailQueueException
*/
- public void enQueue(Mail mail) throws MailQueueException,
MessagingException;
+ public void enQueue(Mail mail) throws MailQueueException;
/**
@@ -73,7 +73,7 @@ public interface MailQueue {
* @param dequeueOperation
* @throws MailQueueException
*/
- public void deQueue(DequeueOperation operation) throws MailQueueException,
MessagingException;
+ public MailQueueItem deQueue() throws MailQueueException;
/**
@@ -92,19 +92,28 @@ public interface MailQueue {
}
}
-
+
/**
*
- * Operation which will get executed once a new Mail is ready to process
+ *
*/
- public interface DequeueOperation {
+ public interface MailQueueItem {
+
+ /**
+ * Return the dequeued {...@link Mail}
+ *
+ * @return mail
+ */
+ public Mail getMail();
/**
- * Process some action on the mail
- * @param mail
+ * Callback which MUST get called after the operation on the dequeued
{...@link Mail} was complete.
+ *
+ * This is mostly used to either commit a transaction or rollback.
+ *
+ * @param success
* @throws MailQueueException
- * @throws MessagingException
*/
- public void process(Mail mail) throws MailQueueException,
MessagingException;
+ public void done(boolean success) throws MailQueueException;
}
}
Modified:
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
(original)
+++
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
Mon Oct 18 11:24:34 2010
@@ -25,8 +25,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import javax.mail.MessagingException;
-
+import org.apache.james.queue.MailQueue.MailQueueException;
import org.apache.mailet.Mail;
public class MockMailQueue implements MailQueue{
@@ -42,19 +41,30 @@ public class MockMailQueue implements Ma
this.throwException = true;
}
- public void deQueue(DequeueOperation operation) throws MailQueueException,
MessagingException {
+ public MailQueueItem deQueue() throws MailQueueException {
if (throwException) {
throwException = false;
throw new MailQueueException("Mock");
}
try {
- operation.process(queue.take());
+ final Mail mail = queue.take();
+ return new MailQueueItem() {
+
+ public Mail getMail() {
+ return mail;
+ }
+
+ public void done(boolean success) throws
MailQueueException {
+ // do nothing here
+
+ }
+ };
} catch (InterruptedException e) {
throw new MailQueueException("Mock",e);
}
}
- public void enQueue(final Mail mail, long delay, TimeUnit unit) throws
MailQueueException, MessagingException {
+ public void enQueue(final Mail mail, long delay, TimeUnit unit) throws
MailQueueException {
if (throwException) {
throwException = false;
throw new MailQueueException("Mock");
@@ -72,7 +82,7 @@ public class MockMailQueue implements Ma
}, delay, unit);
}
- public void enQueue(Mail mail) throws MailQueueException,
MessagingException {
+ public void enQueue(Mail mail) throws MailQueueException {
if (throwException) {
throwException = false;
throw new MailQueueException("Mock");
Modified:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
(original)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Mon Oct 18 11:24:34 2010
@@ -108,14 +108,13 @@ public class JMSMailQueue implements Mai
*
* @see
org.apache.james.queue.MailQueue#deQueue(org.apache.james.queue.MailQueue.DequeueOperation)
*/
- public void deQueue(DequeueOperation operation) throws MailQueueException,
MessagingException {
+ public MailQueueItem deQueue() throws MailQueueException {
Connection connection = null;
Session session = null;
Message message = null;
MessageConsumer consumer = null;
- boolean received = false;
- while(received == false) {
+ while(true) {
try {
connection = connectionFactory.createConnection();
connection.start();
@@ -127,48 +126,65 @@ public class JMSMailQueue implements Mai
message = consumer.receive(10000);
if (message != null) {
- received = true;
- Mail mail = createMail(message);
- operation.process(mail);
+ return createMailQueueItem(connection, session, consumer,
message);
+ } else {
+ session.commit();
+
+ if (consumer != null) {
+
+ try {
+ consumer.close();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
}
- session.commit();
- } catch (JMSException e) {
- throw new MailQueueException("Unable to dequeue next message",
e);
- } catch (MessagingException e) {
-
- if (session != null) {
+ } catch (Exception e) {
+ try {
+ session.rollback();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+
+ if (consumer != null) {
+
try {
- session.rollback();
+ consumer.close();
} catch (JMSException e1) {
- // ignore on rollback
+ // ignore on rollback
}
}
- } finally {
- if (consumer != null) {
-
- try {
- consumer.close();
- } catch (JMSException e1) {
- // ignore on rollback
- }
- }
- try {
- if (session != null)
- session.close();
- } catch (JMSException e) {
- // ignore here
- }
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
- try {
- if (connection != null)
- connection.close();
- } catch (JMSException e) {
- // ignore here
- }
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e1) {
+ // ignore here
+ }
+ throw new MailQueueException("Unable to dequeue next message",
e);
}
}
-
+
}
/*
@@ -176,7 +192,7 @@ public class JMSMailQueue implements Mai
* @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail,
long, java.util.concurrent.TimeUnit)
*/
public void enQueue(Mail mail, long delay, TimeUnit unit)
- throws MailQueueException, MessagingException {
+ throws MailQueueException {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
@@ -238,8 +254,7 @@ public class JMSMailQueue implements Mai
* (non-Javadoc)
* @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
*/
- public void enQueue(Mail mail) throws MailQueueException,
- MessagingException {
+ public void enQueue(Mail mail) throws MailQueueException{
enQueue(mail, 0, TimeUnit.MILLISECONDS);
}
@@ -442,5 +457,10 @@ public class JMSMailQueue implements Mai
public String toString() {
return "MailQueue:" + queuename;
}
+
+ protected MailQueueItem createMailQueueItem(Connection connection, Session
session, MessageConsumer consumer, Message message) throws JMSException,
MessagingException{
+ final Mail mail = createMail(message);
+ return new JMSMailQueueItem(mail, connection, session, consumer);
+ }
}
Added:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java?rev=1023741&view=auto
==============================================================================
---
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
(added)
+++
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
Mon Oct 18 11:24:34 2010
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.jms;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
+import org.apache.mailet.Mail;
+
+/**
+ * JMS {...@link MailQueueItem} implementation
+ *
+ */
+public class JMSMailQueueItem implements MailQueueItem {
+
+ private final Mail mail;
+ private final Connection connection;
+ private final Session session;
+ private final MessageConsumer consumer;
+
+ public JMSMailQueueItem(Mail mail, Connection connection, Session session,
MessageConsumer consumer) {
+ this.mail = mail;
+ this.connection = connection;
+ this.session = session;
+ this.consumer = consumer;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.james.queue.MailQueue.MailQueueItem#done(boolean)
+ */
+ public void done(boolean success) throws MailQueueException {
+ try {
+ if (success) {
+ session.commit();
+ } else {
+ try {
+ session.rollback();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+ } catch (JMSException ex) {
+ throw new MailQueueException("Unable to commit dequeue operation
for mail " + mail.getName(), ex);
+ } finally {
+ if (consumer != null) {
+
+ try {
+ consumer.close();
+ } catch (JMSException e1) {
+ // ignore on rollback
+ }
+ }
+ try {
+ if (session != null)
+ session.close();
+ } catch (JMSException e) {
+ // ignore here
+ }
+
+ try {
+ if (connection != null)
+ connection.close();
+ } catch (JMSException e) {
+ // ignore here
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.james.queue.MailQueue.MailQueueItem#getMail()
+ */
+ public Mail getMail() {
+ return mail;
+ }
+
+}
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
---
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
(original)
+++
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
Mon Oct 18 11:24:34 2010
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.Atomi
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
-import javax.mail.MessagingException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.HierarchicalConfiguration;
@@ -43,8 +42,7 @@ import org.apache.james.mailetcontainer.
import org.apache.james.mailetcontainer.MailetContainer;
import org.apache.james.queue.MailQueue;
import org.apache.james.queue.MailQueueFactory;
-import org.apache.james.queue.MailQueue.DequeueOperation;
-import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
import org.apache.james.services.SpoolManager;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
@@ -157,31 +155,30 @@ public class JamesSpoolManager implement
numActive.incrementAndGet();
try {
- queue.deQueue(new DequeueOperation() {
-
- /*
- * (non-Javadoc)
- * @see
org.apache.james.queue.activemq.MailQueue.DequeueOperation#process(org.apache.mailet.Mail)
- */
- public void process(Mail mail) throws MailQueueException,
MessagingException {
- if (logger.isDebugEnabled()) {
- StringBuffer debugBuffer =
- new StringBuffer(64)
- .append("==== Begin processing mail ")
- .append(mail.getName())
- .append("====");
- logger.debug(debugBuffer.toString());
- }
-
- try {
- mailProcessor.service(mail);
- } finally {
- LifecycleUtil.dispose(mail);
- mail = null;
- }
+ MailQueueItem queueItem = queue.deQueue();
+ Mail mail = queueItem.getMail();
+ if (logger.isDebugEnabled()) {
+ StringBuffer debugBuffer =
+ new StringBuffer(64)
+ .append("==== Begin processing mail ")
+ .append(mail.getName())
+ .append("====");
+ logger.debug(debugBuffer.toString());
+ }
+
+ try {
+ mailProcessor.service(mail);
+ queueItem.done(true);
+ } catch (Exception e) {
+ if (active.get() && logger.isErrorEnabled()) {
+ logger.error("Exception processing mail in
JamesSpoolManager.run " + e.getMessage(), e);
}
- });
-
+ queueItem.done(false);
+
+ } finally {
+ LifecycleUtil.dispose(mail);
+ mail = null;
+ }
} catch (Throwable e) {
if (active.get() && logger.isErrorEnabled()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]