ibessonov commented on code in PR #1191:
URL: https://github.com/apache/ignite-3/pull/1191#discussion_r1003051333


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -118,6 +119,17 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
      */
     HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId);
 
+    /**
+     * Returns an already created Hash Index with the given descriptor or creates a new one if it does not exist.
+     *
+     * @param partitionId Partition ID for which this index has been configured.
+     * @param descriptor A description of the index to be created.
+     * @return Hash Index storage.
+     * @throws StorageException If the given partition does not exist, or the given index does not exist or is not configured as a
+     *         hash index.
+     */
+    HashIndexStorage getOrCreateHashIndex(int partitionId, HashIndexDescriptor descriptor);

Review Comment:
   Should we deprecate the old method, or rename it to "getHashIndex()"? What's the purpose of this method?
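   If both overloads stay, one option is to mark the UUID-based one as deprecated. A compilable sketch with simplified stand-in types (hypothetical, not the real `MvTableStorage` interfaces):

```java
import java.util.UUID;

// Hypothetical mini-interface showing one way to retire the UUID-based
// overload once the descriptor-based one becomes the primary entry point.
public class DeprecationSketch {
    public interface HashIndexStorage {}

    public interface HashIndexDescriptor { UUID id(); }

    public interface MvTableStorageLike {
        /** @deprecated superseded by the descriptor-based overload. */
        @Deprecated
        HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId);

        HashIndexStorage getOrCreateHashIndex(int partitionId, HashIndexDescriptor descriptor);
    }

    /** Checks via reflection that the legacy overload carries {@link Deprecated}. */
    public static boolean legacyOverloadDeprecated() {
        try {
            return MvTableStorageLike.class
                    .getMethod("getOrCreateHashIndex", int.class, UUID.class)
                    .isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```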



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -380,6 +403,50 @@ private SortedIndexDescriptor convert(SortedIndexView indexView) {
         );
     }
 
+    private CompletableFuture<BinaryTuple> toIndexKey(TableIndexView indexView, BinaryRow tableRow) {

Review Comment:
   This one is very inefficient. We're planning to redo it in the future, right?
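   For the record, the usual fix is to resolve the table-to-index column mapping once and reuse it for every row, instead of re-reading the index configuration per `BinaryRow`. A standalone sketch with hypothetical names and plain `Object[]` rows (not the real `BinaryRow`/`TableIndexView` API):

```java
import java.util.List;

// Hypothetical sketch: the column mapping is resolved once at construction
// time, then key extraction per row is a plain array copy.
public class IndexKeyMapper {
    private final int[] mapping; // index column position -> table column position

    public IndexKeyMapper(List<String> tableColumns, List<String> indexColumns) {
        // Resolved once, not once per row.
        mapping = indexColumns.stream().mapToInt(tableColumns::indexOf).toArray();
    }

    public Object[] toIndexKey(Object[] tableRow) {
        Object[] key = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            key[i] = tableRow[mapping[i]];
        }
        return key;
    }
}
```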



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -139,6 +139,20 @@ public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
         return sortedIndices.getOrCreateStorage(partitionId);
     }
 
+    @Override
+    public HashIndexStorage getOrCreateHashIndex(int partitionId, HashIndexDescriptor descriptor) {
+        if (!partitions.containsKey(partitionId)) {
+            throw new StorageException("Partition ID " + partitionId + " does not exist");
+        }
+
+        HashIndices sortedIndices = hashIndicesById.computeIfAbsent(

Review Comment:
   Please rename the variable



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PkStorage.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Facade to ease of work with pk constraint storage.
+ *
+ * <p>Encapsulates logic of conversion of ByteBuffer representing the table's key to storage key
+ */
+public class PkStorage {
+    private static final BinaryTupleSchema PK_KEY_SCHEMA = BinaryTupleSchema.create(new Element[]{

Review Comment:
   I need a better explanation of what's going on. What is `__rawKey`? Why do we have a BLOB as an indexed column?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbBinaryTupleComparator.java:
##########
@@ -65,6 +65,14 @@ public int compare(ByteBuffer a, ByteBuffer b) {
         ByteBuffer firstBinaryTupleBuffer = a.slice().order(ByteOrder.LITTLE_ENDIAN);
         ByteBuffer secondBinaryTupleBuffer = b.slice().order(ByteOrder.LITTLE_ENDIAN);
 
+        if (!firstBinaryTupleBuffer.hasRemaining()) {
+            return -1;
+        }
+
+        if (!secondBinaryTupleBuffer.hasRemaining()) {
+            return 1;
+        }
+

Review Comment:
   What is this? It's a copy of the code that's already in this method, please remove it.
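   For comparison, the empty-buffer rule only needs to exist in one place. A standalone sketch (hypothetical class, not the actual `RocksDbBinaryTupleComparator`):

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

// Hypothetical sketch: an empty buffer models an unbounded range bound and
// sorts before any non-empty tuple; the rule is handled in exactly one place.
public class EmptyFirstComparator implements Comparator<ByteBuffer> {
    @Override
    public int compare(ByteBuffer a, ByteBuffer b) {
        if (!a.hasRemaining() || !b.hasRemaining()) {
            // false < true: the empty side orders first; two empties are equal.
            return Boolean.compare(a.hasRemaining(), b.hasRemaining());
        }

        // Both non-empty: fall back to plain lexicographic byte comparison.
        return a.compareTo(b);
    }
}
```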



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PkStorage.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Facade to ease of work with pk constraint storage.
+ *
+ * <p>Encapsulates logic of conversion of ByteBuffer representing the table's key to storage key
+ */
+public class PkStorage {
+    private static final BinaryTupleSchema PK_KEY_SCHEMA = BinaryTupleSchema.create(new Element[]{
+            new Element(NativeTypes.BYTES, false)
+    });
+
+    /**
+     * Creates a primary key constraint storage for given table.
+     *
+     * @param tableId An identifier of a table to create a constraint storage for.
+     * @param storageFactory A storage factory to create a storage by.
+     * @return A storage for pk constraint.
+     */
+    public static PkStorage createPkStorage(UUID tableId, Function<HashIndexDescriptor, IndexStorage> storageFactory) {

Review Comment:
   What's up with the factory? Can't we pass a storage?
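   To illustrate the difference between the two shapes, a sketch with simplified stand-in types (hypothetical, not the real `IndexStorage`/`HashIndexDescriptor`):

```java
import java.util.function.Function;

// Hypothetical sketch contrasting the two wiring styles under review.
public class PkStorageWiring {
    public interface IndexStorage { String label(); }

    // Factory style, as in the PR: the callee derives the descriptor itself
    // and asks the factory for the storage.
    public static IndexStorage fromFactory(String tableId, Function<String, IndexStorage> factory) {
        String descriptor = tableId + "-pk"; // descriptor built internally
        return factory.apply(descriptor);
    }

    // Direct style: the caller resolves the storage up front and hands it over.
    public static IndexStorage fromInstance(IndexStorage storage) {
        return storage;
    }
}
```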



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1910,4 +2030,132 @@ BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartition(Supplier<Raf
     private <T extends ConfigurationProperty<?>> T directProxy(T property) {
         return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
     }
+
+    private Supplier<List<TableSchemaAwareIndexStorage>> indexStorageAdapters(UUID tableId, int partId) {
+        return () -> {
+            List<IndexStorageAdapterFactory> factories = new ArrayList<>(indexStorageAdapterFactories
+                    .getOrDefault(tableId, Map.of()).values());
+
+            List<TableSchemaAwareIndexStorage> adapters = new ArrayList<>(factories.size());
+
+            for (IndexStorageAdapterFactory factory : factories) {
+                adapters.add(factory.create(partId));
+            }
+
+            return adapters;
+        };
+    }
+
+    private Supplier<List<IndexLocker>> indexesLockers(UUID tableId, int partId) {
+        return () -> {
+            List<IndexLockerFactory> factories = new ArrayList<>(indexLockerFactories.getOrDefault(tableId, Map.of()).values());
+
+            List<IndexLocker> lockers = new ArrayList<>(factories.size());
+
+            for (IndexLockerFactory factory : factories) {
+                lockers.add(factory.create(partId));
+            }
+
+            return lockers;
+        };
+    }
+
+    private static class HashIndexLocker implements IndexLocker {
+        private final UUID indexId;
+        private final LockManager lockManager;
+        private final Function<BinaryRow, CompletableFuture<BinaryTuple>> indexRowResolver;
+
+        private HashIndexLocker(UUID indexId, LockManager lockManager,
+                Function<BinaryRow, CompletableFuture<BinaryTuple>> indexRowResolver) {
+            this.indexId = indexId;
+            this.lockManager = lockManager;
+            this.indexRowResolver = indexRowResolver;
+        }
+
+        @Override
+        public CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow, RowId rowId) {
+            return indexRowResolver.apply(tableRow)
+                    .thenCompose(tuple ->
+                            lockManager.acquire(txId, new LockKey(indexId, tuple.byteBuffer()), LockMode.IX)
+                    );
+        }
+
+        @Override
+        public CompletableFuture<?> locksForRemove(UUID txId, BinaryRow tableRow, RowId rowId) {
+            return indexRowResolver.apply(tableRow)
+                    .thenCompose(tuple ->
+                            lockManager.acquire(txId, new LockKey(indexId, tuple.byteBuffer()), LockMode.IX)
+                    );
+        }
+    }
+
+    private static class SortedIndexLocker implements IndexLocker {
+        private static final BinaryTuple POSITIVE_INF = new BinaryTuple(
+                BinaryTupleSchema.create(new Element[0]),
+                new BinaryTupleBuilder(0, false).build()
+        );
+
+        private final UUID indexId;
+        private final LockManager lockManager;
+        private final SortedIndexStorage storage;
+        private final Function<BinaryRow, CompletableFuture<BinaryTuple>> indexRowResolver;
+
+        private SortedIndexLocker(UUID indexId, LockManager lockManager, SortedIndexStorage storage,
+                Function<BinaryRow, CompletableFuture<BinaryTuple>> indexRowResolver) {
+            this.indexId = indexId;
+            this.lockManager = lockManager;
+            this.storage = storage;
+            this.indexRowResolver = indexRowResolver;
+        }
+
+        @Override
+        public CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow, RowId rowId) {
+            return indexRowResolver.apply(tableRow)
+                    .thenCompose(key -> {
+                        BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+                        // find next key
+                        Cursor<IndexRow> cursor = storage.scan(prefix, null, SortedIndexStorage.GREATER);

Review Comment:
   Note for the future: we should add a hint that only one entry will be read. This will allow us to make some optimizations in the storage.
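   For example, the hint could be a simple max-rows argument that lets the storage stop pulling early. A standalone sketch with hypothetical types (not the real `SortedIndexStorage` API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a row-count hint on a scan: the caller declares it
// will read at most N entries, so the source is not drained further.
public class HintedScan {
    public static <T> List<T> scan(Iterable<T> index, int maxRowsHint) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = index.iterator();

        // Stop as soon as the hinted row budget is consumed.
        while (out.size() < maxRowsHint && it.hasNext()) {
            out.add(it.next());
        }

        return out;
    }
}
```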



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1557,6 +1577,103 @@ public CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean checkCon
         return getTblFut.whenComplete((unused, throwable) -> tablesByIdVv.removeWhenComplete(tablesListener));
     }
 
+    /**
+     * Register the index with given id in a table.
+     *
+     * @param tableId A table id to register index in.
+     * @param indexId An index id of the index to register.
+     * @param searchRowResolver Function which converts given table row to an index key.
+     */
+    public void registerHashIndex(UUID tableId, UUID indexId, Function<BinaryRow, CompletableFuture<BinaryTuple>> searchRowResolver) {

Review Comment:
   Why does it have to be so complicated? Is this because of our weird module dependencies and the fact that indexes are in their own module?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -241,6 +253,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Resolver that resolves a network address to cluster node. */
     private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
 
+    private final Map<UUID, Map<UUID, IndexStorageAdapterFactory>> indexStorageAdapterFactories = new ConcurrentHashMap<>();

Review Comment:
   Should these be inside of the TableImpl or something?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PkStorage.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Facade to ease of work with pk constraint storage.
+ *
+ * <p>Encapsulates logic of conversion of ByteBuffer representing the table's key to storage key
+ */
+public class PkStorage {
+    private static final BinaryTupleSchema PK_KEY_SCHEMA = BinaryTupleSchema.create(new Element[]{

Review Comment:
   Is this for the case when the table has no PK columns? If so, please explicitly state that in the description. Otherwise it looks like something very questionable.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -599,42 +557,80 @@ private CompletableFuture processTxCleanupAction(TxCleanupReplicaRequest request
     }
 
     /**
-     * Returns index id of default {@lonk INDEX_SCAN_ID} index that will be used for operation.
+     * Finds the row and its identifier by given pk search row.
      *
-     * @param indexId Index id or {@code null}.
-     * @return Index id.
+     * @param key A bytes representing a primary key.
+     * @param ts A timestamp regarding which we need to resolve the given row.
+     * @param action An action to perform on a resolved row.
+     * @param <T> A type of the value returned by action.
+     * @return Result of the given action.
      */
-    private @NotNull UUID indexIdOrDefault(@Nullable UUID indexId) {
-        return indexId != null ? indexId : indexScanId;
-    }
+    private <T> T resolveRowByPk(
+            ByteBuffer key,
+            HybridTimestamp ts,
+            BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
+    ) {
+        try (Cursor<RowId> cursor = pkConstraintStorage.get(key)) {
+            for (RowId rowId : cursor) {
+                ReadResult readResult = mvDataStorage.read(rowId, ts);
 
-    /**
-     * Find out a row id by an index.
-     * TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
-     *
-     * @param indexId Index id.
-     * @param key     Key to find.
-     * @return Value or {@code null} if the key does not determine a value.
-     */
-    private RowId rowIdByKey(@NotNull UUID indexId, ByteBuffer key) {
-        if (indexPkId.equals(indexId)) {
-            return primaryIndex.get(key);
-        }
+                BinaryRow row = resolveReadResult(readResult, ts, () -> {
+                    if (readResult.newestCommitTimestamp() == null) {
+                        return null;
+                    }
 
-        if (indexScanId.equals(indexId)) {
-            RowId[] rowIdHolder = new RowId[1];
+                    ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+                    assert !committedReadResult.isWriteIntent() :
+                            "The result is not committed [rowId=" + rowId + ", 
timestamp="
+                                    + readResult.newestCommitTimestamp() + ']';
+
+                    return committedReadResult.binaryRow();
+                });
 
-            mvDataStorage.forEach((rowId, binaryRow) -> {
-                if (rowIdHolder[0] == null && binaryRow.keySlice().equals(key)) {
-                    rowIdHolder[0] = rowId;
+                if (row != null && row.hasValue()) {
+                    return action.apply(rowId, row);
                 }
-            });
+            }
 
-            return rowIdHolder[0];
+            return action.apply(null, null);
+        } catch (Exception e) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    format("Unable to close cursor [tableId={}]", tableId), e);
         }
+    }
 
-        throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                IgniteStringFormatter.format("The index does not exist [indexId={}]", indexId));
+    /**
+     * Finds the row and its identifier by given pk search row.
+     *
+     * @param key A bytes representing a primary key.
+     * @param txId An identifier of the transaction regarding which we need to resolve the given row.
+     * @param action An action to perform on a resolved row.
+     * @param <T> A type of the value returned by action.
+     * @return A future object representing the result of the given action.
+     */
+    private <T> CompletableFuture<T> resolveRowByPk(

Review Comment:
   This method has the same name and comment as the previous one, but it does some locks magic. Should we rename it and update the documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
