ibessonov commented on code in PR #1652:
URL: https://github.com/apache/ignite-3/pull/1652#discussion_r1102834310


##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java:
##########
@@ -2367,6 +2372,138 @@ void testFindNext() throws Exception {
         assertEquals(0L, tree.findNext(-1L, true));
     }
 
+    @Test
+    void testFindOneWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        assertEquals("row0", tree.findOne(0L, row -> "row" + row));
+        assertEquals("rownull", tree.findOne(1L, row -> "row" + row));
+    }
+
+    @Test
+    void testFindWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+        tree.put(1L);
+
+        PeekTreeRowCursor<Long, String> cursor = tree.find(null, null, row -> 
"row" + row);
+
+        assertNull(cursor.peek());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(0L, cursor.peek());
+        assertEquals("row0", cursor.next());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(1L, cursor.peek());
+        assertEquals("row1", cursor.next());
+
+        assertFalse(cursor.hasNext());
+        assertNull(cursor.peek());
+        assertThrows(NoSuchElementException.class, cursor::next);
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForPut() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        // Checks insert.
+        CompletableFuture<Void> future0 = new CompletableFuture<>();

Review Comment:
   Are you sure that the future is the best solution for the test? Looks like 
an overkill to me.
   You can use flag, for example, or you can use mockito to ensure that method 
has been called (although I wouldn't recommend it in this case)



##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java:
##########
@@ -2367,6 +2372,138 @@ void testFindNext() throws Exception {
         assertEquals(0L, tree.findNext(-1L, true));
     }
 
+    @Test
+    void testFindOneWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        assertEquals("row0", tree.findOne(0L, row -> "row" + row));
+        assertEquals("rownull", tree.findOne(1L, row -> "row" + row));
+    }
+
+    @Test
+    void testFindWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+        tree.put(1L);
+
+        PeekTreeRowCursor<Long, String> cursor = tree.find(null, null, row -> 
"row" + row);
+
+        assertNull(cursor.peek());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(0L, cursor.peek());
+        assertEquals("row0", cursor.next());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(1L, cursor.peek());
+        assertEquals("row1", cursor.next());
+
+        assertFalse(cursor.hasNext());
+        assertNull(cursor.peek());
+        assertThrows(NoSuchElementException.class, cursor::next);
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForPut() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        // Checks insert.
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertNull(oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return 0L;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return PUT;
+            }
+
+            @Override
+            public void onUpdate() {
+                future0.complete(null);
+            }
+        });
+
+        assertThat(future0, willCompleteSuccessfully());
+
+        assertEquals(0L, tree.findOne(0L));
+
+        // Checks replace.
+        CompletableFuture<Void> future1 = new CompletableFuture<>();

Review Comment:
   Same here



##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java:
##########
@@ -2367,6 +2371,157 @@ void testFindNext() throws Exception {
         assertEquals(0L, tree.findNext(-1L, true));
     }
 
