ibessonov commented on code in PR #814:
URL: https://github.com/apache/ignite-3/pull/814#discussion_r881461336


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import java.util.Objects;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reads full data row value (as byte array) from page memory. Supports 
fragmented values occupying more than one slot.
+ *
+ * <p>This works for the cases when the following conditions are satisfied:
+ * 1. Row data starts with a fixed-length header, which is followed by the 
'value'; the 'value' ends the row data
+ * 2. Row data header contains 'value' size as an int at a fixed offset known 
beforehand
+ * 3. The beginning of the header, including the 'value' size, is always 
stored in the first slot (i.e.
+ * {@link Storable#headerSize()} is enough to include 'value' size
+ */
+public abstract class ReadPageMemoryRowValue implements 
PageMemoryTraversal<Void> {
+    /**
+     * First it's {@code true} (this means that we traverse first slots of 
versions of the Version Chain using NextLink);
+     * then it's {@code false} (when we found the version we need and we read 
its value).
+     */
+    private boolean readingFirstSlot = true;
+
+    private int valueSize;
+    /**
+     * Used to collect all the bytes of the target version value.
+     */
+    private byte @Nullable [] allValueBytes;
+    /**
+     * Number of bytes written to {@link #allValueBytes}.
+     */
+    private int transferredBytes = 0;
+
+    {
+        reset();
+    }
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Void ignoredArg) {
+        if (readingFirstSlot) {
+            readingFirstSlot = false;
+            return readFullyOrStartReadingFragmented(pageAddr, payload);
+        } else {
+            // we are continuing reading a fragmented row
+            return readNextFragment(pageAddr, payload);
+        }
+    }
+
+    private long readFullyOrStartReadingFragmented(long pageAddr, 
DataPagePayload payload) {
+        valueSize = readValueSize(pageAddr, payload);
+
+        if (!payload.hasMoreFragments()) {
+            return readFully(pageAddr, payload);
+        } else {
+            allValueBytes = new byte[valueSize];
+            transferredBytes = 0;
+
+            readValueFragmentToArray(pageAddr, payload);

Review Comment:
   I believe that this method doesn't skip a header with timestamp and size. 
Are there tests for this case?



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -291,8 +292,12 @@ void 
readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
 
         BinaryRow foundRow = storage.read(rowId, txId);
 
-        assertThat(foundRow, is(notNullValue()));
-        assertThat(foundRow.bytes(), is(equalTo(binaryRow.bytes())));
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    private void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, 
BinaryRow expectedRow) {

Review Comment:
   Usually expected value goes first, why did you swap them?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal that scans Version Chain until first version visible at the given 
timestamp is found; then the version
+ * is converted to {@link ByteBufferRow} and finally made available via {@link 
#result()}.
+ *
+ * <p>NB: this traversal first traverses starting data slots of the Version 
Chain one after another; when it finds the
+ * version it needs, it switches to traversing the slots comprising the 
version (because it might be fragmented).
+ */
+class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+    /**
+     * Contains the result when the traversal ends.
+     */
+    @Nullable
+    private ByteBufferRow result;
+
+    /**
+     * First it's {@code true} (this means that we traverse first slots of 
versions of the Version Chain using NextLink);
+     * then it's {@code false} (when we found the version we need and we read 
its value).
+     */
+    private boolean lookingForVersion = true;
+
+    private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
+
+    {
+        reset();
+    }
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Timestamp timestamp) {
+        if (lookingForVersion) {
+            Timestamp rowVersionTs = Timestamps.readTimestamp(pageAddr, 
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+
+            if (rowTimestampMatches(rowVersionTs, timestamp)) {
+                return readFullyOrStartReadingFragmented(link, pageAddr, 
payload);
+            } else {
+                return advanceToNextVersion(pageAddr, payload, 
partitionIdFromLink(link));
+            }
+        } else {
+            // we are continuing reading a fragmented row

Review Comment:
   I can't link you to the source, but according to our codestyle, comments 
start with capital letter and end with the dot.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal that scans Version Chain until first version visible at the given 
timestamp is found; then the version
+ * is converted to {@link ByteBufferRow} and finally made available via {@link 
#result()}.
+ *
+ * <p>NB: this traversal first traverses starting data slots of the Version 
Chain one after another; when it finds the
+ * version it needs, it switches to traversing the slots comprising the 
version (because it might be fragmented).
+ */
+class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+    /**
+     * Contains the result when the traversal ends.
+     */
+    @Nullable
+    private ByteBufferRow result;
+
+    /**
+     * First it's {@code true} (this means that we traverse first slots of 
versions of the Version Chain using NextLink);
+     * then it's {@code false} (when we found the version we need and we read 
its value).
+     */
+    private boolean lookingForVersion = true;
+
+    private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
+
+    {

Review Comment:
   Why can't we have regular constructors? This construction is barely ever 
used in Ignite code, and you invoke virtual method in it. Why?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java:
##########
@@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, 
IoStatisticsHolder sta
     }
 
     /**
-     * Returns a row by link. To get the row bytes, more than one pages may be 
traversed (if the corresponding row
-     * was fragmented when stored).
+     * Traverses page memory starting at the given link. At each step, reads 
the current data row and feeds it to the given
+     * {@link PageMemoryTraversal} object which updates itself (usually) and 
returns next link to continue traversal
+     * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop.
      *
      * @param link Row link
-     * @return row object assembled from the row bytes
+     * @param traversal object consuming payloads and controlling the traversal

Review Comment:
   Params descriptions usually start with a capital letters. Please try to 
adapt your code style to what you see in the project, not what technically 
counts as appropriate according to TC. We should all try to write code in 
similar manner, it's very important



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -72,8 +83,17 @@ public PageMemoryMvPartitionStorage(int partitionId, 
TableView tableConfig, Page
 
         groupId = StorageUtils.groupId(tableConfig);
 
-        versionChainFreeList = ((VolatilePageMemoryDataRegion) 
dataRegion).versionChainFreeList();
-        rowVersionFreeList = ((VolatilePageMemoryDataRegion) 
dataRegion).rowVersionFreeList();
+        try {
+            versionChainFreeList = 
createVersionChainFreeList(dataRegion.pageMemory(), groupId, partitionId);

Review Comment:
   Why did you decide to create these lists per partition? It would be better 
to turn them into parameters. Maybe there was a slight misunderstanding between 
us -  volatile region should have shared freelists to better utilize memory, 
it's an optimization that we should do. Persistent regions must have separate 
freelists for each partition. Right now we have separate lists in volatile 
region, which is not what I expect.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.ByteBuffer;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal for reading the latest row version. If the version is 
uncommitted, returns its value; otherwise, does NOT return it.
+ */
+class ReadLatestRowVersion implements 
PageMemoryTraversal<Predicate<Timestamp>> {
+    private RowVersion result;
+
+    private boolean readingFirstSlot = true;
+
+    private long firstFragmentLink;
+    @Nullable
+    private Timestamp timestamp;
+    private long nextLink;
+
+    private final ReadRowVersionValue readRowVersionValue = new 
ReadRowVersionValue();
+
+    {
+        reset();
+    }
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload 
payload, Predicate<Timestamp> loadValue) {
+        if (readingFirstSlot) {
+            readingFirstSlot = false;
+            return readFullOrInitiateReadFragmented(link, pageAddr, payload, 
loadValue);
+        } else {
+            return readRowVersionValue.consumePagePayload(link, pageAddr, 
payload, null);
+        }
+    }
+
+    private long readFullOrInitiateReadFragmented(long link, long pageAddr, 
DataPagePayload payload, Predicate<Timestamp> loadValue) {
+        firstFragmentLink = link;
+
+        timestamp = Timestamps.readTimestamp(pageAddr, payload.offset() + 
RowVersion.TIMESTAMP_OFFSET);
+        nextLink = PartitionlessLinks.readFromMemory(pageAddr, 
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+        if (!loadValue.test(timestamp)) {
+            result = new RowVersion(partitionIdFromLink(link), 
firstFragmentLink, timestamp, nextLink, null);
+            return STOP_TRAVERSAL;
+        }
+
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, 
null);

Review Comment:
   Same problem might be present here, I don't see where you skip the header



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java:
##########
@@ -38,10 +41,24 @@ public class RowVersion extends StorableBase {
      */
     public static final long NULL_LINK = 0;
 
+    private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
+    private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+    private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
+
+    public static final int TIMESTAMP_OFFSET = 0;
+    public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+    public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + 
NEXT_LINK_STORE_SIZE_BYTES;
+    public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
+
+    private final int partitionId;
+    private long link;
+
     @Nullable
     private final Timestamp timestamp;
     private final long nextLink;
     private final int valueSize;
+    @IgniteToStringExclude

Review Comment:
   I know that you are against these comments, but please use line separators 
more often, with these changes code becomes more and more inconsistent.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java:
##########
@@ -38,10 +41,24 @@ public class RowVersion extends StorableBase {
      */
     public static final long NULL_LINK = 0;
 
+    private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
+    private static final int NEXT_LINK_STORE_SIZE_BYTES = 
PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+    private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
+
+    public static final int TIMESTAMP_OFFSET = 0;
+    public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+    public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + 
NEXT_LINK_STORE_SIZE_BYTES;
+    public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + 
VALUE_SIZE_STORE_SIZE_BYTES;
+
+    private final int partitionId;

Review Comment:
   Can you please explain, why we need two fields if partition id can be 
derived from the link value?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Code to work with {@link Timestamp}s.
+ */
+public class Timestamps {
+    /**
+     * Reads a {@link Timestamp} value from memory.
+     *
+     * @param pageAddr address where page data starts
+     * @param offset   offset to the timestamp value relative to pageAddr
+     * @return the timestamp
+     */
+    @Nullable
+    static Timestamp readTimestamp(long pageAddr, int offset) {
+        long nodeId = getLong(pageAddr, offset);
+        long localTimestamp = getLong(pageAddr, offset + Long.BYTES);
+
+        Timestamp timestamp = new Timestamp(localTimestamp, nodeId);
+        if (timestamp.equals(RowVersion.NULL_TIMESTAMP)) {
+            timestamp = null;
+        }
+
+        return timestamp;
+    }
+
+    /**
+     * Writes a {@link Timestamp} to memory starting at the given address + 
offset.
+     *
+     * @param addr      memory address
+     * @param offset    offset added to the address
+     * @param timestamp the timestamp to write
+     * @return number of bytes written
+     */
+    public static int writeTimestamp(long addr, int offset, @Nullable 
Timestamp timestamp) {

Review Comment:
   Ok, some methods are public, some are not. Why?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java:
##########
@@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, 
IoStatisticsHolder sta
     }
 
     /**
-     * Returns a row by link. To get the row bytes, more than one pages may be 
traversed (if the corresponding row
-     * was fragmented when stored).
+     * Traverses page memory starting at the given link. At each step, reads 
the current data row and feeds it to the given

Review Comment:
   I would avoid "data row" mentioning in this context. I tried to abstract 
from pages content.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utils to work with Transaction IDs.
+ */
+public class TransactionIds {
+    /**
+     * Writes transaction ID to memory starting at the specified address.
+     *
+     * @param addr   addreds where to start writing
+     * @param offset offset to add to the address
+     * @param txId   transaction ID to write
+     * @return number of bytes written
+     */
+    public static int writeTransactionId(long addr, int offset, @Nullable UUID 
txId) {
+        long txIdHigh;
+        long txIdLow;
+        if (txId != null) {
+            txIdHigh = txId.getMostSignificantBits();
+            txIdLow = txId.getLeastSignificantBits();
+        } else {
+            txIdHigh = VersionChain.NULL_UUID_COMPONENT;
+            txIdLow = VersionChain.NULL_UUID_COMPONENT;
+        }
+
+        putLong(addr, offset, txIdHigh);
+        putLong(addr, offset + Long.BYTES, txIdLow);
+
+        return 2 * Long.BYTES;
+    }
+
+    private TransactionIds() {

Review Comment:
   Why is this necessary?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java:
##########
@@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, 
IoStatisticsHolder sta
     }
 
     /**
-     * Returns a row by link. To get the row bytes, more than one pages may be 
traversed (if the corresponding row
-     * was fragmented when stored).
+     * Traverses page memory starting at the given link. At each step, reads 
the current data row and feeds it to the given
+     * {@link PageMemoryTraversal} object which updates itself (usually) and 
returns next link to continue traversal
+     * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop.

Review Comment:
   You forgot a closing parenthesis.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java:
##########
@@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, 
IoStatisticsHolder sta
     }
 
     /**
-     * Returns a row by link. To get the row bytes, more than one pages may be 
traversed (if the corresponding row
-     * was fragmented when stored).
+     * Traverses page memory starting at the given link. At each step, reads 
the current data row and feeds it to the given
+     * {@link PageMemoryTraversal} object which updates itself (usually) and 
returns next link to continue traversal
+     * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop.
      *
      * @param link Row link
-     * @return row object assembled from the row bytes
+     * @param traversal object consuming payloads and controlling the traversal
+     * @param argument argument that is passed to the traversal
      * @throws IgniteInternalCheckedException If failed
      * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, 
int)
+     * @see PageMemoryTraversal
      */
-    @Nullable
-    public T getRowByLink(final long link) throws 
IgniteInternalCheckedException {
+    public <T> void traverse(final long link, PageMemoryTraversal<T> 
traversal, @Nullable T argument)
+            throws IgniteInternalCheckedException {
         assert link != 0;
 
         int pageSize = pageMemory.realPageSize(groupId);
 
-        ByteArrayOutputStream baos = null;
-
-        long nextLink = link;
+        long currentLink = link;
 
         do {
-            final long pageId = pageId(nextLink);
-
+            final long pageId = pageId(currentLink);
             final long page = pageMemory.acquirePage(groupId, pageId, 
statisticsHolder);
 
             try {
                 long pageAddr = pageMemory.readLock(groupId, pageId, page);
-
-                assert pageAddr != 0L : nextLink;
+                assert pageAddr != 0L : currentLink;
 
                 try {
                     AbstractDataPageIo<?> dataIo = 
pageMemory.ioRegistry().resolve(pageAddr);
 
-                    int itemId = itemId(nextLink);
-
-                    if (handleNonExistentItemsGracefully() && 
!dataIo.itemExists(pageAddr, itemId, pageSize)) {
-                        assert nextLink == link : "It is not first page of a 
fragmented row, but the item does not exist, pageId="
-                                + pageId + ", itemId=" + itemId;
-
-                        return rowForNonExistingItem(pageId, itemId);
-                    }
+                    int itemId = itemId(currentLink);
 
                     DataPagePayload data = dataIo.readPayload(pageAddr, 
itemId, pageSize);
 
-                    if (data.nextLink() == 0 && nextLink == link) {
-                        // Good luck: we can read the row without fragments.
-                        return readRowFromAddress(link, pageAddr + 
data.offset());
-                    }
-
-                    ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
-
-                    dataBuf.position(data.offset());
-                    dataBuf.limit(data.offset() + data.payloadSize());
-
-                    if (baos == null) {
-                        baos = new ByteArrayOutputStream();
-                    }
-
-                    byte[] bytes = new byte[data.payloadSize()];
-                    dataBuf.get(bytes);
-                    try {
-                        baos.write(bytes);
-                    } catch (IOException e) {
-                        throw new IllegalStateException("Should not happen", 
e);
-                    }
-
-                    nextLink = data.nextLink();
+                    currentLink = traversal.consumePagePayload(currentLink, 
pageAddr, data, argument);

Review Comment:
   Ok, it took me a while to understand this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to