This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ef2890346dce94ad957310a1c9bcbee58404b041 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Nov 18 12:53:26 2020 +0700 JAMES-3441 [REFACTORING] Use Optionals and encapsulating class for JamesMailSpooler mutable content --- .../mailetcontainer/impl/JamesMailSpooler.java | 240 +++++++++++---------- 1 file changed, 132 insertions(+), 108 deletions(-) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index b6ceddb..23623db 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -63,6 +63,126 @@ import reactor.core.scheduler.Schedulers; * them from the spool when processing is complete. */ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean { + private static class Runner { + private final AtomicInteger processingActive = new AtomicInteger(0); + private final MetricFactory metricFactory; + private final MailProcessor mailProcessor; + private final MailRepository errorRepository; + private final MailRepositoryUrl errorRepositoryURL; + private final reactor.core.Disposable disposable; + private final MailQueue queue; + private final int concurrencyLevel; + + private Runner(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepository errorRepository, MailRepositoryUrl errorRepositoryURL, MailQueue queue, int concurrencyLevel) { + this.metricFactory = metricFactory; + this.mailProcessor = mailProcessor; + this.errorRepository = errorRepository; + this.errorRepositoryURL = errorRepositoryURL; + this.disposable = run(queue); + this.queue = queue; + this.concurrencyLevel = concurrencyLevel; + } + + private reactor.core.Disposable run(MailQueue queue) { + return Flux.from(queue.deQueue()) + .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel) + .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + } + + private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) { + TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING); + return Mono.fromCallable(processingActive::incrementAndGet) + .flatMap(ignore -> processMail(queueItem)) + .doOnSuccess(any -> timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)) + .doOnTerminate(processingActive::decrementAndGet); + } + + private Mono<Void> processMail(MailQueueItem queueItem) { + return Mono + .using( + queueItem::getMail, + mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)), + LifecycleUtil::dispose); + } + + private void performProcessMail(MailQueueItem queueItem, Mail mail) { + LOGGER.debug("==== Begin processing mail {} ====", mail.getName()); + try { + mailProcessor.service(mail); + + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread has been interrupted"); + } + queueItem.done(true); + } catch (Exception e) { + handleError(queueItem, mail, e); + } finally { + LOGGER.debug("==== End processing mail {} ====", mail.getName()); + } + } + + private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) { + int failureCount = computeFailureCount(mail); + + try { + if (failureCount > MAXIMUM_FAILURE_COUNT) { + LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString()); + storeInErrorRepository(queueItem); + } else { + LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException); + reEnqueue(queueItem, failureCount); + } + } catch (Exception nestedE) { + LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE); + nack(queueItem, processingException); + } + } + + private int computeFailureCount(Mail mail) { + Integer previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT) + .flatMap(attribute -> attribute.getValue().valueAs(Integer.class)) + .orElse(0); + return previousFailureCount + 1; + } + + private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException { + Mail mail = queueItem.getMail(); + mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount))); + queue.enQueue(mail); + queueItem.done(true); + } + + private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException { + errorRepository.store(queueItem.getMail()); + queueItem.done(true); + } + + private void nack(MailQueueItem queueItem, Exception processingException) { + try { + queueItem.done(false); + } catch (MailQueue.MailQueueException ex) { + throw new RuntimeException(processingException); + } + } + + public void dispose() { + LOGGER.info("start dispose() ..."); + disposable.dispose(); + try { + queue.close(); + } catch (IOException e) { + LOGGER.debug("error closing queue", e); + } + LOGGER.info("thread shutdown completed."); + } + + public int getCurrentSpoolCount() { + return processingActive.get(); + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class); public static final String SPOOL_PROCESSING = "spoolProcessing"; @@ -74,23 +194,14 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async * process. */ - private int concurrencyLevel; - - private final AtomicInteger processingActive = new AtomicInteger(0); - private final MetricFactory metricFactory; - - /** - * The mail processor - */ private final MailProcessor mailProcessor; private final MailRepositoryStore mailRepositoryStore; - private final MailQueueFactory<?> queueFactory; + + private int concurrencyLevel; private MailRepositoryUrl errorRepositoryURL; - private MailRepository errorRepository; - private reactor.core.Disposable disposable; - private MailQueue queue; + private Optional<Runner> runner; @Inject public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> queueFactory) { @@ -118,100 +229,20 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB public void init() { LOGGER.info("init..."); LOGGER.info("Concurrency level is {}", concurrencyLevel); - queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel)); - disposable = run(queue); + MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel)); + runner = Optional.of(new Runner(metricFactory, + mailProcessor, errorRepository(), errorRepositoryURL, queue, concurrencyLevel)); LOGGER.info("Spooler started"); - try { - this.errorRepository = mailRepositoryStore.select(errorRepositoryURL); - } catch (MailRepositoryStore.MailRepositoryStoreException e) { - throw new RuntimeException(e); - } - - } - - private reactor.core.Disposable run(MailQueue queue) { - return Flux.from(queue.deQueue()) - .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel) - .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) - .subscribeOn(Schedulers.elastic()) - .subscribe(); - } - - private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) { - TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING); - return Mono.fromCallable(processingActive::incrementAndGet) - .flatMap(ignore -> processMail(queueItem)) - .doOnSuccess(any -> timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)) - .doOnTerminate(processingActive::decrementAndGet); - } - - private Mono<Void> processMail(MailQueueItem queueItem) { - return Mono - .using( - queueItem::getMail, - mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)), - LifecycleUtil::dispose); - } - - private void performProcessMail(MailQueueItem queueItem, Mail mail) { - LOGGER.debug("==== Begin processing mail {} ====", mail.getName()); - try { - mailProcessor.service(mail); - - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("Thread has been interrupted"); - } - queueItem.done(true); - } catch (Exception e) { - handleError(queueItem, mail, e); - } finally { - LOGGER.debug("==== End processing mail {} ====", mail.getName()); - } } - private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) { - int failureCount = computeFailureCount(mail); - + private MailRepository errorRepository() { try { - if (failureCount > MAXIMUM_FAILURE_COUNT) { - LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString()); - storeInErrorRepository(queueItem); - } else { - LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException); - reEnqueue(queueItem, failureCount); - } - } catch (Exception nestedE) { - LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE); - nack(queueItem, processingException); + return mailRepositoryStore.select(errorRepositoryURL); + } catch (MailRepositoryStore.MailRepositoryStoreException e) { + throw new RuntimeException(e); } } - private int computeFailureCount(Mail mail) { - Integer previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT) - .flatMap(attribute -> attribute.getValue().valueAs(Integer.class)) - .orElse(0); - return previousFailureCount + 1; - } - - private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException { - Mail mail = queueItem.getMail(); - mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount))); - queue.enQueue(mail); - queueItem.done(true); - } - - private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException { - errorRepository.store(queueItem.getMail()); - queueItem.done(true); - } - - private void nack(MailQueueItem queueItem, Exception processingException) { - try { - queueItem.done(false); - } catch (MailQueue.MailQueueException ex) { - throw new RuntimeException(processingException); - } - } /** * The dispose operation is called at the end of a components lifecycle. @@ -224,14 +255,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @PreDestroy @Override public void dispose() { - LOGGER.info("start dispose() ..."); - disposable.dispose(); - try { - queue.close(); - } catch (IOException e) { - LOGGER.debug("error closing queue", e); - } - LOGGER.info("thread shutdown completed."); + runner.ifPresent(Runner::dispose); } @Override @@ -241,6 +265,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public int getCurrentSpoolCount() { - return processingActive.get(); + return runner.map(Runner::getCurrentSpoolCount).orElse(0); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
