This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e84d5f1e1a42eefd6ac3bbbc7975e54dd7169f05
Author: Tung Tran <[email protected]>
AuthorDate: Wed Nov 22 10:12:28 2023 +0700

    JAMES-2586 Implement PostgresUidProvider
---
 .../mailbox/postgres/PostgresMailboxIdFaker.java   |  43 +++++++
 .../postgres/mail/PostgresMailboxModule.java       |  10 +-
 .../mailbox/postgres/mail/PostgresUidProvider.java | 106 ++++++++++++++++
 .../postgres/mail/dao/PostgresMailboxDAO.java      |  24 +++-
 .../postgres/mail/PostgresUidProviderTest.java     | 140 +++++++++++++++++++++
 5 files changed, 316 insertions(+), 7 deletions(-)

diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java
new file mode 100644
index 0000000000..23751b5001
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxIdFaker.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres;
+
+import java.util.UUID;
+
+import org.apache.james.mailbox.model.MailboxId;
+
+// TODO remove: this is a trick to convert a JPAId into a PostgresMailboxId while
+// PostgresUidProvider is being implemented.
+// It should be removed once all JPA dependencies are removed.
+/**
+ * Temporary bridge that lets code still holding a {@link JPAId} obtain a {@link PostgresMailboxId}.
+ */
+@Deprecated
+public class PostgresMailboxIdFaker {
+    /**
+     * Returns a {@link PostgresMailboxId} for the given identifier.
+     *
+     * @param mailboxId a {@link JPAId}, or an identifier that already is a {@link PostgresMailboxId}
+     * @return the UUID-based id derived from the raw JPA id, or {@code mailboxId} itself cast to
+     *         {@link PostgresMailboxId}
+     * @throws ClassCastException if {@code mailboxId} is neither of the two supported types
+     */
+    public static PostgresMailboxId getMailboxId(MailboxId mailboxId) {
+        if (!(mailboxId instanceof JPAId)) {
+            return (PostgresMailboxId) mailboxId;
+        }
+        long rawId = ((JPAId) mailboxId).getRawId();
+        return PostgresMailboxId.of(longToUUID(rawId));
+    }
+
+    /**
+     * Builds a UUID whose most significant half is {@code longValue} shifted left by 32 bits and
+     * whose least significant half is zero.
+     *
+     * <p>The shift drops the upper 32 bits of {@code longValue}, so two inputs that differ only in
+     * those bits produce the same UUID.
+     *
+     * @param longValue the numeric id to embed; must not be {@code null}
+     * @return the derived UUID
+     */
+    public static UUID longToUUID(Long longValue) {
+        return new UUID(longValue << 32, 0L);
+    }
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
index 6ed11a0c56..9c9b424c48 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
+import static org.jooq.impl.SQLDataType.BIGINT;
+
 import java.util.UUID;
 
 import org.apache.james.backends.postgres.PostgresModule;
@@ -35,14 +37,14 @@ public interface PostgresMailboxModule {
 
         Field<UUID> MAILBOX_ID = DSL.field("mailbox_id", 
SQLDataType.UUID.notNull());
         Field<String> MAILBOX_NAME = DSL.field("mailbox_name", 
SQLDataType.VARCHAR(255).notNull());
-        Field<Long> MAILBOX_UID_VALIDITY = DSL.field("mailbox_uid_validity", 
SQLDataType.BIGINT.notNull());
+        Field<Long> MAILBOX_UID_VALIDITY = DSL.field("mailbox_uid_validity", 
BIGINT.notNull());
         Field<String> USER_NAME = DSL.field("user_name", 
SQLDataType.VARCHAR(255));
         Field<String> MAILBOX_NAMESPACE = DSL.field("mailbox_namespace", 
SQLDataType.VARCHAR(255).notNull());
-        Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", 
SQLDataType.BIGINT);
-        Field<Long> MAILBOX_HIGHEST_MODSEQ = 
DSL.field("mailbox_highest_modseq", SQLDataType.BIGINT);
+        Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT);
+        Field<Long> MAILBOX_HIGHEST_MODSEQ = 
DSL.field("mailbox_highest_modseq", BIGINT);
 
         PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
