This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 4a2603eab8 [FIX] make `updateACL` thread-safe in InMemoryMailboxMapper 4a2603eab8 is described below commit 4a2603eab8457a95b1fe54aaabfe7a25ee86123d Author: TungTV <vtt...@linagora.com> AuthorDate: Wed Feb 19 09:15:58 2025 +0700 [FIX] make `updateACL` thread-safe in InMemoryMailboxMapper --- .../inmemory/mail/InMemoryMailboxMapper.java | 74 ++++++++++++++-------- .../mail/PostgresMailboxMapperACLTest.java | 42 ------------ .../RLSSupportPostgresMailboxMapperACLTest.java | 8 +++ .../store/mail/model/MailboxMapperACLTest.java | 36 +++++++++++ 4 files changed, 93 insertions(+), 67 deletions(-) diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java index 6bc71804c0..3b97058ccf 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java @@ -27,6 +27,7 @@ import org.apache.james.core.Username; import org.apache.james.mailbox.acl.ACLDiff; import org.apache.james.mailbox.exception.MailboxExistsException; import org.apache.james.mailbox.exception.MailboxNotFoundException; +import org.apache.james.mailbox.exception.UnsupportedRightException; import org.apache.james.mailbox.inmemory.InMemoryId; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxACL; @@ -47,31 +48,33 @@ import reactor.core.publisher.Mono; public class InMemoryMailboxMapper implements MailboxMapper { private static final int INITIAL_SIZE = 128; - private final ConcurrentHashMap<MailboxPath, Mailbox> mailboxesByPath; + private final ConcurrentHashMap<MailboxId, Mailbox> mailboxesById; private final AtomicLong mailboxIdGenerator = new AtomicLong(); public InMemoryMailboxMapper() { - mailboxesByPath = new ConcurrentHashMap<>(INITIAL_SIZE); + mailboxesById = new ConcurrentHashMap<>(INITIAL_SIZE); } @Override public Mono<Void> delete(Mailbox mailbox) { - return Mono.fromRunnable(() -> mailboxesByPath.remove(mailbox.generateAssociatedPath())); + return Mono.fromRunnable(() -> mailboxesById.remove(mailbox.getMailboxId())); } public Mono<Void> deleteAll() { - return Mono.fromRunnable(mailboxesByPath::clear); + return Mono.fromRunnable(mailboxesById::clear); } @Override public Mono<Mailbox> findMailboxByPath(MailboxPath path) { - return Mono.defer(() -> Mono.justOrEmpty(mailboxesByPath.get(path))) + return Flux.fromIterable(mailboxesById.values()) + .filter(mailbox -> path.equals(mailbox.generateAssociatedPath())) + .next() .map(Mailbox::new); } @Override public Mono<Mailbox> findMailboxById(MailboxId id) { - return Mono.fromCallable(mailboxesByPath::values) + return Mono.fromCallable(mailboxesById::values) .flatMapIterable(Function.identity()) .filter(mailbox -> mailbox.getMailboxId().equals(id)) .next() @@ -81,7 +84,7 @@ public class InMemoryMailboxMapper implements MailboxMapper { @Override public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { - return Mono.fromCallable(mailboxesByPath::values) + return Mono.fromCallable(mailboxesById::values) .flatMapIterable(Function.identity()) .filter(query::matches) .map(Mailbox::new); @@ -92,30 +95,40 @@ public class InMemoryMailboxMapper implements MailboxMapper { InMemoryId id = InMemoryId.of(mailboxIdGenerator.incrementAndGet()); Mailbox mailbox = new Mailbox(mailboxPath, uidValidity, id); - return saveMailbox(mailbox) - .thenReturn(mailbox); + return existsMailboxPath(mailboxPath) + .flatMap(exist -> { + if (exist) { + return Mono.error(new MailboxExistsException(mailboxPath.getName())); + } + return saveMailbox(mailbox) + .thenReturn(mailbox); + }); } @Override public Mono<MailboxId> rename(Mailbox mailbox) { Preconditions.checkNotNull(mailbox.getMailboxId(), "A mailbox we want to rename should have a defined mailboxId"); - InMemoryId id = (InMemoryId) mailbox.getMailboxId(); - return findMailboxById(id) - .flatMap(mailboxWithPreviousName -> saveMailbox(mailbox) - .then(Mono.fromCallable(() -> mailboxesByPath.remove(mailboxWithPreviousName.generateAssociatedPath())))) - .thenReturn(mailbox.getMailboxId()); + return existsMailboxPath(mailbox.generateAssociatedPath()) + .flatMap(exist -> { + if (exist) { + return Mono.error(new MailboxExistsException(mailbox.generateAssociatedPath().getName())); + } + return Mono.defer(() -> Mono.justOrEmpty(mailboxesById.computeIfPresent(mailbox.getMailboxId(), (mailboxId, existingMailbox) -> mailbox)) + .map(Mailbox::getMailboxId) + .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailbox.getMailboxId())))); + }); } private Mono<Void> saveMailbox(Mailbox mailbox) { - return Mono.defer(() -> Mono.justOrEmpty(mailboxesByPath.putIfAbsent(mailbox.generateAssociatedPath(), mailbox))) + return Mono.defer(() -> Mono.justOrEmpty(mailboxesById.putIfAbsent(mailbox.getMailboxId(), mailbox))) .flatMap(ignored -> Mono.error(new MailboxExistsException(mailbox.getName()))); } @Override public Mono<Boolean> hasChildren(Mailbox mailbox, char delimiter) { String mailboxName = mailbox.getName() + delimiter; - return Mono.fromCallable(mailboxesByPath::values) + return Mono.fromCallable(mailboxesById::values) .flatMapIterable(Function.identity()) .filter(box -> belongsToSameUser(mailbox, box) && box.getName().startsWith(mailboxName)) .hasElements(); @@ -128,32 +141,39 @@ public class InMemoryMailboxMapper implements MailboxMapper { @Override public Flux<Mailbox> list() { - return Mono.fromCallable(mailboxesByPath::values) + return Mono.fromCallable(mailboxesById::values) .flatMapIterable(Function.identity()); } @Override public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand mailboxACLCommand) { - return Mono.fromCallable(() -> { - MailboxACL oldACL = mailbox.getACL(); - MailboxACL newACL = mailbox.getACL().apply(mailboxACLCommand); - mailboxesByPath.get(mailbox.generateAssociatedPath()).setACL(newACL); - return ACLDiff.computeDiff(oldACL, newACL); - }); + return Mono.just(mailbox.getACL()) + .flatMap(oldACL -> Mono.fromCallable(() -> mailboxesById.compute(mailbox.getMailboxId(), (mailboxId, existingMailbox) -> { + if (existingMailbox == null) { + throw new IllegalArgumentException("Mailbox not found for id: " + mailboxId); + } + try { + existingMailbox.setACL(existingMailbox.getACL().apply(mailboxACLCommand)); + } catch (UnsupportedRightException e) { + throw new RuntimeException("ACL update failed", e); + } + return existingMailbox; + })) + .map(updatedMailbox -> ACLDiff.computeDiff(oldACL, updatedMailbox.getACL()))); } @Override public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) { return Mono.fromCallable(() -> { MailboxACL oldMailboxAcl = mailbox.getACL(); - mailboxesByPath.get(mailbox.generateAssociatedPath()).setACL(mailboxACL); + mailboxesById.get(mailbox.getMailboxId()).setACL(mailboxACL); return ACLDiff.computeDiff(oldMailboxAcl, mailboxACL); }); } @Override public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { - return Mono.fromCallable(mailboxesByPath::values) + return Mono.fromCallable(mailboxesById::values) .flatMapIterable(Function.identity()) .filter(mailbox -> hasRightOn(mailbox, userName, right)); } @@ -166,4 +186,8 @@ public class InMemoryMailboxMapper implements MailboxMapper { .map(rights -> rights.contains(right)) .orElse(false); } + + private Mono<Boolean> existsMailboxPath(MailboxPath mailboxPath) { + return findMailboxByPath(mailboxPath).hasElement(); + } } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java index 67f44e77ed..1c59824bbf 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java @@ -19,23 +19,12 @@ package org.apache.james.mailbox.postgres.mail; -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.util.concurrent.ExecutionException; -import java.util.stream.IntStream; - import org.apache.james.backends.postgres.PostgresExtension; -import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest; -import org.apache.james.util.concurrency.ConcurrentTestRunner; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.google.common.collect.ImmutableMap; - class PostgresMailboxMapperACLTest extends MailboxMapperACLTest { @RegisterExtension static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE); @@ -47,35 +36,4 @@ class PostgresMailboxMapperACLTest extends MailboxMapperACLTest { mailboxMapper = new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor())); return mailboxMapper; } - - @Test - protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException { - MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write); - MailboxACL.Rfc4314Rights newRights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Write); - - ConcurrentTestRunner.builder() - .reactorOperation((threadNumber, step) -> { - int userNumber = threadNumber / 2; - MailboxACL.EntryKey key = MailboxACL.EntryKey.createUserEntryKey("user" + userNumber); - if (threadNumber % 2 == 0) { - return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(rights).asReplacement()) - .then(); - } else { - return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(newRights).asAddition()) - .then(); - } - }) - .threadCount(10) - .operationCount(1) - .runSuccessfullyWithin(Duration.ofMinutes(1)); - - MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0, 5).boxed() - .collect(ImmutableMap.toImmutableMap(userNumber -> MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber -> rights))); - - assertThat( - mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId()) - .block() - .getACL()) - .isEqualTo(expectedMailboxACL); - } } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java index 1352f0b74e..9811865f8d 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java @@ -19,11 +19,14 @@ package org.apache.james.mailbox.postgres.mail; +import java.util.concurrent.ExecutionException; + import org.apache.james.backends.postgres.PostgresExtension; import org.apache.james.backends.postgres.PostgresModule; import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.RegisterExtension; class RLSSupportPostgresMailboxMapperACLTest extends MailboxMapperACLTest { @@ -36,4 +39,9 @@ class RLSSupportPostgresMailboxMapperACLTest extends MailboxMapperACLTest { return new RLSSupportPostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()), new PostgresMailboxMemberDAO(postgresExtension.getDefaultPostgresExecutor())); } + + @Override + @Disabled("not yet implemented") + protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException { + } } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java index 6e2f94a90f..d0e9ebccaf 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java @@ -21,6 +21,10 @@ package org.apache.james.mailbox.store.mail.model; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + import org.apache.james.core.Username; import org.apache.james.mailbox.acl.ACLDiff; import org.apache.james.mailbox.exception.MailboxException; @@ -32,6 +36,7 @@ import org.apache.james.mailbox.model.MailboxACL.Right; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.store.mail.MailboxMapper; +import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -443,4 +448,35 @@ public abstract class MailboxMapperACLTest { assertThat(mailboxMapper.updateACL(benwaInboxMailbox, aclCommand).block()).isEqualTo(expectAclDiff); } + + @Test + protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException { + MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write); + MailboxACL.Rfc4314Rights newRights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Write); + + ConcurrentTestRunner.builder() + .reactorOperation((threadNumber, step) -> { + int userNumber = threadNumber / 2; + MailboxACL.EntryKey key = MailboxACL.EntryKey.createUserEntryKey("user" + userNumber); + if (threadNumber % 2 == 0) { + return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(rights).asReplacement()) + .then(); + } else { + return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(newRights).asAddition()) + .then(); + } + }) + .threadCount(10) + .operationCount(1) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0, 5).boxed() + .collect(ImmutableMap.toImmutableMap(userNumber -> MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber -> rights))); + + assertThat( + mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId()) + .block() + .getACL()) + .isEqualTo(expectedMailboxACL); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org