valepakh commented on code in PR #3673:
URL: https://github.com/apache/ignite-3/pull/3673#discussion_r1581219208


##########
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.HashPartition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Implementation of {@link PartitionManager} for tables with hash partitions.
+ */
+public class HashPartitionManagerImpl implements 
PartitionManager<HashPartition> {
+    private final InternalTable table;
+
+    private final SchemaRegistry schemaReg;
+
+    private final MarshallersProvider marshallers;
+
+    /**
+     * Constructor.
+     *
+     * @param table Internal table.
+     * @param schemaReg Schema registry.
+     * @param marshallers Marshallers.
+     */
+    public HashPartitionManagerImpl(
+            InternalTable table,
+            SchemaRegistry schemaReg,
+            MarshallersProvider marshallers
+    ) {
+        this.table = table;
+        this.schemaReg = schemaReg;
+        this.marshallers = marshallers;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocationAsync(HashPartition 
partition) {
+        return table.partitionLocation(new TablePartitionId(table.tableId(), 
partition.partitionId()));
+    }
+
+    @Override
+    public CompletableFuture<Map<HashPartition, ClusterNode>> 
allPartitionsAsync() {
+        Map<HashPartition, ClusterNode> resultMap = new ConcurrentHashMap<>();

Review Comment:
   You could remove the need for concurrent container by iterating on both 
arrays after the `allOf`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.HashPartition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Implementation of {@link PartitionManager} for tables with hash partitions.
+ */
+public class HashPartitionManagerImpl implements 
PartitionManager<HashPartition> {
+    private final InternalTable table;
+
+    private final SchemaRegistry schemaReg;
+
+    private final MarshallersProvider marshallers;
+
+    /**
+     * Constructor.
+     *
+     * @param table Internal table.
+     * @param schemaReg Schema registry.
+     * @param marshallers Marshallers.
+     */
+    public HashPartitionManagerImpl(
+            InternalTable table,
+            SchemaRegistry schemaReg,
+            MarshallersProvider marshallers
+    ) {
+        this.table = table;
+        this.schemaReg = schemaReg;
+        this.marshallers = marshallers;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocationAsync(HashPartition 
partition) {
+        return table.partitionLocation(new TablePartitionId(table.tableId(), 
partition.partitionId()));
+    }
+
+    @Override
+    public CompletableFuture<Map<HashPartition, ClusterNode>> 
allPartitionsAsync() {
+        Map<HashPartition, ClusterNode> resultMap = new ConcurrentHashMap<>();
+
+        HashPartition[] allPartitions = IntStream.range(0, table.partitions())
+                .mapToObj(HashPartition::new)
+                .toArray(HashPartition[]::new);
+
+        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[allPartitions.length];
+        for (int i = 0; i < allPartitions.length; i++) {
+            HashPartition partition = allPartitions[i];
+            futures[i] = table.partitionLocation(new 
TablePartitionId(table.tableId(), partition.partitionId()))
+                    .thenAccept(node -> resultMap.put(partition, node));
+        }
+
+        return CompletableFuture.allOf(futures)
+                .thenApply(unused -> resultMap);
+    }
+
+    @Override
+    public <K> CompletableFuture<HashPartition> partitionFromKeyAsync(K key, 
Mapper<K> mapper) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(mapper);
+
+        BinaryRowEx keyRow;
+        var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(), 
marshallers, mapper, mapper);
+        try {
+            keyRow = marshaller.marshal(key);
+        } catch (MarshallerException e) {
+            throw new IgniteInternalException("Cannot marshal key", e);

Review Comment:
   Deprecated constructors.



##########
modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+
+/**
+ * The partition manager provides the ability to obtain information about 
table partitions.
+ * Then interface can be used to get all partitions of a table,
+ * the location of the primary replica of a partition,
+ * the partition for a specific table key.
+ *
+ * @param <T> Partitioning type.
+ */
+public interface PartitionManager<T> {
+    /**
+     * Returns location of primary replica for provided partition.
+     *
+     * @param partition Partition instance.
+     * @return Cluster node where located primary replica of provided 
partition.

Review Comment:
   ```suggestion
        * @return Cluster node where primary replica of provided partition is 
located.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.HashPartition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Implementation of {@link PartitionManager} for tables with hash partitions.
+ */
+public class HashPartitionManagerImpl implements 
PartitionManager<HashPartition> {
+    private final InternalTable table;
+
+    private final SchemaRegistry schemaReg;
+
+    private final MarshallersProvider marshallers;
+
+    /**
+     * Constructor.
+     *
+     * @param table Internal table.
+     * @param schemaReg Schema registry.
+     * @param marshallers Marshallers.
+     */
+    public HashPartitionManagerImpl(
+            InternalTable table,
+            SchemaRegistry schemaReg,
+            MarshallersProvider marshallers
+    ) {
+        this.table = table;
+        this.schemaReg = schemaReg;
+        this.marshallers = marshallers;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocationAsync(HashPartition 
partition) {
+        return table.partitionLocation(new TablePartitionId(table.tableId(), 
partition.partitionId()));
+    }
+
+    @Override
+    public CompletableFuture<Map<HashPartition, ClusterNode>> 
allPartitionsAsync() {
+        Map<HashPartition, ClusterNode> resultMap = new ConcurrentHashMap<>();
+
+        HashPartition[] allPartitions = IntStream.range(0, table.partitions())
+                .mapToObj(HashPartition::new)
+                .toArray(HashPartition[]::new);
+
+        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[allPartitions.length];
+        for (int i = 0; i < allPartitions.length; i++) {
+            HashPartition partition = allPartitions[i];
+            futures[i] = table.partitionLocation(new 
TablePartitionId(table.tableId(), partition.partitionId()))
+                    .thenAccept(node -> resultMap.put(partition, node));
+        }
+
+        return CompletableFuture.allOf(futures)
+                .thenApply(unused -> resultMap);
+    }
+
+    @Override
+    public <K> CompletableFuture<HashPartition> partitionFromKeyAsync(K key, 
Mapper<K> mapper) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(mapper);
+
+        BinaryRowEx keyRow;
+        var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(), 
marshallers, mapper, mapper);
+        try {
+            keyRow = marshaller.marshal(key);
+        } catch (MarshallerException e) {
+            throw new IgniteInternalException("Cannot marshal key", e);
+        }
+
+        return CompletableFuture.completedFuture(new 
HashPartition(table.partition(keyRow)));

Review Comment:
   `keyRow` declaration and return could be moved inside the try block as in 
the next method. Also static import would be nice.
   



##########
modules/api/src/main/java/org/apache/ignite/table/criteria/Criteria.java:
##########
@@ -69,6 +69,23 @@ static Expression columnValue(String columnName, Condition 
condition) {
         return new Expression(condition.getOperator(), newElements);
     }
 
+    /**
+     * Creates a predicate that test whether the partition value is supply to 
the give condition.

Review Comment:
   Could you rephrase that - I don't understand what does this mean.



##########
modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java:
##########
@@ -124,6 +125,11 @@ public <T> void visit(Expression expression, @Nullable 
Void context) {
         }
     }
 
+    @Override
+    public void visit(Partition partition, @Nullable Void context) {
+        throw new UnsupportedOperationException("This operation doesn't 
implemented yet.");

Review Comment:
   Could you please create tickets for implementing this if needed?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1846,10 +1837,26 @@ protected CompletableFuture<IgniteBiTuple<ClusterNode, 
Long>> enlist(int partId,
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
         tx.assignCommitPartition(tablePartitionId);
 
+        return partitionMeta(tablePartitionId).thenApply(meta -> {
+            TablePartitionId partGroupId = new TablePartitionId(tableId, 
partId);
+
+            return tx.enlist(partGroupId, new IgniteBiTuple<>(
+                    getClusterNode(meta.getLeaseholder()),
+                    meta.getStartTime().longValue())
+            );
+        });
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId 
partition) {
+        return partitionMeta(partition).thenApply(meta -> 
getClusterNode(meta.getLeaseholder()));
+    }
+
+    private CompletableFuture<ReplicaMeta> partitionMeta(ReplicationGroupId 
partition) {
         HybridTimestamp now = clock.now();
 
         CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
-                tablePartitionId,
+                partition,

Review Comment:
   Let's keep the previous name - `tablePartitionId`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1858,20 +1865,22 @@ protected CompletableFuture<IgniteBiTuple<ClusterNode, 
Long>> enlist(int partId,
         return primaryReplicaFuture.handle((primaryReplica, e) -> {
             if (e != null) {
                 throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
-                        + " [tablePartitionId=" + tablePartitionId + ", 
awaitTimestamp=" + now + ']', e);
+                        + " [tablePartitionId=" + partition + ", 
awaitTimestamp=" + now + ']', e);
             }
 
-            ClusterNode node = 
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
+            return primaryReplica;
+        });
+    }
 
-            if (node == null) {
-                throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
-                        + primaryReplica.getLeaseholder() + ']');
-            }
+    private ClusterNode getClusterNode(String leaserHolder) {

Review Comment:
   ```suggestion
       private ClusterNode getClusterNode(@Nullable String leaserHolder) {
   ```



##########
modules/api/src/main/java/org/apache/ignite/table/criteria/Criteria.java:
##########
@@ -69,6 +69,23 @@ static Expression columnValue(String columnName, Condition 
condition) {
         return new Expression(condition.getOperator(), newElements);
     }
 
+    /**
+     * Creates a predicate that test whether the partition value is supply to 
the give condition.
+     *
+     * @param partition Partition value.
+     * @param condition Target condition.
+     * @return The created expression instance.
+     */
+    static Expression partitionValue(Partition partition, Condition condition) 
{

Review Comment:
   This method is unused - probably should add a test that uses it.



##########
modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+
+/**
+ * The partition manager provides the ability to obtain information about 
table partitions.
+ * Then interface can be used to get all partitions of a table,

Review Comment:
   ```suggestion
    * This interface can be used to get all partitions of a table,
   ```



##########
modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+
+/**
+ * The partition manager provides the ability to obtain information about 
table partitions.
+ * Then interface can be used to get all partitions of a table,
+ * the location of the primary replica of a partition,
+ * the partition for a specific table key.
+ *
+ * @param <T> Partitioning type.
+ */
+public interface PartitionManager<T> {
+    /**
+     * Returns location of primary replica for provided partition.
+     *
+     * @param partition Partition instance.
+     * @return Cluster node where located primary replica of provided 
partition.
+     */
+    CompletableFuture<ClusterNode> partitionLocationAsync(T partition);
+
+    /**
+     * Returns map with all partitions and their locations.
+     *
+     * @return Map from partition to cluster node where located primary 
replica of the partition.

Review Comment:
   ```suggestion
        * @return Map from partition to cluster node where primary replica of 
the partition is located.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -793,14 +788,10 @@ private <R> CompletableFuture<R> 
evaluateReadOnlyPrimaryNode(
 
         CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
-                ClusterNode node = 
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
-
-                if (node == null) {
-                    throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
-                            + primaryReplica.getLeaseholder() + ']');
-                }
-
-                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
+                return replicaSvc.invoke(
+                        getClusterNode(primaryReplica.getLeaseholder()),
+                        op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue())
+                );

Review Comment:
   ```suggestion
                   ClusterNode node = 
getClusterNode(primaryReplica.getLeaseholder());
   
                   return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java:
##########
@@ -486,4 +487,12 @@ Publisher<BinaryRow> lookup(
      * @return Streamer flush executor.
      */
     ScheduledExecutorService streamerFlushExecutor();
+
+    /**
+     * Returns cluster node {@link ClusterNode} where located primary replica 
of replication group.

Review Comment:
   ```suggestion
        * Returns {@link ClusterNode} where primary replica of replication 
group is located.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to