This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3037a2b2c348dbfafdd49f47566c654970acc13d
Author: Tung Tran <[email protected]>
AuthorDate: Thu Apr 6 08:08:21 2023 +0700

    AnnotationMapper support more reactive api & Cassandra implement
---
 .../cassandra/mail/CassandraAnnotationMapper.java  | 156 +++++++++++++--------
 .../james/mailbox/store/mail/AnnotationMapper.java |  36 +++++
 2 files changed, 132 insertions(+), 60 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
index cce1d2b520..179cc060ac 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
@@ -26,11 +26,11 @@ import static 
com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static 
com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
 
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraAnnotationTable;
 import org.apache.james.mailbox.model.MailboxAnnotation;
@@ -46,21 +46,25 @@ import 
com.datastax.oss.driver.api.querybuilder.select.Select;
 import com.google.common.base.Ascii;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraAnnotationMapper extends NonTransactionalMapper 
implements AnnotationMapper {
 
-    private final CqlSession session;
+    private final CassandraAsyncExecutor asyncExecutor;
     private final PreparedStatement delete;
     private final PreparedStatement insert;
+    private final PreparedStatement getStoredAnnotationsQuery;
+    private final PreparedStatement countStoredAnnotationsQuery;
     private final PreparedStatement getStoredAnnotationsQueryForKeys;
     private final PreparedStatement getStoredAnnotationsQueryLikeKey;
     private final PreparedStatement getStoredAnnotationsQueryByKey;
 
     @Inject
     public CassandraAnnotationMapper(CqlSession session) {
-        this.session = session;
+        this.asyncExecutor = new CassandraAsyncExecutor(session);
         this.delete = 
session.prepare(deleteFrom(CassandraAnnotationTable.TABLE_NAME)
             
.where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)),
                 
column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
@@ -72,63 +76,109 @@ public class CassandraAnnotationMapper extends 
NonTransactionalMapper implements
             .value(CassandraAnnotationTable.VALUE, 
bindMarker(CassandraAnnotationTable.VALUE))
             .build());
 
-        this.getStoredAnnotationsQueryForKeys = 
getStoredAnnotationsQueryForKeys();
-        this.getStoredAnnotationsQueryLikeKey = 
getStoredAnnotationsQueryLikeKey();
-        this.getStoredAnnotationsQueryByKey = getStoredAnnotationsQueryByKey();
+        this.getStoredAnnotationsQuery = 
session.prepare(getStoredAnnotationsQuery().build());
+
+        this.countStoredAnnotationsQuery = 
session.prepare(selectFrom(CassandraAnnotationTable.TABLE_NAME)
+            .countAll()
+            
.where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)))
+            .build());
+
+        this.getStoredAnnotationsQueryForKeys =
+            session.prepare(getStoredAnnotationsQuery()
+                
.where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY)))
+                .build());
+
+        this.getStoredAnnotationsQueryLikeKey =
+            session.prepare(getStoredAnnotationsQuery()
+                .where(column(CassandraAnnotationTable.KEY)
+                        
.isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)),
+                    column(CassandraAnnotationTable.KEY)
+                        
.isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY)))
+                .build());
+
+        this.getStoredAnnotationsQueryByKey =
+            session.prepare(getStoredAnnotationsQuery()
+                .where(column(CassandraAnnotationTable.KEY)
+                    .isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
+                .build());
     }
 
     @Override
     public List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return 
Flux.from(session.executeReactive(session.prepare(getStoredAnnotationsQuery().build()).bind()
-                .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
cassandraId.asUuid())))
-            .map(this::toAnnotation)
+        return getAllAnnotationsReactive(mailboxId)
             .collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> getAllAnnotationsReactive(MailboxId 
mailboxId) {
+        return asyncExecutor.executeRows(getStoredAnnotationsQuery.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid()))
+            .map(this::toAnnotation);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return 
Flux.from(session.executeReactive(getStoredAnnotationsQueryForKeys.bind()
-                .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
cassandraId.asUuid())
-                .setList(CassandraAnnotationTable.KEY, keys.stream()
-                    .map(MailboxAnnotationKey::asString)
-                    .collect(ImmutableList.toImmutableList()), String.class)))
-            .map(this::toAnnotation)
+        return getAnnotationsByKeysReactive(mailboxId, keys)
             .collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys) {
+        return 
asyncExecutor.executeRows(getStoredAnnotationsQueryForKeys.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid())
+                .setList(CassandraAnnotationTable.KEY, keys.stream()
+                    .map(MailboxAnnotationKey::asString)
+                    .collect(ImmutableList.toImmutableList()), String.class))
+            .map(this::toAnnotation);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return Flux.fromIterable(keys)
-            .flatMap(annotation -> 
getAnnotationsByKeyWithOneDepth(cassandraId, annotation))
-            .collectList()
+        return getAnnotationsByKeysWithOneDepthReactive(mailboxId, 
keys).collectList()
             .block();
     }
 
     @Override
