ptupitsyn commented on code in PR #6007: URL: https://github.com/apache/ignite-3/pull/6007#discussion_r2156364467
########## modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java: ########## @@ -877,12 +886,148 @@ int tryGetPartitionCount() { return partitionCount; } + /** + * Batch with indexes. + * + * @param <E> Batch type element. + */ + static class Batch<E> { + List<E> batch = new ArrayList<>(); + List<Integer> originalIndices = new ArrayList<>(); + + void add(E entry, int origIdx) { + batch.add(entry); + originalIndices.add(origIdx); + } + } + + private static <E> void reduceWithKeepOrder(List<E> agg, List<E> cur, List<Integer> originalIndices) { + for (int i = 0; i < cur.size(); i++) { + E val = cur.get(i); + Integer orig = originalIndices.get(i); + agg.set(orig, val); + } + } + + <R, E> CompletableFuture<R> split( + Transaction tx, + Collection<E> keys, + BiFunction<Collection<E>, PartitionAwarenessProvider, CompletableFuture<R>> fun, + @Nullable R initialValue, + Reducer<R> reducer, + BiFunction<ClientSchema, E, Integer> hashFunc + ) { + assert tx != null; + + CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer); + CompletableFuture<List<String>> partitionsFut = getPartitionAssignment(); + + return CompletableFuture.allOf(schemaFut, partitionsFut) + .thenCompose(v -> { + List<E> unmapped = new ArrayList<>(); + Map<Integer, List<E>> mapped = new HashMap<>(); + for (E key : keys) { + ClientSchema schema = schemaFut.getNow(null); + @Nullable List<String> aff = partitionsFut.getNow(null); + int hash = hashFunc.apply(schema, key); + Integer part = aff == null ? null : Math.abs(hash % aff.size()); Review Comment: I think we can move this check outside of the loop and exit early if the assignment is not available (**or empty** - all elements can be null). ########## modules/client/src/main/java/org/apache/ignite/internal/client/table/PartitionAwarenessProvider.java: ########## @@ -17,45 +17,42 @@ package org.apache.ignite.internal.client.table; -import java.util.function.BiFunction; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; /** * Partition awareness provider. - * Represents 3 use cases: - * 1. Partition awareness is enabled. Use hashFunc to determine partition. - * 2. Transaction is used. Use specific channel. - * 3. Null instance = No partition awareness and no transaction. Use any channel. + * Used to calculate a partition for a specific operation. */ public class PartitionAwarenessProvider { - private final @Nullable Integer partition; + static PartitionAwarenessProvider NULL_PROVIDER = of((Integer) null); - private final @Nullable BiFunction<ClientSchema, Boolean, Integer> hashFunc; + private final @Nullable Integer partition; - private PartitionAwarenessProvider(@Nullable BiFunction<ClientSchema, Boolean, Integer> hashFunc, @Nullable Integer partition) { - assert hashFunc != null ^ partition != null; + private final @Nullable Function<ClientSchema, Integer> hashFunc; + private PartitionAwarenessProvider(@Nullable Function<ClientSchema, Integer> hashFunc, @Nullable Integer partition) { this.hashFunc = hashFunc; this.partition = partition; } - public static PartitionAwarenessProvider of(Integer partition) { + public static PartitionAwarenessProvider of(@Nullable Integer partition) { Review Comment: Let's remove `@Nullable` here, it is only needed for `NULL_PROVIDER` where the warning can be suppressed. For all other usages it should not be null. ########## modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java: ########## @@ -877,12 +886,148 @@ int tryGetPartitionCount() { return partitionCount; } + /** + * Batch with indexes. + * + * @param <E> Batch type element. + */ + static class Batch<E> { + List<E> batch = new ArrayList<>(); + List<Integer> originalIndices = new ArrayList<>(); + + void add(E entry, int origIdx) { + batch.add(entry); + originalIndices.add(origIdx); + } + } + + private static <E> void reduceWithKeepOrder(List<E> agg, List<E> cur, List<Integer> originalIndices) { + for (int i = 0; i < cur.size(); i++) { + E val = cur.get(i); + Integer orig = originalIndices.get(i); + agg.set(orig, val); + } + } + + <R, E> CompletableFuture<R> split( + Transaction tx, + Collection<E> keys, + BiFunction<Collection<E>, PartitionAwarenessProvider, CompletableFuture<R>> fun, + @Nullable R initialValue, + Reducer<R> reducer, + BiFunction<ClientSchema, E, Integer> hashFunc + ) { + assert tx != null; + + CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer); + CompletableFuture<List<String>> partitionsFut = getPartitionAssignment(); + + return CompletableFuture.allOf(schemaFut, partitionsFut) + .thenCompose(v -> { + List<E> unmapped = new ArrayList<>(); + Map<Integer, List<E>> mapped = new HashMap<>(); + for (E key : keys) { + ClientSchema schema = schemaFut.getNow(null); + @Nullable List<String> aff = partitionsFut.getNow(null); + int hash = hashFunc.apply(schema, key); + Integer part = aff == null ? null : Math.abs(hash % aff.size()); + if (part == null) { + unmapped.add(key); + } else { + mapped.computeIfAbsent(part, k -> new ArrayList<>()).add(key); + } + } + + List<CompletableFuture<R>> res = new ArrayList<>(); + + if (!unmapped.isEmpty()) { + // Disable awareness for unmapped keys. + res.add(fun.apply(unmapped, PartitionAwarenessProvider.NULL_PROVIDER)); + } + + for (Entry<Integer, List<E>> entry : mapped.entrySet()) { + res.add(fun.apply(entry.getValue(), PartitionAwarenessProvider.of(entry.getKey()))); + } + + return CompletableFuture.allOf(res.toArray(new CompletableFuture[0])).thenApply(ignored -> { + R in = initialValue; + + for (CompletableFuture<R> val : res) { + in = reducer.reduce(in, val.getNow(null)); + } + + return in; + }); + }); + } + + <E> CompletableFuture<List<E>> split( + Transaction tx, + Collection<E> keys, + BiFunction<Collection<E>, PartitionAwarenessProvider, CompletableFuture<List<E>>> fun, + BiFunction<ClientSchema, E, Integer> hashFunc + ) { + assert tx != null; + + CompletableFuture<ClientSchema> schemaFut = getSchema(latestSchemaVer); + CompletableFuture<List<String>> partitionsFut = getPartitionAssignment(); + + return CompletableFuture.allOf(schemaFut, partitionsFut) + .thenCompose(v -> { + Batch<E> unmapped = new Batch<>(); + Map<Integer, Batch<E>> mapped = new HashMap<>(); + int idx = 0; + for (E key : keys) { + ClientSchema schema = schemaFut.getNow(null); + @Nullable List<String> aff = partitionsFut.getNow(null); + int hash = hashFunc.apply(schema, key); + Integer part = aff == null ? null : Math.abs(hash % aff.size()); Review Comment: Same as above - move what's possible outside of the loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org