This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 5361f88 [PERF] Reactify UsersRepository::contains (#704)
5361f88 is described below
commit 5361f8877f94532ead76fb5149dcb0fd8d8bc04f
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Oct 22 10:15:39 2021 +0700
[PERF] Reactify UsersRepository::contains (#704)
This enables non blocking user provisionning on top
of the Cassandra users repository
---
.../org/apache/james/user/api/UsersRepository.java | 3 ++
.../james/user/cassandra/CassandraUsersDAO.java | 20 ++++++++--
.../java/org/apache/james/user/lib/UsersDAO.java | 9 +++++
.../apache/james/user/lib/UsersRepositoryImpl.java | 6 +++
.../apache/james/jmap/http/UserProvisioner.java | 40 +++++++++-----------
.../apache/james/jmap/http/UserProvisioning.scala | 43 ++++++++++------------
6 files changed, 70 insertions(+), 51 deletions(-)
diff --git
a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
index 0f8809e..7d0b36b 100644
---
a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
+++
b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
/**
* Interface for a repository of users. A repository represents a logical
@@ -87,6 +88,8 @@ public interface UsersRepository {
*/
boolean contains(Username name) throws UsersRepositoryException;
+ Publisher<Boolean> containsReactive(Username name);
+
/**
* Test if user with name 'name' has password 'password'.
*
diff --git
a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
index 65fedc9..c76a5b2 100644
---
a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
+++
b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
@@ -45,12 +45,15 @@ import org.apache.james.user.api.model.User;
import org.apache.james.user.lib.UsersDAO;
import org.apache.james.user.lib.model.Algorithm;
import org.apache.james.user.lib.model.DefaultUser;
+import org.reactivestreams.Publisher;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
+import reactor.core.publisher.Mono;
+
public class CassandraUsersDAO implements UsersDAO {
private static final String DEFAULT_ALGO_VALUE = "SHA-512";
@@ -113,12 +116,16 @@ public class CassandraUsersDAO implements UsersDAO {
@Override
public Optional<DefaultUser> getUserByName(Username name) {
+ return getUserByNameReactive(name)
+ .blockOptional();
+ }
+
+ private Mono<DefaultUser> getUserByNameReactive(Username name) {
return executor.executeSingleRow(
- getUserStatement.bind()
- .setString(NAME, name.asString()))
+ getUserStatement.bind()
+ .setString(NAME, name.asString()))
.map(row -> new DefaultUser(Username.of(row.getString(NAME)),
row.getString(PASSWORD),
- algorithmFactory.of(row.getString(ALGORITHM))))
- .blockOptional();
+ algorithmFactory.of(row.getString(ALGORITHM))));
}
@Override
@@ -156,6 +163,11 @@ public class CassandraUsersDAO implements UsersDAO {
}
@Override
+ public Publisher<Boolean> containsReactive(Username name) {
+ return getUserByNameReactive(name).hasElement();
+ }
+
+ @Override
public int countUsers() {
return executor.executeSingleRow(countUserStatement.bind())
.map(row -> Ints.checkedCast(row.getLong(0)))
diff --git
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
index 458c1f7..d9ef361 100644
---
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
+++
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
@@ -25,6 +25,10 @@ import java.util.Optional;
import org.apache.james.core.Username;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public interface UsersDAO {
default boolean getDefaultVirtualHostingValue() {
@@ -39,6 +43,11 @@ public interface UsersDAO {
boolean contains(Username name) throws UsersRepositoryException;
+ default Publisher<Boolean> containsReactive(Username name) {
+ return Mono.fromCallable(() -> contains(name))
+ .subscribeOn(Schedulers.elastic());
+ }
+
int countUsers() throws UsersRepositoryException;
Iterator<Username> list() throws UsersRepositoryException;
diff --git
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
index f39c2a6..b0444bd 100644
---
a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
+++
b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
@@ -40,6 +40,7 @@ import org.apache.james.user.api.InvalidUsernameException;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.user.api.model.User;
+import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import com.google.common.base.CharMatcher;
@@ -169,6 +170,11 @@ public class UsersRepositoryImpl<T extends UsersDAO>
implements UsersRepository,
}
@Override
+ public Publisher<Boolean> containsReactive(Username name) {
+ return usersDAO.containsReactive(name);
+ }
+
+ @Override
public int countUsers() throws UsersRepositoryException {
return usersDAO.countUsers();
}
diff --git
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
index e0ec840..a841145 100644
---
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
+++
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UserProvisioner.java
@@ -25,11 +25,11 @@ import javax.inject.Inject;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.user.api.AlreadyExistInUsersRepositoryException;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.FunctionalUtils;
+import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import reactor.core.publisher.Mono;
@@ -48,35 +48,29 @@ public class UserProvisioner {
public Mono<Void> provisionUser(MailboxSession session) {
if (session != null && !usersRepository.isReadOnly()) {
- return Mono.fromRunnable(() -> createAccountIfNeeded(session))
- .subscribeOn(Schedulers.elastic())
- .then();
+ return createAccountIfNeeded(session);
}
return Mono.empty();
}
- private void createAccountIfNeeded(MailboxSession session) {
- TimeMetric timeMetric = metricFactory.timer("JMAP-user-provisioning");
- try {
- Username username = session.getUser();
- if (needsAccountCreation(username)) {
- createAccount(username);
- }
- } catch (AlreadyExistInUsersRepositoryException e) {
- // Ignore
- } catch (UsersRepositoryException e) {
- throw new RuntimeException(e);
- } finally {
- timeMetric.stopAndPublish();
- }
+ private Mono<Void> createAccountIfNeeded(MailboxSession session) {
+ Username username = session.getUser();
+ return
Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-user-provisioning",
+ needsAccountCreation(username)
+ .filter(FunctionalUtils.identityPredicate())
+ .flatMap(any -> createAccount(username))
+ .onErrorResume(AlreadyExistInUsersRepositoryException.class, e
-> Mono.empty())));
}
- private void createAccount(Username username) throws
UsersRepositoryException {
- usersRepository.addUser(username, generatePassword());
+ private Mono<Void> createAccount(Username username) {
+ return Mono.fromRunnable(Throwing.runnable(() ->
usersRepository.addUser(username, generatePassword())))
+ .subscribeOn(Schedulers.elastic())
+ .then();
}
- private boolean needsAccountCreation(Username username) throws
UsersRepositoryException {
- return !usersRepository.contains(username);
+ private Mono<Boolean> needsAccountCreation(Username username) {
+ return Mono.from(usersRepository.containsReactive(username))
+ .map(FunctionalUtils.negate());
}
private String generatePassword() {
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
index 37cdd28..8ec20d1 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/http/UserProvisioning.scala
@@ -25,39 +25,34 @@ import javax.inject.Inject
import org.apache.james.core.Username
import org.apache.james.mailbox.MailboxSession
import org.apache.james.metrics.api.MetricFactory
-import org.apache.james.user.api.{AlreadyExistInUsersRepositoryException,
UsersRepository, UsersRepositoryException}
+import org.apache.james.user.api.{AlreadyExistInUsersRepositoryException,
UsersRepository}
import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
class UserProvisioning @Inject() (usersRepository: UsersRepository,
metricFactory: MetricFactory) {
-
def provisionUser(session: MailboxSession): SMono[Unit] =
if (session != null && !usersRepository.isReadOnly) {
- SMono.fromCallable(() => createAccountIfNeeded(session))
- .`then`
+ createAccountIfNeeded(session)
} else {
SMono.empty
}
- private def createAccountIfNeeded(session: MailboxSession): Unit = {
- val timeMetric = metricFactory.timer("JMAP-RFC-8621-user-provisioning")
- try {
- val username = session.getUser
- if (needsAccountCreation(username)) {
- createAccount(username)
- }
- } catch {
- case exception: AlreadyExistInUsersRepositoryException => // Ignore
- case exception: UsersRepositoryException => throw new
RuntimeException(exception)
- } finally {
- timeMetric.stopAndPublish
- }
- }
-
- @throws[UsersRepositoryException]
- private def createAccount(username: Username): Unit =
usersRepository.addUser(username, generatePassword)
-
- @throws[UsersRepositoryException]
- private def needsAccountCreation(username: Username): Boolean =
!usersRepository.contains(username)
+ private def createAccountIfNeeded(session: MailboxSession): SMono[Unit] =
+
SMono(metricFactory.decoratePublisherWithTimerMetric("JMAP-RFC-8621-user-provisioning",
+ needsAccountCreation(session.getUser)
+ .filter(b => b)
+ .flatMap(_ => createAccount(session.getUser))
+ .onErrorResume {
+ case _: AlreadyExistInUsersRepositoryException => SMono.empty[Unit]
+ case e => SMono.error[Unit](e)
+ }))
+
+ private def createAccount(username: Username): SMono[Unit] =
+ SMono.fromCallable(() => usersRepository.addUser(username,
generatePassword))
+ .subscribeOn(Schedulers.elastic())
+
+ private def needsAccountCreation(username: Username): SMono[Boolean] =
+ SMono(usersRepository.containsReactive(username)).map(b => !b)
private def generatePassword: String = UUID.randomUUID.toString
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]