This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 314b9dc960 JAMES-3891 ActiveMQCacheableMailQueue: discard emails not
backed by a blob (#1465)
314b9dc960 is described below
commit 314b9dc960dc9c452f5d2d8cd0065b0cc200673d
Author: Benoit TELLIER <[email protected]>
AuthorDate: Sun Feb 26 20:40:14 2023 +0700
JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob
(#1465)
---
.../queue/activemq/ActiveMQCacheableMailQueue.java | 27 ++++++++++++++++++----
.../james/queue/jms/JMSCacheableMailQueue.java | 7 +++---
2 files changed, 26 insertions(+), 8 deletions(-)
diff --git
a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index 01596cb926..6660a2a7db 100644
---
a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
+++
b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.queue.activemq;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
@@ -56,6 +57,8 @@ import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
/**
* <p>
* {@link MailQueue} implementation which use an ActiveMQ Queue.
@@ -226,10 +229,26 @@ public class ActiveMQCacheableMailQueue extends
JMSCacheableMailQueue implements
}
@Override
- protected MailQueueItem createMailQueueItem(Session session,
MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
- Mail mail = createMail(message);
- ActiveMQMailQueueItem activeMQMailQueueItem = new
ActiveMQMailQueueItem(mail, session, consumer, message);
- return mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem,
queueName);
+ protected Mono<MailQueueItem> createMailQueueItem(Session session,
MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
+ try {
+ Mail mail = createMail(message);
+ ActiveMQMailQueueItem activeMQMailQueueItem = new
ActiveMQMailQueueItem(mail, session, consumer, message);
+ return
Mono.just(mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem,
queueName));
+ } catch (MessagingException e) {
+ if (e.getCause() instanceof FileNotFoundException) {
+ LOGGER.warn("Blob message cannot be found, discarding email",
e);
+ try {
+ session.commit();
+ } catch (JMSException ex) {
+ throw new MailQueueException("Unable to commit dequeue
operation for mail", ex);
+ } finally {
+ JMSCacheableMailQueue.closeConsumer(consumer);
+ JMSCacheableMailQueue.closeSession(session);
+ }
+ return Mono.empty();
+ }
+ return Mono.error(e);
+ }
}
@Override
diff --git
a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 8f5356810e..b69177e5c4 100644
---
a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++
b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -246,7 +246,7 @@ public class JMSCacheableMailQueue implements
ManageableMailQueue, JMSSupport, M
if (message != null) {
dequeuedMailsMetric.increment();
- return Mono.just(createMailQueueItem(session, consumer,
message));
+ return createMailQueueItem(session, consumer, message);
} else {
session.commit();
closeConsumer(consumer);
@@ -405,7 +405,6 @@ public class JMSCacheableMailQueue implements
ManageableMailQueue, JMSSupport, M
} else {
throw new MailQueueException("Not supported JMS Message received "
+ message);
}
-
}
/**
@@ -507,10 +506,10 @@ public class JMSCacheableMailQueue implements
ManageableMailQueue, JMSSupport, M
* @throws JMSException
* @throws MessagingException
*/
- protected MailQueueItem createMailQueueItem(Session session,
MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
+ protected Mono<MailQueueItem> createMailQueueItem(Session session,
MessageConsumer consumer, Message message) throws JMSException,
MessagingException {
Mail mail = createMail(message);
JMSMailQueueItem jmsMailQueueItem = new JMSMailQueueItem(mail,
session, consumer);
- return mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem,
queueName);
+ return
Mono.just(mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName));
}
protected String getMessageSelector() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]