This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e84d5f1e1a42eefd6ac3bbbc7975e54dd7169f05 Author: Tung Tran <[email protected]> AuthorDate: Wed Nov 22 10:12:28 2023 +0700 JAMES-2586 Implement PostgresUidProvider --- .../mailbox/postgres/PostgresMailboxIdFaker.java | 43 +++++++ .../postgres/mail/PostgresMailboxModule.java | 10 +- .../mailbox/postgres/mail/PostgresUidProvider.java | 106 ++++++++++++++++ .../postgres/mail/dao/PostgresMailboxDAO.java | 24 +++- .../postgres/mail/PostgresUidProviderTest.java | 140 +++++++++++++++++++++ 5 files changed, 316 insertions(+), 7 deletions(-) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java new file mode 100644 index 0000000000..23751b5001 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java @@ -0,0 +1,43 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres; + +import java.util.UUID; + +import org.apache.james.mailbox.model.MailboxId; + +// TODO remove: this is trick convert JPAId to PostgresMailboxId when implementing PostgresUidProvider. +// it should be removed when all JPA dependencies are removed +@Deprecated +public class PostgresMailboxIdFaker { + public static PostgresMailboxId getMailboxId(MailboxId mailboxId) { + if (mailboxId instanceof JPAId) { + long longValue = ((JPAId) mailboxId).getRawId(); + return PostgresMailboxId.of(longToUUID(longValue)); + } + return (PostgresMailboxId) mailboxId; + } + + public static UUID longToUUID(Long longValue) { + long mostSigBits = longValue << 32; + long leastSigBits = 0; + return new UUID(mostSigBits, leastSigBits); + } +} diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java index 6ed11a0c56..9c9b424c48 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.postgres.mail; +import static org.jooq.impl.SQLDataType.BIGINT; + import java.util.UUID; import org.apache.james.backends.postgres.PostgresModule; @@ -35,14 +37,14 @@ public interface PostgresMailboxModule { Field<UUID> MAILBOX_ID = DSL.field("mailbox_id", SQLDataType.UUID.notNull()); Field<String> MAILBOX_NAME = DSL.field("mailbox_name", SQLDataType.VARCHAR(255).notNull()); - Field<Long> MAILBOX_UID_VALIDITY = DSL.field("mailbox_uid_validity", SQLDataType.BIGINT.notNull()); + Field<Long> MAILBOX_UID_VALIDITY = DSL.field("mailbox_uid_validity", BIGINT.notNull()); Field<String> USER_NAME = DSL.field("user_name", SQLDataType.VARCHAR(255)); Field<String> MAILBOX_NAMESPACE = DSL.field("mailbox_namespace", SQLDataType.VARCHAR(255).notNull()); - Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", SQLDataType.BIGINT); - Field<Long> MAILBOX_HIGHEST_MODSEQ = DSL.field("mailbox_highest_modseq", SQLDataType.BIGINT); + Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT); + Field<Long> MAILBOX_HIGHEST_MODSEQ = DSL.field("mailbox_highest_modseq", BIGINT); PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName()) - .createTableStep(((dsl, tableName) -> dsl.createTable(tableName) + .createTableStep(((dsl, tableName) -> dsl.createTableIfNotExists(tableName) .column(MAILBOX_ID, SQLDataType.UUID) .column(MAILBOX_NAME) .column(MAILBOX_UID_VALIDITY) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java new file mode 100644 index 0000000000..8333fcbf03 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java @@ -0,0 +1,106 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import java.util.List; +import java.util.Optional; +import java.util.stream.LongStream; + +import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.Mailbox; +import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.store.mail.UidProvider; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import reactor.core.publisher.Mono; + +public class PostgresUidProvider implements UidProvider { + + public static class Factory { + + private final PostgresExecutor.Factory executorFactory; + + public Factory(PostgresExecutor.Factory executorFactory) { + this.executorFactory = executorFactory; + } + + public PostgresUidProvider create(MailboxSession session) { + PostgresExecutor postgresExecutor = executorFactory.create(session.getUser().getDomainPart()); + return new PostgresUidProvider(new PostgresMailboxDAO(postgresExecutor)); + } + } + + private final PostgresMailboxDAO mailboxDAO; + + public PostgresUidProvider(PostgresMailboxDAO mailboxDAO) { + this.mailboxDAO = mailboxDAO; + } + + @Override + public MessageUid nextUid(Mailbox mailbox) throws MailboxException { + return nextUid(mailbox.getMailboxId()); + } + + @Override + public Optional<MessageUid> lastUid(Mailbox mailbox) { + return lastUidReactive(mailbox).block(); + } + + @Override + public MessageUid nextUid(MailboxId mailboxId) throws MailboxException { + return nextUidReactive(mailboxId) + .blockOptional() + .orElseThrow(() -> new MailboxException("Error during Uid update")); + } + + @Override + public Mono<Optional<MessageUid>> lastUidReactive(Mailbox mailbox) { + return mailboxDAO.findLastUidByMailboxId(mailbox.getMailboxId()) + .map(Optional::of) + .switchIfEmpty(Mono.just(Optional.empty())); + } + + @Override + public Mono<MessageUid> nextUidReactive(MailboxId mailboxId) { + return mailboxDAO.incrementAndGetLastUid(mailboxId, 1) + .defaultIfEmpty(MessageUid.MIN_VALUE); + } + + @Override + public Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) { + Preconditions.checkArgument(count > 0, "Count need to be positive"); + Mono<MessageUid> updateNewLastUid = mailboxDAO.incrementAndGetLastUid(mailboxId, count) + .defaultIfEmpty(MessageUid.MIN_VALUE); + return updateNewLastUid.map(lastUid -> range(lastUid, count)); + } + + private List<MessageUid> range(MessageUid higherInclusive, int count) { + return LongStream.range(higherInclusive.asLong() - count + 1, higherInclusive.asLong() + 1) + .mapToObj(MessageUid::of) + .collect(ImmutableList.toImmutableList()); + } + +} diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java index 7e6d592bfb..de6a42e826 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java @@ -19,18 +19,20 @@ package org.apache.james.mailbox.postgres.mail.dao; +import static org.apache.james.mailbox.postgres.PostgresMailboxIdFaker.getMailboxId; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_NAME; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_NAMESPACE; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_UID_VALIDITY; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.TABLE_NAME; import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.USER_NAME; +import static org.jooq.impl.DSL.coalesce; import static org.jooq.impl.DSL.count; -import javax.inject.Inject; - import org.apache.james.backends.postgres.utils.PostgresExecutor; import org.apache.james.core.Username; +import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.exception.MailboxExistsException; import org.apache.james.mailbox.exception.MailboxNotFoundException; import org.apache.james.mailbox.model.Mailbox; @@ -54,7 +56,6 @@ public class PostgresMailboxDAO { private final PostgresExecutor postgresExecutor; - @Inject public PostgresMailboxDAO(PostgresExecutor postgresExecutor) { this.postgresExecutor = postgresExecutor; } @@ -140,4 +141,21 @@ public class PostgresMailboxDAO { return new Mailbox(new MailboxPath(record.get(MAILBOX_NAMESPACE), Username.of(record.get(USER_NAME)), record.get(MAILBOX_NAME)), UidValidity.of(record.get(MAILBOX_UID_VALIDITY)), PostgresMailboxId.of(record.get(MAILBOX_ID))); } + + public Mono<MessageUid> findLastUidByMailboxId(MailboxId mailboxId) { + return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(MAILBOX_LAST_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid())))) + .flatMap(record -> Mono.justOrEmpty(record.get(MAILBOX_LAST_UID))) + .map(MessageUid::of); + } + + public Mono<MessageUid> incrementAndGetLastUid(MailboxId mailboxId, int count) { + return postgresExecutor.executeRow(dsl -> Mono.from(dsl.update(TABLE_NAME) + .set(MAILBOX_LAST_UID, coalesce(MAILBOX_LAST_UID, 0L).add(count)) + .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid())) + .returning(MAILBOX_LAST_UID))) + .map(record -> record.get(MAILBOX_LAST_UID)) + .map(MessageUid::of); + } } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java new file mode 100644 index 0000000000..f2e20f09ac --- /dev/null +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java @@ -0,0 +1,140 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.stream.LongStream; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.core.Username; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.Mailbox; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.model.UidValidity; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.store.mail.UidProvider; +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.github.fge.lambdas.Throwing; + +public class PostgresUidProviderTest { + + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE); + + private UidProvider uidProvider; + + private Mailbox mailbox; + + @BeforeEach + void setup() { + PostgresMailboxDAO mailboxDAO = new PostgresMailboxDAO(postgresExtension.getPostgresExecutor()); + uidProvider = new PostgresUidProvider(mailboxDAO); + MailboxPath mailboxPath = new MailboxPath("gsoc", Username.of("ieugen" + UUID.randomUUID()), "INBOX"); + UidValidity uidValidity = UidValidity.of(1234); + mailbox = mailboxDAO.create(mailboxPath, uidValidity).block(); + } + + @Test + void lastUidShouldRetrieveValueStoredByNextUid() throws Exception { + int nbEntries = 100; + Optional<MessageUid> result = uidProvider.lastUid(mailbox); + assertThat(result).isEmpty(); + LongStream.range(0, nbEntries) + .forEach(Throwing.longConsumer(value -> { + MessageUid uid = uidProvider.nextUid(mailbox); + assertThat(uid).isEqualTo(uidProvider.lastUid(mailbox).get()); + }) + ); + } + + @Test + void nextUidShouldIncrementValueByOne() { + int nbEntries = 100; + LongStream.range(1, nbEntries) + .forEach(Throwing.longConsumer(value -> { + MessageUid result = uidProvider.nextUid(mailbox); + assertThat(value).isEqualTo(result.asLong()); + })); + } + + @Test + void nextUidShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException, MailboxException { + uidProvider.nextUid(mailbox); + int threadCount = 10; + int nbEntries = 100; + + ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>(); + ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> messageUids.add(uidProvider.nextUid(mailbox))) + .threadCount(threadCount) + .operationCount(nbEntries / threadCount) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(messageUids).hasSize(nbEntries); + } + + @Test + void nextUidsShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException, MailboxException { + uidProvider.nextUid(mailbox); + + int threadCount = 10; + int nbOperations = 100; + + ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>(); + ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> messageUids.addAll(uidProvider.nextUids(mailbox.getMailboxId(), 10).block())) + .threadCount(threadCount) + .operationCount(nbOperations / threadCount) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(messageUids).hasSize(nbOperations * 10); + } + + @Test + void nextUidWithCountShouldReturnCorrectUids() { + int count = 10; + List<MessageUid> messageUids = uidProvider.nextUids(mailbox.getMailboxId(), count).block(); + assertThat(messageUids).hasSize(count) + .containsExactlyInAnyOrder( + MessageUid.of(1), + MessageUid.of(2), + MessageUid.of(3), + MessageUid.of(4), + MessageUid.of(5), + MessageUid.of(6), + MessageUid.of(7), + MessageUid.of(8), + MessageUid.of(9), + MessageUid.of(10)); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