+    @Test
+    void testFindOneWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        TreeRowClosure<Long, Long> treeRowClosure = new TreeRowClosure<>() {
+            @Override
+            public boolean apply(BplusTree<Long, Long> tree, BplusIo<Long> io, 
long pageAddr, int idx) {
+                return true;
+            }
+
+            @Override
+            public String map(Long treeRow) {

Review Comment:
   I see that this is a closure with `<Long, Long>`. Where's the `String` in 
that signature?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java:
##########
@@ -1270,12 +1270,14 @@ public Cursor<T> find(
      * @param upper Upper bound or {@code null} if unbounded.
      * @param lowIncl {@code true} if lower bound is inclusive.
      * @param upIncl {@code true} if upper bound is inclusive.
-     * @param c Filter closure.
+     * @param c Tree row closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
-     * @throws IgniteInternalCheckedException If failed.
+     * @throws CorruptedDataStructureException If the data structure is broken.
+     * @throws CorruptedTreeException If there were {@link RuntimeException} 
or {@link AssertionError}.
+     * @throws IgniteInternalCheckedException If other errors occurred.
      */
-    public Cursor<T> find(
+    public <R> Cursor<R> find(

Review Comment:
   Same question as before - R is an arbitrary type, it's not good enough, 
there's no type safety whatsoever



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java:
##########
@@ -6117,35 +6194,37 @@ private void iterate() throws 
IgniteInternalCheckedException {
     /**
      * Forward cursor.
      */
-    private final class ForwardCursor extends AbstractForwardCursor implements 
Cursor<T> {
+    private final class ForwardCursor<R> extends AbstractForwardCursor 
implements Cursor<R> {
         /** Implementation specific argument. */
-        @Nullable
-        final Object arg;
+        private final @Nullable Object arg;

Review Comment:
   Why do you make these fields private?



##########
modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java:
##########
@@ -2367,6 +2372,138 @@ void testFindNext() throws Exception {
         assertEquals(0L, tree.findNext(-1L, true));
     }
 
+    @Test
+    void testFindOneWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        assertEquals("row0", tree.findOne(0L, row -> "row" + row));
+        assertEquals("rownull", tree.findOne(1L, row -> "row" + row));
+    }
+
+    @Test
+    void testFindWithMapper() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+        tree.put(1L);
+
+        PeekTreeRowCursor<Long, String> cursor = tree.find(null, null, row -> 
"row" + row);
+
+        assertNull(cursor.peek());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(0L, cursor.peek());
+        assertEquals("row0", cursor.next());
+
+        assertTrue(cursor.hasNext());
+        assertEquals(1L, cursor.peek());
+        assertEquals("row1", cursor.next());
+
+        assertFalse(cursor.hasNext());
+        assertNull(cursor.peek());
+        assertThrows(NoSuchElementException.class, cursor::next);
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForPut() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        // Checks insert.
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertNull(oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return 0L;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return PUT;
+            }
+
+            @Override
+            public void onUpdate() {
+                future0.complete(null);
+            }
+        });
+
+        assertThat(future0, willCompleteSuccessfully());
+
+        assertEquals(0L, tree.findOne(0L));
+
+        // Checks replace.
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+
+        tree.invoke(0L, null, new InvokeClosure<>() {
+            @Override
+            public void call(@Nullable Long oldRow) {
+                assertEquals(0L, oldRow);
+            }
+
+            @Override
+            public @Nullable Long newRow() {
+                return 0L;
+            }
+
+            @Override
+            public OperationType operationType() {
+                return PUT;
+            }
+
+            @Override
+            public void onUpdate() {
+                future1.complete(null);
+            }
+        });
+
+        assertThat(future1, willCompleteSuccessfully());
+
+        assertEquals(0L, tree.findOne(0L));
+    }
+
+    @Test
+    void testInvokeClosureWithOnUpdateCallbackForRemove() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        tree.put(0L);
+
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review Comment:
   And here



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java:
##########
@@ -6117,35 +6194,37 @@ private void iterate() throws 
IgniteInternalCheckedException {
     /**
      * Forward cursor.
      */
-    private final class ForwardCursor extends AbstractForwardCursor implements 
Cursor<T> {
+    private final class ForwardCursor<R> extends AbstractForwardCursor 
implements Cursor<R> {
         /** Implementation specific argument. */
-        @Nullable
-        final Object arg;
+        private final @Nullable Object arg;
 
-        /** Rows. */
-        @Nullable
-        private T[] rows = (T[]) OBJECT_EMPTY_ARRAY;
+        private @Nullable R @Nullable [] results = (R[]) OBJECT_EMPTY_ARRAY;

Review Comment:
   Please document that `null` means the end of iteration, and that's why it's 
started with empty array. It wasn't obvious to me



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#abortWrite(RowId)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AbortWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private OperationType operationType;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable RowVersion toRemove;
+
+    private @Nullable BinaryRow previousUncommittedRowVersion;
+
+    AbortWriteInvokeClosure(RowId rowId, AbstractPageMemoryMvPartitionStorage 
storage) {
+        this.rowId = rowId;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null || oldRow.transactionId() == null) {
+            // Row doesn't exist or the chain doesn't contain an uncommitted 
write intent.
+            operationType = OperationType.NOOP;
+
+            return;
+        }
+
+        RowVersion latestVersion = storage.readRowVersion(oldRow.headLink(), 
ALWAYS_LOAD_VALUE);
+
+        assert latestVersion.isUncommitted();
+
+        toRemove = latestVersion;
+
+        if (latestVersion.hasNextLink()) {
+            // Next can be safely replaced with any value (like 0), because 
this field is only used when there
+            // is some uncommitted value, but when we add an uncommitted 
value, we 'fix' such placeholder value
+            // (like 0) by replacing it with a valid value.
+            newRow = VersionChain.createCommitted(rowId, 
latestVersion.nextLink(), NULL_LINK);
+
+            operationType = OperationType.PUT;
+        } else {
+            // It was the only version, let's remove the chain as well.
+            operationType = OperationType.REMOVE;
+        }
+
+        previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(latestVersion);
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert operationType == OperationType.PUT ? newRow != null : newRow == 
null : "newRow=" + newRow + ", op=" + operationType;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        assert operationType != null;
+
+        return operationType;
+    }
+
+    @Override
+    public void onUpdate() {
+        assert operationType == OperationType.NOOP ? toRemove == null : 
toRemove != null : "toRemove=" + toRemove + ", op=" + operationType;
+
+        if (toRemove != null) {
+            storage.removeRowVersion(toRemove);
+        }

Review Comment:
   I don't like the fact that we're cleaning a free-list while holding a write 
lock. We can do it while NOT holding the lock and the code would still be safe, 
right? That's because no one else sees the link that you're deleting. At least 
that's the idea. This is how it's done in invoke closures for indexes, for 
example



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -944,15 +862,67 @@ public void finishCleanup() {
         }
     }
 
-    @Nullable VersionChain readVersionChain(RowId rowId) {
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
+
+    /**
+     * Searches version chain by row ID and converts the found version chain 
to the result if found.
+     *
+     * @param rowId Row ID.
+     * @param function Function for converting the version chain to a result, 
function is executed under the read lock of the page on which
+     *      the version chain is located. If the version chain is not found, 
then {@code null} will be passed to the function.
+     */
+    <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T> 
function) {
         try {
-            return versionChainTree.findOne(new VersionChainKey(rowId));
+            return versionChainTree.findOne(new VersionChainKey(rowId), new 
TreeRowClosure<>() {
+                @Override
+                public boolean apply(BplusTree<VersionChainKey, VersionChain> 
tree, BplusIo<VersionChainKey> io, long pageAddr, int idx) {
+                    return true;
+                }
+
+                @Override
+                public T map(VersionChain treeRow) {
+                    return function.apply(treeRow);
+                }
+            }, null);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error getting version chain: 
[rowId={}, {}]", e, rowId, createStorageInfo());
+            if (e.getCause() instanceof StorageException) {
+                throw (StorageException) e.getCause();
+            }
+
+            throw new StorageException("Row version lookup failed: [rowId={}, 
{}]", e, rowId, createStorageInfo());
         }
     }
 
-    void throwExceptionIfStorageNotInRunnableState() {
-        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    /**
+     * Organizes external synchronization of update operations for the same 
version chain.
+     *
+     * <p>NOTE: When you try to execute in the closures on the pages of the 
tree, it leads to a deadlock.
+     */
+    protected <T> T inUpdateVersionChainLock(RowId rowId, Supplier<T> 
supplier) {
+        LockHolder<ReentrantLock> lockHolder = 
updateVersionChainLockByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) 
-> {

Review Comment:
   Maybe the entire map should be encapsulated into its own class, code would 
be really simple:
   ```
   locker.lock(rowId);
   ...
   locker.unlock(rowId);
   ```
   Anyway, I see no distinction between readings and writings. Do we only lock 
stuff when we need updates? That's nice!



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -944,15 +862,67 @@ public void finishCleanup() {
         }
     }
 
-    @Nullable VersionChain readVersionChain(RowId rowId) {
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
+
+    /**
+     * Searches version chain by row ID and converts the found version chain 
to the result if found.
+     *
+     * @param rowId Row ID.
+     * @param function Function for converting the version chain to a result, 
function is executed under the read lock of the page on which
+     *      the version chain is located. If the version chain is not found, 
then {@code null} will be passed to the function.
+     */
+    <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T> 
function) {

Review Comment:
   Can you provide a better name, other than `function`?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -944,15 +862,67 @@ public void finishCleanup() {
         }
     }
 
-    @Nullable VersionChain readVersionChain(RowId rowId) {
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
+
+    /**
+     * Searches version chain by row ID and converts the found version chain 
to the result if found.
+     *
+     * @param rowId Row ID.
+     * @param function Function for converting the version chain to a result, 
function is executed under the read lock of the page on which
+     *      the version chain is located. If the version chain is not found, 
then {@code null} will be passed to the function.
+     */
+    <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T> 
function) {
         try {
-            return versionChainTree.findOne(new VersionChainKey(rowId));
+            return versionChainTree.findOne(new VersionChainKey(rowId), new 
TreeRowClosure<>() {
+                @Override
+                public boolean apply(BplusTree<VersionChainKey, VersionChain> 
tree, BplusIo<VersionChainKey> io, long pageAddr, int idx) {
+                    return true;

Review Comment:
   My return true in base implementation



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -944,15 +862,67 @@ public void finishCleanup() {
         }
     }
 
-    @Nullable VersionChain readVersionChain(RowId rowId) {
+    void throwExceptionIfStorageNotInRunnableState() {
+        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    }
+
+    /**
+     * Searches version chain by row ID and converts the found version chain 
to the result if found.
+     *
+     * @param rowId Row ID.
+     * @param function Function for converting the version chain to a result, 
function is executed under the read lock of the page on which
+     *      the version chain is located. If the version chain is not found, 
then {@code null} will be passed to the function.
+     */
+    <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T> 
function) {
         try {
-            return versionChainTree.findOne(new VersionChainKey(rowId));
+            return versionChainTree.findOne(new VersionChainKey(rowId), new 
TreeRowClosure<>() {
+                @Override
+                public boolean apply(BplusTree<VersionChainKey, VersionChain> 
tree, BplusIo<VersionChainKey> io, long pageAddr, int idx) {
+                    return true;
+                }
+
+                @Override
+                public T map(VersionChain treeRow) {
+                    return function.apply(treeRow);
+                }
+            }, null);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error getting version chain: 
[rowId={}, {}]", e, rowId, createStorageInfo());
+            if (e.getCause() instanceof StorageException) {
+                throw (StorageException) e.getCause();
+            }
+
+            throw new StorageException("Row version lookup failed: [rowId={}, 
{}]", e, rowId, createStorageInfo());
         }
     }
 
-    void throwExceptionIfStorageNotInRunnableState() {
-        StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(), 
this::createStorageInfo);
+    /**
+     * Organizes external synchronization of update operations for the same 
version chain.
+     *
+     * <p>NOTE: When you try to execute in the closures on the pages of the 
tree, it leads to a deadlock.

Review Comment:
   I don't get this note. What's its purpose?
   Generally speaking, if you get locks in the wrong order, there's a chance of 
having a deadlock. That's a  common knowledge. I recommend documenting an 
overall locks model somewhere, instead of doing such notes in seemingly random 
places



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java:
##########
@@ -164,28 +152,27 @@ public void close() {
     abstract ReadResult findRowVersion(VersionChain versionChain);
 
     private void createVersionChainCursorIfMissing() {
-        if (versionChainCursor != null) {
+        if (cursor != null) {
             return;
         }
 
         try {
-            versionChainCursor = storage.versionChainTree.find(null, null, 
(tree, io, pageAddr, idx) -> {
-                // Since the BplusTree cursor caches rows that are on the same 
page, we should try to get actual ReadResult for them in this
-                // filter so as not to get into a situation when we read the 
chain and the links in it are no longer valid.
-
-                VersionChain versionChain = tree.getRow(io, pageAddr, idx);
-
-                // TODO: IGNITE-18717 Perhaps add lock by rowId
-
-                ReadResult readResult = findRowVersion(versionChain);
-
-                if (!readResult.isEmpty()) {
-                    readResultByRowId.put(versionChain.rowId(), readResult);
+            cursor = storage.versionChainTree.find(null, null, new 
TreeRowClosure<>() {
+                @Override
+                public boolean apply(BplusTree<VersionChainKey, VersionChain> 
tree, BplusIo<VersionChainKey> io, long pageAddr, int idx) {
+                    return true;
                 }
 
-                return true;
+                @Override
+                public ReadResult map(VersionChain treeRow) {
+                    return findRowVersion(treeRow);
+                }
             }, null);
         } catch (IgniteInternalCheckedException e) {
+            if (e.getCause() instanceof StorageException) {
+                throw (StorageException) e.getCause();
+            }

Review Comment:
   We should probably extract a method, such check is pretty common here



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for
+ * {@link AbstractPageMemoryMvPartitionStorage#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final HybridTimestamp commitTimestamp;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    AddWriteCommittedInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            HybridTimestamp commitTimestamp,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.commitTimestamp = commitTimestamp;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow != null && oldRow.isUncommitted()) {
+            // This means that there is a bug in our code as the caller must 
make sure that no write intent exists below this write.
+            throw new StorageException("Write intent exists: [rowId={}, {}]", 
oldRow.rowId(), storage.createStorageInfo());

Review Comment:
   For every thrown exception, we should have some error code. Why do you 
ignore them?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for
+ * {@link AbstractPageMemoryMvPartitionStorage#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final HybridTimestamp commitTimestamp;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    AddWriteCommittedInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            HybridTimestamp commitTimestamp,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.commitTimestamp = commitTimestamp;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow != null && oldRow.isUncommitted()) {

Review Comment:
   Can we replace it with the assertion?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, 
int)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} and {@link 
TxIdMismatchException} which will cause form
+ * {@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final UUID txId;
+
+    private final UUID commitTableId;
+
+    private final int commitPartitionId;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable BinaryRow previousUncommittedRowVersion;
+
+    private @Nullable RowVersion toRemove;
+
+    AddWriteInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            UUID txId,
+            UUID commitTableId,
+            int commitPartitionId,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.txId = txId;
+        this.commitTableId = commitTableId;
+        this.commitPartitionId = commitPartitionId;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null) {
+            RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+
+            newRow = VersionChain.createUncommitted(rowId, txId, 
commitTableId, commitPartitionId, newVersion.link(), NULL_LINK);
+
+            return;
+        }
+
+        if (oldRow.isUncommitted()) {
+            throwIfChainBelongsToAnotherTx(oldRow, txId);
+        }
+
+        RowVersion newVersion = insertRowVersion(row, 
oldRow.newestCommittedLink());
+
+        if (oldRow.isUncommitted()) {
+            RowVersion currentVersion = 
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
+
+            previousUncommittedRowVersion = 
storage.rowVersionToBinaryRow(currentVersion);
+
+            // As we replace an uncommitted version with new one, we need to 
remove old uncommitted version.
+            toRemove = currentVersion;
+        }
+
+        newRow = VersionChain.createUncommitted(rowId, txId, commitTableId, 
commitPartitionId, newVersion.link(), newVersion.nextLink());
+    }
+
+    @Override
+    public @Nullable VersionChain newRow() {
+        assert newRow != null;
+
+        return newRow;
+    }
+
+    @Override
+    public OperationType operationType() {
+        return OperationType.PUT;
+    }
+
+    @Override
+    public void onUpdate() {
+        if (toRemove != null) {
+            storage.removeRowVersion(toRemove);

Review Comment:
   Again, I recommend removing the data when you don't hold any locks.
   Maybe you can even read the value when holding no locks, it should be safe.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static 
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, 
int)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} and {@link 
TxIdMismatchException} which will cause form
+ * {@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
+    private final RowId rowId;
+
+    private final @Nullable BinaryRow row;
+
+    private final UUID txId;
+
+    private final UUID commitTableId;
+
+    private final int commitPartitionId;
+
+    private final AbstractPageMemoryMvPartitionStorage storage;
+
+    private @Nullable VersionChain newRow;
+
+    private @Nullable BinaryRow previousUncommittedRowVersion;
+
+    private @Nullable RowVersion toRemove;
+
+    AddWriteInvokeClosure(
+            RowId rowId,
+            @Nullable BinaryRow row,
+            UUID txId,
+            UUID commitTableId,
+            int commitPartitionId,
+            AbstractPageMemoryMvPartitionStorage storage
+    ) {
+        this.rowId = rowId;
+        this.row = row;
+        this.txId = txId;
+        this.commitTableId = commitTableId;
+        this.commitPartitionId = commitPartitionId;
+        this.storage = storage;
+    }
+
+    @Override
+    public void call(@Nullable VersionChain oldRow) throws 
IgniteInternalCheckedException {
+        if (oldRow == null) {
+            RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+
+            newRow = VersionChain.createUncommitted(rowId, txId, 
commitTableId, commitPartitionId, newVersion.link(), NULL_LINK);
+
+            return;
+        }
+
+        if (oldRow.isUncommitted()) {
+            throwIfChainBelongsToAnotherTx(oldRow, txId);

Review Comment:
   You don't need to pass `txId` because it's a field. Or this method must be 
static for some reason?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link 
AbstractPageMemoryMvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs 
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be 
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form 
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.

Review Comment:
   I'd prefer a list of error codes as well



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java:
##########
@@ -65,36 +61,32 @@ public boolean hasNext() {
 
             createVersionChainCursorIfMissing();
 
-            currentChain = null;
+            currentRowId = null;
 
             while (true) {
-                if (!versionChainCursor.hasNext()) {
+                if (!cursor.hasNext()) {
                     iterationExhausted = true;
 
                     return false;
                 }
 
-                VersionChain chain = versionChainCursor.next();
-
-                ReadResult result = readResultByRowId.remove(chain.rowId());
+                ReadResult result = cursor.next();
 
-                if (result == null) {
-                    // TODO: IGNITE-18717 Add lock by rowId
-                    chain = storage.readVersionChain(chain.rowId());
+                RowId rowId = result.rowId();
 
-                    if (chain == null) {
-                        continue;
-                    }
-
-                    result = findRowVersion(chain);
+                if (result.isEmpty()) {

Review Comment:
   Total lack of comments makes this code difficult to read and understand.
   For some reason, my guts tell me that this code might be problematic, but I 
don't understand what it does :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to