This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 5361f88  [PERF] Reactify UsersRepository::contains (#704)
5361f88 is described below

commit 5361f8877f94532ead76fb5149dcb0fd8d8bc04f
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Oct 22 10:15:39 2021 +0700

    [PERF] Reactify UsersRepository::contains (#704)
    
    This enables non blocking user provisionning on top
    of the Cassandra users repository
---
 .../org/apache/james/user/api/UsersRepository.java |  3 ++
 .../james/user/cassandra/CassandraUsersDAO.java    | 20 ++++++++--
 .../java/org/apache/james/user/lib/UsersDAO.java   |  9 +++++
 .../apache/james/user/lib/UsersRepositoryImpl.java |  6 +++
 .../apache/james/jmap/http/UserProvisioner.java    | 40 +++++++++-----------
 .../apache/james/jmap/http/UserProvisioning.scala  | 43 ++++++++++------------
 6 files changed, 70 insertions(+), 51 deletions(-)

diff --git 
a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
 
b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
index 0f8809e..7d0b36b 100644
--- 
a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
+++ 
b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
 import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
 
 /**
  * Interface for a repository of users. A repository represents a logical
@@ -87,6 +88,8 @@ public interface UsersRepository {
      */
     boolean contains(Username name) throws UsersRepositoryException;
 
+    Publisher<Boolean> containsReactive(Username name);
+
     /**
      * Test if user with name 'name' has password 'password'.
      * 
diff --git 
a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
 
b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
index 65fedc9..c76a5b2 100644
--- 
a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
+++ 
b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
@@ -45,12 +45,15 @@ import org.apache.james.user.api.model.User;
 import org.apache.james.user.lib.UsersDAO;
 import org.apache.james.user.lib.model.Algorithm;
 import org.apache.james.user.lib.model.DefaultUser;
+import org.reactivestreams.Publisher;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraUsersDAO implements UsersDAO {
     private static final String DEFAULT_ALGO_VALUE = "SHA-512";
 
@@ -113,12 +116,16 @@ public class CassandraUsersDAO implements UsersDAO {
 
     @Override
     public Optional<DefaultUser> getUserByName(Username name) {
+        return getUserByNameReactive(name)
+            .blockOptional();
+    }
+
+    private Mono<DefaultUser> getUserByNameReactive(Username name) {
         return executor.executeSingleRow(
-                getUserStatement.bind()
-                    .setString(NAME, name.asString()))
+            getUserStatement.bind()
+                .setString(NAME, name.asString()))
             .map(row -> new DefaultUser(Username.of(row.getString(NAME)), 
row.getString(PASSWORD),
-                algorithmFactory.of(row.getString(ALGORITHM))))
-            .blockOptional();
+                algorithmFactory.of(row.getString(ALGORITHM))));
     }
 
     @Override
@@ -156,6 +163,11 @@ public class CassandraUsersDAO implements UsersDAO {
     }
 
     @Override
+    public Publisher<Boolean> containsReactive(Username name) {
+        return getUserByNameReactive(name).hasElement();
+    }
+
+    @Override
     public int countUsers() {
         return executor.executeSingleRow(countUserStatement.bind())
             .map(row -> Ints.checkedCast(row.getLong(0)))
diff --git 
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
 
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
index 458c1f7..d9ef361 100644
--- 
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
+++ 
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
@@ -25,6 +25,10 @@ import java.util.Optional;
 import org.apache.james.core.Username;
 import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface UsersDAO {
     default boolean getDefaultVirtualHostingValue() {
@@ -39,6 +43,11 @@ public interface UsersDAO {
 
     boolean contains(Username name) throws UsersRepositoryException;
 
+    default Publisher<Boolean> containsReactive(Username name) {
+        return Mono.fromCallable(() -> contains(name))
+            .subscribeOn(Schedulers.elastic());
+    }
+
     int countUsers() throws UsersRepositoryException;
 
     Iterator<Username> list() throws UsersRepositoryException;
diff --git 
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
 
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
index f39c2a6..b0444bd 100644
--- 
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
+++ 
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
@@ -40,6 +40,7 @@ import org.apache.james.user.api.InvalidUsernameException;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.CharMatcher;
@@ -169,6 +170,11 @@ public class UsersRepositoryImpl<T extends UsersDAO> 
implements UsersRepository,
     }
 
     @Override
+    public Publisher<Boolean> containsReactive(Username name) {
+        return usersDAO.containsReactive(name);
+    }
+
+    @Override
     public int countUsers() throws UsersRepositoryException {
         return usersDAO.countUsers();
     }
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
index e0ec840..a841145 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
@@ -25,11 +25,11 @@ import javax.inject.Inject;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.user.api.AlreadyExistInUsersRepositoryException;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.FunctionalUtils;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Mono;
@@ -48,35 +48,29 @@ public class UserProvisioner {
 
     public Mono<Void> provisionUser(MailboxSession session) {
         if (session != null && !usersRepository.isReadOnly()) {
-            return Mono.fromRunnable(() -> createAccountIfNeeded(session))
-                .subscribeOn(Schedulers.elastic())
-                .then();
+            return createAccountIfNeeded(session);
         }
         return Mono.empty();
     }
 
-    private void createAccountIfNeeded(MailboxSession session) {
-        TimeMetric timeMetric = metricFactory.timer("JMAP-user-provisioning");
-        try {
-            Username username = session.getUser();
-            if (needsAccountCreation(username)) {
-                createAccount(username);
-            }
-        } catch (AlreadyExistInUsersRepositoryException e) {
-            // Ignore
-        } catch (UsersRepositoryException e) {
-            throw new RuntimeException(e);
-        } finally {
-            timeMetric.stopAndPublish();
-        }
+    private Mono<Void> createAccountIfNeeded(MailboxSession session) {
+        Username username = session.getUser();
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-user-provisioning",
+            needsAccountCreation(username)
+                .filter(FunctionalUtils.identityPredicate())
+                .flatMap(any -> createAccount(username))
+                .onErrorResume(AlreadyExistInUsersRepositoryException.class, e 
-> Mono.empty())));
     }
 
-    private void createAccount(Username username) throws 
UsersRepositoryException {
-        usersRepository.addUser(username, generatePassword());
+    private Mono<Void> createAccount(Username username) {
+        return Mono.fromRunnable(Throwing.runnable(() -> 
usersRepository.addUser(username, generatePassword())))
+            .subscribeOn(Schedulers.elastic())
+            .then();
     }
 
-    private boolean needsAccountCreation(Username username) throws 
UsersRepositoryException {
-        return !usersRepository.contains(username);
+    private Mono<Boolean> needsAccountCreation(Username username) {
+        return Mono.from(usersRepository.containsReactive(username))
+            .map(FunctionalUtils.negate());
     }
 
     private String generatePassword() {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
index 37cdd28..8ec20d1 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
@@ -25,39 +25,34 @@ import javax.inject.Inject
 import org.apache.james.core.Username
 import org.apache.james.mailbox.MailboxSession
 import org.apache.james.metrics.api.MetricFactory
-import org.apache.james.user.api.{AlreadyExistInUsersRepositoryException, 
UsersRepository, UsersRepositoryException}
+import org.apache.james.user.api.{AlreadyExistInUsersRepositoryException, 
UsersRepository}
 import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
 
 class UserProvisioning @Inject() (usersRepository: UsersRepository, 
metricFactory: MetricFactory) {
-
   def provisionUser(session: MailboxSession): SMono[Unit] =
     if (session != null && !usersRepository.isReadOnly) {
-      SMono.fromCallable(() => createAccountIfNeeded(session))
-        .`then`
+      createAccountIfNeeded(session)
     } else {
       SMono.empty
     }
 
-  private def createAccountIfNeeded(session: MailboxSession): Unit = {
-    val timeMetric = metricFactory.timer("JMAP-RFC-8621-user-provisioning")
-    try {
-      val username = session.getUser
-      if (needsAccountCreation(username)) {
-        createAccount(username)
-      }
-    } catch {
-      case exception: AlreadyExistInUsersRepositoryException => // Ignore
-      case exception: UsersRepositoryException => throw new 
RuntimeException(exception)
-    } finally {
-      timeMetric.stopAndPublish
-    }
-  }
-
-  @throws[UsersRepositoryException]
-  private def createAccount(username: Username): Unit = 
usersRepository.addUser(username, generatePassword)
-
-  @throws[UsersRepositoryException]
-  private def needsAccountCreation(username: Username): Boolean = 
!usersRepository.contains(username)
+  private def createAccountIfNeeded(session: MailboxSession): SMono[Unit] =
+    
SMono(metricFactory.decoratePublisherWithTimerMetric("JMAP-RFC-8621-user-provisioning",
+      needsAccountCreation(session.getUser)
+        .filter(b => b)
+        .flatMap(_ => createAccount(session.getUser))
+        .onErrorResume {
+          case _: AlreadyExistInUsersRepositoryException => SMono.empty[Unit]
+          case e => SMono.error[Unit](e)
+        }))
+
+  private def createAccount(username: Username): SMono[Unit] =
+    SMono.fromCallable(() => usersRepository.addUser(username, 
generatePassword))
+      .subscribeOn(Schedulers.elastic())
+
+  private def needsAccountCreation(username: Username): SMono[Boolean] =
+    SMono(usersRepository.containsReactive(username)).map(b => !b)
 
   private def generatePassword: String = UUID.randomUUID.toString
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to