This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 98f9f18c9e77e33ed6d62ebe8b189817166fcc68 Author: Rene Cordier <[email protected]> AuthorDate: Tue Jul 21 17:21:04 2020 +0700 JAMES-3093 Port MailboxProvisioner from draft to jmap-rfc-8621 --- .../james/jmap/http/MailboxesProvisioner.scala | 81 ++++++++++++++++ .../james/jmap/http/MailboxesProvisionerTest.scala | 103 +++++++++++++++++++++ 2 files changed, 184 insertions(+) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/MailboxesProvisioner.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/MailboxesProvisioner.scala new file mode 100644 index 0000000..2690ee5 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/MailboxesProvisioner.scala @@ -0,0 +1,81 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.jmap.http + +import java.util.Optional + +import javax.inject.Inject +import org.apache.james.core.Username +import org.apache.james.mailbox.exception.{MailboxException, MailboxExistsException} +import org.apache.james.mailbox.model.{MailboxId, MailboxPath} +import org.apache.james.mailbox.{DefaultMailboxes, MailboxManager, MailboxSession, SubscriptionManager} +import org.apache.james.metrics.api.MetricFactory +import org.slf4j.{Logger, LoggerFactory} +import reactor.core.scala.publisher.{SFlux, SMono} +import reactor.core.scheduler.Schedulers + +import scala.jdk.CollectionConverters._ + +/** Creates, for a given user, every mailbox named in DefaultMailboxes.DEFAULT_MAILBOXES that does not exist yet, and subscribes to a mailbox when its creation returns an id. */ class MailboxesProvisioner @Inject() (mailboxManager: MailboxManager, + subscriptionManager: SubscriptionManager, + metricFactory: MetricFactory) { + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[MailboxesProvisioner]) + + /** Returns the provisioning pipeline for session.getUser; only the user is read from the given session. The pipeline is built inside the supplier passed to metricFactory.decorateSupplierWithTimerMetric under the name JMAP-RFC-8621-mailboxes-provisioning. NOTE(review): the supplier only assembles the SMono, so the timer presumably does not cover the mailbox operations that run on subscription - confirm against MetricFactory. */ def createMailboxesIfNeeded(session: MailboxSession): SMono[Unit] = + metricFactory.decorateSupplierWithTimerMetric("JMAP-RFC-8621-mailboxes-provisioning", () => + createDefaultMailboxes(session.getUser)) + + + /** Opens a system session for username, turns each default mailbox name into that user's MailboxPath, keeps the paths for which mailboxDoesntExist emits true, and creates them through concatMap, each creation wrapped in SMono.fromCallable and subscribed on Schedulers.elastic. The trailing then drops the elements so that only completion or an error is signalled. */ private def createDefaultMailboxes(username: Username): SMono[Unit] = { + val session: MailboxSession = mailboxManager.createSystemSession(username) + + SFlux.fromIterable(DefaultMailboxes.DEFAULT_MAILBOXES.asScala) + .map(toMailboxPath(session)) + .filterWhen((mailboxPath: MailboxPath) => mailboxDoesntExist(mailboxPath, session)) + .concatMap((mailboxPath: MailboxPath) => SMono.fromCallable(() => createMailbox(mailboxPath, session)) + .subscribeOn(Schedulers.elastic)) + .`then` + } + + /** Emits the negation of what mailboxManager.mailboxExists reports for mailboxPath. A MailboxException thrown while that call is being made is converted into an error signal rather than propagating to the caller. */ private def mailboxDoesntExist(mailboxPath: MailboxPath, session: MailboxSession): SMono[Boolean] = { + try { + SMono(mailboxManager.mailboxExists(mailboxPath, session)) + .map(exist => !exist) + } catch { + case exception: MailboxException => SMono.raiseError(exception) + } + } + + /** Builds a function that maps a mailbox name to the MailboxPath of that name for session.getUser. */ private def toMailboxPath(session: MailboxSession): String => MailboxPath = +
(mailbox: String) => MailboxPath.forUser(session.getUser, mailbox) + + /** Creates mailboxPath and, when an id is returned, subscribes the session to mailboxPath.getName. The Provisioning log line is written whether or not an id was returned. A MailboxExistsException is only logged (the message attributes it to a concurrent creation); any other MailboxException is rethrown wrapped in a RuntimeException. */ private def createMailbox(mailboxPath: MailboxPath, session: MailboxSession): Unit = { + try { + val mailboxId: Optional[MailboxId] = mailboxManager.createMailbox(mailboxPath, session) + if (mailboxId.isPresent) { + subscriptionManager.subscribe(session, mailboxPath.getName) + } + LOGGER.info("Provisioning {}. {} created.", mailboxPath, mailboxId) + } catch { + case e: MailboxExistsException => LOGGER.info("Mailbox {} have been created concurrently", mailboxPath) + case e: MailboxException => throw new RuntimeException(e) + } + } +} diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/http/MailboxesProvisionerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/http/MailboxesProvisionerTest.scala new file mode 100644 index 0000000..cd21992 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/http/MailboxesProvisionerTest.scala @@ -0,0 +1,103 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.jmap.http + +import java.time.Duration +import java.util.function.Predicate + +import com.github.fge.lambdas.Throwing +import com.github.steveash.guavate.Guavate +import org.apache.james.core.Username +import org.apache.james.mailbox.inmemory.InMemoryMailboxManager +import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources +import org.apache.james.mailbox.model.MailboxPath +import org.apache.james.mailbox.store.StoreSubscriptionManager +import org.apache.james.mailbox.{DefaultMailboxes, MailboxSession, MailboxSessionUtil} +import org.apache.james.metrics.tests.RecordingMetricFactory +import org.apache.james.util.concurrency.ConcurrentTestRunner +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} + +/** Holds the user name shared by all tests of the class below. */ object MailboxesProvisionerTest { + private val USERNAME: Username = Username.of("username") +} + +/** Exercises MailboxesProvisioner against an InMemoryMailboxManager, a StoreSubscriptionManager built from that manager's mapper factory, and a RecordingMetricFactory. */ class MailboxesProvisionerTest { + import MailboxesProvisionerTest._ + + var testee: MailboxesProvisioner = _ + var session: MailboxSession = _ + var mailboxManager: InMemoryMailboxManager = _ + var subscriptionManager: StoreSubscriptionManager = _ + + /** Rebuilds the session, both managers and the testee before each test. */ @BeforeEach + def setup(): Unit = { + session = MailboxSessionUtil.create(USERNAME) + mailboxManager = InMemoryIntegrationResources.defaultResources.getMailboxManager + subscriptionManager = new StoreSubscriptionManager(mailboxManager.getMapperFactory) + testee = new MailboxesProvisioner(mailboxManager, subscriptionManager, new RecordingMetricFactory) + } + + /** After provisioning, the mailboxes listed for the session are exactly the default mailbox paths of USERNAME, in any order. */ @Test + def createMailboxesIfNeededShouldCreateSystemMailboxes(): Unit = { + testee.createMailboxesIfNeeded(session).block() + + assertThat(mailboxManager.list(session)) + .containsOnlyElementsOf(DefaultMailboxes.DEFAULT_MAILBOXES + .stream + .map((mailboxName: String) => MailboxPath.forUser(USERNAME, mailboxName)) + .collect(Guavate.toImmutableList())) + } + + /** Creates every default mailbox except SPAM up front, then checks that provisioning adds the SPAM mailbox. */ @Test + def 
createMailboxesIfNeededShouldCreateSpamWhenOtherSystemMailboxesExist(): Unit = { + DefaultMailboxes.DEFAULT_MAILBOXES + .stream + .filter(Predicate.not(Predicate.isEqual(DefaultMailboxes.SPAM))) + .forEach(Throwing.consumer((mailbox: String) => mailboxManager.createMailbox(MailboxPath.forUser(USERNAME, mailbox), session))) + + testee.createMailboxesIfNeeded(session).block() + + assertThat(mailboxManager.list(session)) + .contains(MailboxPath.forUser(USERNAME, DefaultMailboxes.SPAM)) + } + + /** After provisioning, the subscriptions of the session are exactly the default mailbox names. */ @Test + def createMailboxesIfNeededShouldSubscribeMailboxes(): Unit = { + testee.createMailboxesIfNeeded(session).block() + + assertThat(subscriptionManager.subscriptions(session)) + .containsOnlyElementsOf(DefaultMailboxes.DEFAULT_MAILBOXES) + } + + /** Runs provisioning through ConcurrentTestRunner with threadCount(10) and a limit of 10 seconds, then checks that the listed mailboxes are still exactly the default mailbox paths. */ @Test + def createMailboxesIfNeededShouldNotGenerateExceptionsInConcurrentEnvironment(): Unit = { + ConcurrentTestRunner.builder + .operation((threadNumber: Int, step: Int) => testee.createMailboxesIfNeeded(session).block()) + .threadCount(10) + .runSuccessfullyWithin(Duration.ofSeconds(10)) + + assertThat(mailboxManager.list(session)) + .containsOnlyElementsOf(DefaultMailboxes.DEFAULT_MAILBOXES + .stream + .map((mailboxName: String) => MailboxPath.forUser(USERNAME, mailboxName)) + .collect(Guavate.toImmutableList())) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
