ptupitsyn commented on code in PR #6007:
URL: https://github.com/apache/ignite-3/pull/6007#discussion_r2156364467


##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -877,12 +886,148 @@ int tryGetPartitionCount() {
         return partitionCount;
     }
 
+    /**
+     * Batch with indexes.
+     *
+     * @param <E> Batch type element.
+     */
+    static class Batch<E> {
+        List<E> batch = new ArrayList<>();
+        List<Integer> originalIndices = new ArrayList<>();
+
+        void add(E entry, int origIdx) {
+            batch.add(entry);
+            originalIndices.add(origIdx);
+        }
+    }
+
+    private static <E> void reduceWithKeepOrder(List<E> agg, List<E> cur, 
List<Integer> originalIndices) {
+        for (int i = 0; i < cur.size(); i++) {
+            E val = cur.get(i);
+            Integer orig = originalIndices.get(i);
+            agg.set(orig, val);
+        }
+    }
+
+    <R, E> CompletableFuture<R> split(
+            Transaction tx,
+            Collection<E> keys,
+            BiFunction<Collection<E>, PartitionAwarenessProvider, 
CompletableFuture<R>> fun,
+            @Nullable R initialValue,
+            Reducer<R> reducer,
+            BiFunction<ClientSchema, E, Integer> hashFunc
+    ) {
+        assert tx != null;
+
+        CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer);
+        CompletableFuture<List<String>> partitionsFut = 
getPartitionAssignment();
+
+        return CompletableFuture.allOf(schemaFut, partitionsFut)
+                .thenCompose(v -> {
+                    List<E> unmapped = new ArrayList<>();
+                    Map<Integer, List<E>> mapped = new HashMap<>();
+                    for (E key : keys) {
+                        ClientSchema schema = schemaFut.getNow(null);
+                        @Nullable List<String> aff = 
partitionsFut.getNow(null);
+                        int hash = hashFunc.apply(schema, key);
+                        Integer part = aff == null ? null : Math.abs(hash % 
aff.size());

Review Comment:
   I think we can move the `aff == null` check outside of the loop and exit 
early if the assignment is not available (**or empty** - all elements can be null).



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/PartitionAwarenessProvider.java:
##########
@@ -17,45 +17,42 @@
 
 package org.apache.ignite.internal.client.table;
 
-import java.util.function.BiFunction;
+import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Partition awareness provider.
- * Represents 3 use cases:
- * 1. Partition awareness is enabled. Use hashFunc to determine partition.
- * 2. Transaction is used. Use specific channel.
- * 3. Null instance = No partition awareness and no transaction. Use any 
channel.
+ * Used to calculate a partition for a specific operation.
  */
 public class PartitionAwarenessProvider {
-    private final @Nullable Integer partition;
+    static PartitionAwarenessProvider NULL_PROVIDER = of((Integer) null);
 
-    private final @Nullable BiFunction<ClientSchema, Boolean, Integer> 
hashFunc;
+    private final @Nullable Integer partition;
 
-    private PartitionAwarenessProvider(@Nullable BiFunction<ClientSchema, 
Boolean, Integer> hashFunc, @Nullable Integer partition) {
-        assert hashFunc != null ^ partition != null;
+    private final @Nullable Function<ClientSchema, Integer> hashFunc;
 
+    private PartitionAwarenessProvider(@Nullable Function<ClientSchema, 
Integer> hashFunc, @Nullable Integer partition) {
         this.hashFunc = hashFunc;
         this.partition = partition;
     }
 
-    public static PartitionAwarenessProvider of(Integer partition) {
+    public static PartitionAwarenessProvider of(@Nullable Integer partition) {

Review Comment:
   Let's remove `@Nullable` here. It is only needed for `NULL_PROVIDER`, where 
the warning can be suppressed; for all other usages it should not be null.



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -877,12 +886,148 @@ int tryGetPartitionCount() {
         return partitionCount;
     }
 
+    /**
+     * Batch with indexes.
+     *
+     * @param <E> Batch type element.
+     */
+    static class Batch<E> {
+        List<E> batch = new ArrayList<>();
+        List<Integer> originalIndices = new ArrayList<>();
+
+        void add(E entry, int origIdx) {
+            batch.add(entry);
+            originalIndices.add(origIdx);
+        }
+    }
+
+    private static <E> void reduceWithKeepOrder(List<E> agg, List<E> cur, 
List<Integer> originalIndices) {
+        for (int i = 0; i < cur.size(); i++) {
+            E val = cur.get(i);
+            Integer orig = originalIndices.get(i);
+            agg.set(orig, val);
+        }
+    }
+
+    <R, E> CompletableFuture<R> split(
+            Transaction tx,
+            Collection<E> keys,
+            BiFunction<Collection<E>, PartitionAwarenessProvider, 
CompletableFuture<R>> fun,
+            @Nullable R initialValue,
+            Reducer<R> reducer,
+            BiFunction<ClientSchema, E, Integer> hashFunc
+    ) {
+        assert tx != null;
+
+        CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer);
+        CompletableFuture<List<String>> partitionsFut = 
getPartitionAssignment();
+
+        return CompletableFuture.allOf(schemaFut, partitionsFut)
+                .thenCompose(v -> {
+                    List<E> unmapped = new ArrayList<>();
+                    Map<Integer, List<E>> mapped = new HashMap<>();
+                    for (E key : keys) {
+                        ClientSchema schema = schemaFut.getNow(null);
+                        @Nullable List<String> aff = 
partitionsFut.getNow(null);
+                        int hash = hashFunc.apply(schema, key);
+                        Integer part = aff == null ? null : Math.abs(hash % 
aff.size());
+                        if (part == null) {
+                            unmapped.add(key);
+                        } else {
+                            mapped.computeIfAbsent(part, k -> new 
ArrayList<>()).add(key);
+                        }
+                    }
+
+                    List<CompletableFuture<R>> res = new ArrayList<>();
+
+                    if (!unmapped.isEmpty()) {
+                        // Disable awareness for unmapped keys.
+                        res.add(fun.apply(unmapped, 
PartitionAwarenessProvider.NULL_PROVIDER));
+                    }
+
+                    for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+                        res.add(fun.apply(entry.getValue(), 
PartitionAwarenessProvider.of(entry.getKey())));
+                    }
+
+                    return CompletableFuture.allOf(res.toArray(new 
CompletableFuture[0])).thenApply(ignored -> {
+                        R in = initialValue;
+
+                        for (CompletableFuture<R> val : res) {
+                            in = reducer.reduce(in, val.getNow(null));
+                        }
+
+                        return in;
+                    });
+                });
+    }
+
+    <E> CompletableFuture<List<E>> split(
+            Transaction tx,
+            Collection<E> keys,
+            BiFunction<Collection<E>, PartitionAwarenessProvider, 
CompletableFuture<List<E>>> fun,
+            BiFunction<ClientSchema, E, Integer> hashFunc
+    ) {
+        assert tx != null;
+
+        CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer);
+        CompletableFuture<List<String>> partitionsFut = 
getPartitionAssignment();
+
+        return CompletableFuture.allOf(schemaFut, partitionsFut)
+                .thenCompose(v -> {
+                    Batch<E> unmapped = new Batch<>();
+                    Map<Integer, Batch<E>> mapped = new HashMap<>();
+                    int idx = 0;
+                    for (E key : keys) {
+                        ClientSchema schema = schemaFut.getNow(null);
+                        @Nullable List<String> aff = 
partitionsFut.getNow(null);
+                        int hash = hashFunc.apply(schema, key);
+                        Integer part = aff == null ? null : Math.abs(hash % 
aff.size());

Review Comment:
   Same as above - move what's possible outside of the loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to