-            .createTableStep(((dsl, tableName) -> dsl.createTable(tableName)
+            .createTableStep(((dsl, tableName) -> 
dsl.createTableIfNotExists(tableName)
                 .column(MAILBOX_ID, SQLDataType.UUID)
                 .column(MAILBOX_NAME)
                 .column(MAILBOX_UID_VALIDITY)
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java
new file mode 100644
index 0000000000..8333fcbf03
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresUidProvider.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.LongStream;
+
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.store.mail.UidProvider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Mono;
+
+public class PostgresUidProvider implements UidProvider {
+
+    public static class Factory {
+
+        private final PostgresExecutor.Factory executorFactory;
+
+        public Factory(PostgresExecutor.Factory executorFactory) {
+            this.executorFactory = executorFactory;
+        }
+
+        public PostgresUidProvider create(MailboxSession session) {
+            PostgresExecutor postgresExecutor = 
executorFactory.create(session.getUser().getDomainPart());
+            return new PostgresUidProvider(new 
PostgresMailboxDAO(postgresExecutor));
+        }
+    }
+
+    private final PostgresMailboxDAO mailboxDAO;
+
+    public PostgresUidProvider(PostgresMailboxDAO mailboxDAO) {
+        this.mailboxDAO = mailboxDAO;
+    }
+
+    @Override
+    public MessageUid nextUid(Mailbox mailbox) throws MailboxException {
+        return nextUid(mailbox.getMailboxId());
+    }
+
+    @Override
+    public Optional<MessageUid> lastUid(Mailbox mailbox) {
+        return lastUidReactive(mailbox).block();
+    }
+
+    @Override
+    public MessageUid nextUid(MailboxId mailboxId) throws MailboxException {
+        return nextUidReactive(mailboxId)
+            .blockOptional()
+            .orElseThrow(() -> new MailboxException("Error during Uid 
update"));
+    }
+
+    @Override
+    public Mono<Optional<MessageUid>> lastUidReactive(Mailbox mailbox) {
+        return mailboxDAO.findLastUidByMailboxId(mailbox.getMailboxId())
+            .map(Optional::of)
+            .switchIfEmpty(Mono.just(Optional.empty()));
+    }
+
+    @Override
+    public Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
+        return mailboxDAO.incrementAndGetLastUid(mailboxId, 1)
+            .defaultIfEmpty(MessageUid.MIN_VALUE);
+    }
+
+    @Override
+    public Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
+        Preconditions.checkArgument(count > 0, "Count need to be positive");
+        Mono<MessageUid> updateNewLastUid = 
mailboxDAO.incrementAndGetLastUid(mailboxId, count)
+            .defaultIfEmpty(MessageUid.MIN_VALUE);
+        return updateNewLastUid.map(lastUid -> range(lastUid, count));
+    }
+
+    private List<MessageUid> range(MessageUid higherInclusive, int count) {
+        return LongStream.range(higherInclusive.asLong() - count + 1, 
higherInclusive.asLong() + 1)
+            .mapToObj(MessageUid::of)
+            .collect(ImmutableList.toImmutableList());
+    }
+
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 7e6d592bfb..de6a42e826 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -19,18 +19,20 @@
 
 package org.apache.james.mailbox.postgres.mail.dao;
 
+import static 
org.apache.james.mailbox.postgres.PostgresMailboxIdFaker.getMailboxId;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_NAME;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_NAMESPACE;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_UID_VALIDITY;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.TABLE_NAME;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.USER_NAME;
+import static org.jooq.impl.DSL.coalesce;
 import static org.jooq.impl.DSL.count;
 
-import javax.inject.Inject;
-
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
 import org.apache.james.core.Username;
+import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.exception.MailboxExistsException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.model.Mailbox;
@@ -54,7 +56,6 @@ public class PostgresMailboxDAO {
 
     private final PostgresExecutor postgresExecutor;
 
-    @Inject
     public PostgresMailboxDAO(PostgresExecutor postgresExecutor) {
         this.postgresExecutor = postgresExecutor;
     }
@@ -140,4 +141,21 @@ public class PostgresMailboxDAO {
         return new Mailbox(new MailboxPath(record.get(MAILBOX_NAMESPACE), 
Username.of(record.get(USER_NAME)), record.get(MAILBOX_NAME)),
             UidValidity.of(record.get(MAILBOX_UID_VALIDITY)), 
PostgresMailboxId.of(record.get(MAILBOX_ID)));
     }
+
+    /**
+     * Reads the {@code mailbox_last_uid} column of the row matching the given mailbox id.
+     *
+     * @param mailboxId id of the mailbox; converted through {@code getMailboxId}
+     * @return the stored value as a {@link MessageUid}; empty when the column is {@code null}
+     */
+    public Mono<MessageUid> findLastUidByMailboxId(MailboxId mailboxId) {
+        return postgresExecutor
+            .executeRow(dsl -> Mono.from(
+                dsl.select(MAILBOX_LAST_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid()))))
+            // A null column means no UID has been handed out yet: surface it as an empty Mono.
+            .flatMap(row -> Mono.justOrEmpty(row.get(MAILBOX_LAST_UID))
+                .map(MessageUid::of));
+    }
+
+    /**
+     * Adds {@code count} to the {@code mailbox_last_uid} column of the matching row and returns the
+     * updated value, using one {@code UPDATE ... RETURNING} statement.
+     *
+     * @param mailboxId id of the mailbox; converted through {@code getMailboxId}
+     * @param count amount added to the stored counter (a {@code null} counter counts as 0)
+     * @return the counter value after the update
+     */
+    public Mono<MessageUid> incrementAndGetLastUid(MailboxId mailboxId, int count) {
+        return postgresExecutor
+            .executeRow(dsl -> Mono.from(
+                dsl.update(TABLE_NAME)
+                    .set(MAILBOX_LAST_UID, coalesce(MAILBOX_LAST_UID, 0L).add(count))
+                    .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid()))
+                    .returning(MAILBOX_LAST_UID)))
+            .map(row -> MessageUid.of(row.get(MAILBOX_LAST_UID)));
+    }
 }
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java
new file mode 100644
index 0000000000..f2e20f09ac
--- /dev/null
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresUidProviderTest.java
@@ -0,0 +1,140 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.LongStream;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.UidValidity;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.store.mail.UidProvider;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.github.fge.lambdas.Throwing;
+
+/**
+ * Exercises {@link PostgresUidProvider} against the database supplied by {@link PostgresExtension}.
+ *
+ * <p>Each test runs on a freshly created mailbox, so UID allocation starts from scratch.
+ */
+public class PostgresUidProviderTest {
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
+
+    private UidProvider uidProvider;
+
+    private Mailbox mailbox;
+
+    @BeforeEach
+    void setup() {
+        PostgresMailboxDAO mailboxDAO = new PostgresMailboxDAO(postgresExtension.getPostgresExecutor());
+        uidProvider = new PostgresUidProvider(mailboxDAO);
+        // A random suffix keeps the mailbox path unique across tests.
+        MailboxPath mailboxPath = new MailboxPath("gsoc", Username.of("ieugen" + UUID.randomUUID()), "INBOX");
+        UidValidity uidValidity = UidValidity.of(1234);
+        mailbox = mailboxDAO.create(mailboxPath, uidValidity).block();
+    }
+
+    @Test
+    void lastUidShouldRetrieveValueStoredByNextUid() throws Exception {
+        int nbEntries = 100;
+        Optional<MessageUid> result = uidProvider.lastUid(mailbox);
+        assertThat(result).isEmpty();
+        LongStream.range(0, nbEntries)
+            .forEach(Throwing.longConsumer(value -> {
+                MessageUid uid = uidProvider.nextUid(mailbox);
+                // contains() reports a missing value as an assertion failure instead of
+                // letting Optional.get() throw.
+                assertThat(uidProvider.lastUid(mailbox)).contains(uid);
+            }));
+    }
+
+    @Test
+    void nextUidShouldIncrementValueByOne() {
+        int nbEntries = 100;
+        LongStream.range(1, nbEntries)
+            .forEach(Throwing.longConsumer(value -> {
+                MessageUid result = uidProvider.nextUid(mailbox);
+                // Actual value first, expected second, so failure messages read correctly.
+                assertThat(result.asLong()).isEqualTo(value);
+            }));
+    }
+
+    @Test
+    void nextUidShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException, MailboxException {
+        uidProvider.nextUid(mailbox);
+        int threadCount = 10;
+        int nbEntries = 100;
+
+        ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>();
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> messageUids.add(uidProvider.nextUid(mailbox)))
+            .threadCount(threadCount)
+            .operationCount(nbEntries / threadCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        // The set drops duplicates, so a full set means every allocated UID was distinct.
+        assertThat(messageUids).hasSize(nbEntries);
+    }
+
+    @Test
+    void nextUidsShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException, MailboxException {
+        uidProvider.nextUid(mailbox);
+
+        int threadCount = 10;
+        int nbOperations = 100;
+        int uidsPerOperation = 10;
+
+        ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>();
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> messageUids.addAll(uidProvider.nextUids(mailbox.getMailboxId(), uidsPerOperation).block()))
+            .threadCount(threadCount)
+            .operationCount(nbOperations / threadCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(messageUids).hasSize(nbOperations * uidsPerOperation);
+    }
+
+    @Test
+    void nextUidWithCountShouldReturnCorrectUids() {
+        int count = 10;
+        List<MessageUid> messageUids = uidProvider.nextUids(mailbox.getMailboxId(), count).block();
+        assertThat(messageUids).hasSize(count)
+            .containsExactlyInAnyOrder(
+                MessageUid.of(1),
+                MessageUid.of(2),
+                MessageUid.of(3),
+                MessageUid.of(4),
+                MessageUid.of(5),
+                MessageUid.of(6),
+                MessageUid.of(7),
+                MessageUid.of(8),
+                MessageUid.of(9),
+                MessageUid.of(10));
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to