This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f7cc352d7a189ad10d0dcdb71c8d6aded07bb439 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Nov 18 13:06:37 2020 +0700 JAMES-3441 [Refactoring] Strong type & extract JamesMailSpooler configuration This eases injection overrides for this configuration and thus eases testing --- .../modules/server/CamelMailetContainerModule.java | 12 ++- .../mailetcontainer/impl/JamesMailSpooler.java | 91 ++++++++++++++++------ 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java index 68c7d06..0364045 100644 --- a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java +++ b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java @@ -45,6 +45,7 @@ import org.apache.james.mailetcontainer.impl.JamesMailetContext; import org.apache.james.mailetcontainer.impl.MatcherMailetPair; import org.apache.james.mailetcontainer.impl.camel.CamelCompositeProcessor; import org.apache.james.mailetcontainer.impl.camel.CamelMailetProcessor; +import org.apache.james.mailrepository.api.MailRepositoryStore; import org.apache.james.server.core.configuration.ConfigurationProvider; import org.apache.james.transport.mailets.RemoveMimeHeader; import org.apache.james.transport.matchers.All; @@ -123,12 +124,19 @@ public class CamelMailetContainerModule extends AbstractModule { return camelContext; } + @Provides + @Singleton + public JamesMailSpooler.Configuration spoolerConfiguration(MailRepositoryStore mailRepositoryStore, ConfigurationProvider configurationProvider) { + HierarchicalConfiguration<ImmutableNode> conf = getJamesSpoolerConfiguration(configurationProvider); + return JamesMailSpooler.Configuration.from(mailRepositoryStore, conf); + } + @ProvidesIntoSet - 
InitializationOperation startSpooler(JamesMailSpooler jamesMailSpooler, ConfigurationProvider configurationProvider) { + InitializationOperation startSpooler(JamesMailSpooler jamesMailSpooler, JamesMailSpooler.Configuration configuration) { return InitilizationOperationBuilder .forClass(JamesMailSpooler.class) .init(() -> { - jamesMailSpooler.configure(getJamesSpoolerConfiguration(configurationProvider)); + jamesMailSpooler.configure(configuration); jamesMailSpooler.init(); }); } diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 6244e4c..cb15deb 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -53,6 +53,7 @@ import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; @@ -70,24 +71,24 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private final MetricFactory metricFactory; private final MailProcessor mailProcessor; private final MailRepository errorRepository; - private final MailRepositoryUrl errorRepositoryURL; private final reactor.core.Disposable disposable; private final MailQueue queue; - private final int concurrencyLevel; + private final Configuration configuration; - private Runner(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepository errorRepository, MailRepositoryUrl errorRepositoryURL, MailQueue queue, int concurrencyLevel) { + private Runner(MetricFactory metricFactory, MailProcessor mailProcessor, + MailRepository errorRepository, MailQueue queue, Configuration 
configuration) { this.metricFactory = metricFactory; this.mailProcessor = mailProcessor; this.errorRepository = errorRepository; - this.errorRepositoryURL = errorRepositoryURL; - this.disposable = run(queue); this.queue = queue; - this.concurrencyLevel = concurrencyLevel; + this.configuration = configuration; + + this.disposable = run(queue); } private reactor.core.Disposable run(MailQueue queue) { return Flux.from(queue.deQueue()) - .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel) + .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), configuration.getConcurrencyLevel()) .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) .subscribeOn(Schedulers.elastic()) .subscribe(); @@ -130,7 +131,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB try { if (failureCount > MAXIMUM_FAILURE_COUNT) { - LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString()); + LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, configuration.getErrorRepositoryURL().asString()); storeInErrorRepository(queueItem); } else { LOGGER.error("Failed {} processing {} consecutive times. 
Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException); @@ -185,6 +186,50 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } } + public static class Configuration { + public static Configuration from(MailRepositoryStore mailRepositoryStore, HierarchicalConfiguration<ImmutableNode> config) { + int concurrencyLevel = config.getInt("threads", 100); + MailRepositoryUrl errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null)) + .map(MailRepositoryUrl::from) + .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol( + mailRepositoryStore.defaultProtocol() + .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.0")), + ERROR_REPOSITORY_PATH)); + + return new Configuration(concurrencyLevel, errorRepositoryURL); + } + + private final int concurrencyLevel; + private final MailRepositoryUrl errorRepositoryURL; + + public Configuration(int concurrencyLevel, MailRepositoryUrl errorRepositoryURL) { + Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' needs to be greater than or equal to zero"); + + this.concurrencyLevel = concurrencyLevel; + this.errorRepositoryURL = errorRepositoryURL; + } + + public int getConcurrencyLevel() { + return concurrencyLevel; + } + + public boolean isEnabled() { + return concurrencyLevel > 0; + } + + public MailRepositoryUrl getErrorRepositoryURL() { + return errorRepositoryURL; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("concurrencyLevel", concurrencyLevel) + .add("errorRepositoryURL", errorRepositoryURL) + .toString(); + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class); public static final String SPOOL_PROCESSING = "spoolProcessing"; @@ -201,8 +246,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private 
final MailRepositoryStore mailRepositoryStore; private final MailQueueFactory<?> queueFactory; - private int concurrencyLevel; - private MailRepositoryUrl errorRepositoryURL; + private Configuration configuration; private Optional<Runner> runner; @Inject @@ -215,13 +259,11 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public void configure(HierarchicalConfiguration<ImmutableNode> config) { - concurrencyLevel = config.getInt("threads", 100); - errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null)) - .map(MailRepositoryUrl::from) - .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol( - mailRepositoryStore.defaultProtocol() - .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.0")), - ERROR_REPOSITORY_PATH)); + configure(Configuration.from(mailRepositoryStore, config)); + } + + public void configure(Configuration configuration) { + this.configuration = configuration; } /** @@ -229,22 +271,21 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB */ @PostConstruct public void init() { - Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' needs to be greater than or equal to zero"); - if (concurrencyLevel > 0) { + if (configuration.isEnabled()) { LOGGER.info("init..."); - LOGGER.info("Concurrency level is {}", concurrencyLevel); - MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel)); + LOGGER.info("Concurrency level is {}", configuration.getConcurrencyLevel()); + MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(configuration.getConcurrencyLevel())); runner = Optional.of(new Runner(metricFactory, - mailProcessor, errorRepository(), errorRepositoryURL, queue, concurrencyLevel)); + mailProcessor, errorRepository(), queue, configuration)); 
LOGGER.info("Spooler started"); } else { - LOGGER.info("Spooler had been dis-activated. To enable is set 'threads' count to a value greater than zero"); + LOGGER.info("Spooler had been deactivated. To enable it set 'threads' count to a value greater than zero"); } } private MailRepository errorRepository() { try { - return mailRepositoryStore.select(errorRepositoryURL); + return mailRepositoryStore.select(configuration.getErrorRepositoryURL()); } catch (MailRepositoryStore.MailRepositoryStoreException e) { throw new RuntimeException(e); } @@ -267,7 +308,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public int getThreadCount() { - return concurrencyLevel; + return configuration.getConcurrencyLevel(); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