-    public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
+    public Flux<MailboxAnnotation> 
getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
         return Flux.fromIterable(keys)
-            .flatMap(annotation -> 
getAnnotationsByKeyWithAllDepth(cassandraId, annotation))
-            .collectList()
+            .flatMap(annotation -> 
getAnnotationsByKeyWithOneDepth((CassandraId) mailboxId, annotation));
+    }
+
+    @Override
+    public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys) {
+        return getAnnotationsByKeysWithAllDepthReactive(mailboxId, 
keys).collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> 
getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(keys)
+            .flatMap(annotation -> 
getAnnotationsByKeyWithAllDepth((CassandraId) mailboxId, annotation));
+    }
+
     @Override
     public void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey 
key) {
-        session.execute(delete.bind()
+        deleteAnnotationReactive(mailboxId, key).block();
+    }
+
+    @Override
+    public Mono<Void> deleteAnnotationReactive(MailboxId mailboxId, 
MailboxAnnotationKey key) {
+        return asyncExecutor.executeVoid(delete.bind()
             .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid())
             .setString(CassandraAnnotationTable.KEY, key.asString()));
     }
 
     @Override
     public void insertAnnotation(MailboxId mailboxId, MailboxAnnotation 
mailboxAnnotation) {
+        insertAnnotationReactive(mailboxId, mailboxAnnotation).block();
+    }
+
+    @Override
+    public Mono<Void> insertAnnotationReactive(MailboxId mailboxId, 
MailboxAnnotation mailboxAnnotation) {
         Preconditions.checkArgument(!mailboxAnnotation.isNil());
-        session.execute(insert.bind()
+        return asyncExecutor.executeVoid(insert.bind()
             .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid())
             .setString(CassandraAnnotationTable.KEY, 
mailboxAnnotation.getKey().asString())
             .setString(CassandraAnnotationTable.VALUE, 
mailboxAnnotation.getValue().get()));
@@ -136,20 +186,25 @@ public class CassandraAnnotationMapper extends 
NonTransactionalMapper implements
 
     @Override
     public boolean exist(MailboxId mailboxId, MailboxAnnotation 
mailboxAnnotation) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        Optional<Row> row = Optional.ofNullable(
-            session.execute(getStoredAnnotationsQueryByKey.bind()
-                    .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
cassandraId.asUuid())
-                    .setString(CassandraAnnotationTable.KEY, 
mailboxAnnotation.getKey().asString()))
-                .one());
-        return row.isPresent();
+        return existReactive(mailboxId, mailboxAnnotation).block();
+    }
+
+    @Override
+    public Mono<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation 
mailboxAnnotation) {
+        return 
asyncExecutor.executeReturnExists(getStoredAnnotationsQueryByKey.bind()
+            .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid())
+            .setString(CassandraAnnotationTable.KEY, 
mailboxAnnotation.getKey().asString()));
     }
 
     @Override
     public int countAnnotations(MailboxId mailboxId) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return 
session.execute(session.prepare(getStoredAnnotationsQuery().build()).bind()
-            .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
cassandraId.asUuid())).getAvailableWithoutFetching();
+        return countAnnotationsReactive(mailboxId).block();
+    }
+
+    public Mono<Integer> countAnnotationsReactive(MailboxId mailboxId) {
+        return 
asyncExecutor.executeSingleRow(countStoredAnnotationsQuery.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) 
mailboxId).asUuid()))
+            .map(row -> Ints.checkedCast(row.getLong(0)));
     }
 
     private MailboxAnnotation toAnnotation(Row row) {
@@ -163,43 +218,24 @@ public class CassandraAnnotationMapper extends 
NonTransactionalMapper implements
             
.where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)));
     }
 
-    private PreparedStatement getStoredAnnotationsQueryForKeys() {
-        return session.prepare(getStoredAnnotationsQuery()
-            
.where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY)))
-            .build());
-    }
-
-    private PreparedStatement getStoredAnnotationsQueryLikeKey() {
-        return session.prepare(getStoredAnnotationsQuery()
-            
.where(column(CassandraAnnotationTable.KEY).isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)),
-                
column(CassandraAnnotationTable.KEY).isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY)))
-            .build());
-    }
-
-    private PreparedStatement getStoredAnnotationsQueryByKey() {
-        return session.prepare(getStoredAnnotationsQuery()
-            
.where(column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
-            .build());
-    }
-
     private String buildNextKey(String key) {
         return key + MailboxAnnotationKey.SLASH_CHARACTER + Ascii.MAX;
     }
 
     private Flux<MailboxAnnotation> 
getAnnotationsByKeyWithAllDepth(CassandraId mailboxId, MailboxAnnotationKey 
key) {
-        return 
Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind()
+        return 
asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind()
                 .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
mailboxId.asUuid())
                 .setString(CassandraAnnotationTable.GREATER_BIND_KEY, 
key.asString())
-                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, 
buildNextKey(key.asString()))))
+                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, 
buildNextKey(key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> 
key.isAncestorOrIsEqual(annotation.getKey()));
     }
 
     private Flux<MailboxAnnotation> 
getAnnotationsByKeyWithOneDepth(CassandraId mailboxId, MailboxAnnotationKey 
key) {
-        return 
Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind()
+        return 
asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind()
                 .setUuid(CassandraAnnotationTable.MAILBOX_ID, 
mailboxId.asUuid())
                 .setString(CassandraAnnotationTable.GREATER_BIND_KEY, 
key.asString())
-                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, 
buildNextKey(key.asString()))))
+                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, 
buildNextKey(key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> key.isParentOrIsEqual(annotation.getKey()));
     }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
index ec775de6c6..da3146fcc9 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
@@ -26,6 +26,10 @@ import org.apache.james.mailbox.model.MailboxAnnotation;
 import org.apache.james.mailbox.model.MailboxAnnotationKey;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.transaction.Mapper;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public interface AnnotationMapper extends Mapper {
     /**
@@ -37,6 +41,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId);
 
+    default Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxId 
mailboxId) {
+        return Flux.fromIterable(getAllAnnotations(mailboxId));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by 
the set of annotation's keys. The result is not ordered and should not
      * contain duplicate by key
@@ -47,6 +55,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> 
getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> 
keys) {
+        return Flux.fromIterable(getAnnotationsByKeys(mailboxId, keys));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by 
the set of annotation's keys as well as its children entries
      * The result is not ordered and should not contain duplicate by key
@@ -57,6 +69,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> 
getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(getAnnotationsByKeysWithOneDepth(mailboxId, 
keys));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by 
the set of annotation's keys and entries below the keys
      * The result is not ordered and should not contain duplicate by key
@@ -67,6 +83,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId 
mailboxId, Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> 
getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, 
Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(getAnnotationsByKeysWithAllDepth(mailboxId, 
keys));
+    }
+
     /**
      * Delete the annotation of selected mailbox by its key.
      *
@@ -75,6 +95,10 @@ public interface AnnotationMapper extends Mapper {
      */
     void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key);
 
+    default Publisher<Void> deleteAnnotationReactive(MailboxId mailboxId, 
MailboxAnnotationKey key) {
+        return Mono.fromRunnable(() -> deleteAnnotation(mailboxId, key));
+    }
+
     /**
      * - Insert new annotation if it does not exist on store
      * - Update the new value for existed annotation
@@ -84,6 +108,10 @@ public interface AnnotationMapper extends Mapper {
      */
     void insertAnnotation(MailboxId mailboxId, MailboxAnnotation 
mailboxAnnotation);
 
+    default Publisher<Void> insertAnnotationReactive(MailboxId mailboxId, 
MailboxAnnotation mailboxAnnotation) {
+        return Mono.fromRunnable(() -> insertAnnotation(mailboxId, 
mailboxAnnotation));
+    }
+
     /**
      * Checking the current annotation of selected mailbox exists on store or 
not. It's checked by annotation key, not by its value.
      *
@@ -93,9 +121,17 @@ public interface AnnotationMapper extends Mapper {
      */
     boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation);
 
+    default Publisher<Boolean> existReactive(MailboxId mailboxId, 
MailboxAnnotation mailboxAnnotation) {
+        return Mono.fromCallable(() -> exist(mailboxId, mailboxAnnotation));
+    }
+
     /**
      * Getting total number of current annotation on mailbox
      *
      */
     int countAnnotations(MailboxId mailboxId);
+
+    default Publisher<Integer> countAnnotationsReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> countAnnotations(mailboxId));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to