ibessonov commented on code in PR #1102:
URL: https://github.com/apache/ignite-3/pull/1102#discussion_r977495704


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+    /** Unset commit partition id value. */
+    public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+    /** Empty read result. */
+    private static final ReadResult EMPTY = new ReadResult(null, null, null, 
null, UNDEFINED_COMMIT_PARTITION_ID);
+
+    /** Data. */
+    private final @Nullable BinaryRow binaryRow;
+
+    /** Transaction id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID transactionId;
+
+    /** Commit table id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID commitTableId;
+
+    /** Commit table id. If this is not a write-intent it is equal to {@link 
#UNDEFINED_COMMIT_PARTITION_ID}. */
+    private final int commitPartitionId;
+
+    /**
+     * Timestamp of the newest commit of the data. Not {@code null} iff 
committed version exists, this is a
+     * write-intent and read was made with a timestamp.
+     */
+    private final @Nullable HybridTimestamp newestCommitTs;
+
+    private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, 
@Nullable UUID commitTableId,
+            @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+        this.binaryRow = binaryRow;
+
+        // If transaction is not null, then commitTableId and 
commitPartitionId should be defined.
+        assert (transactionId == null) || (commitTableId != null && 
commitPartitionId != -1);
+
+        // If transaction id is null, then commitTableId and commitPartitionId 
should not be defined.
+        assert (transactionId != null) || (commitTableId == null && 
commitPartitionId == -1);
+
+        this.transactionId = transactionId;
+        this.commitTableId = commitTableId;
+        this.newestCommitTs = newestCommitTs;
+        this.commitPartitionId = commitPartitionId;
+    }
+
+    public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID 
transactionId, UUID commitTableId,
+            @Nullable HybridTimestamp lastCommittedTimestamp, int 
commitPartitionId) {
+        return new ReadResult(binaryRow, transactionId, commitTableId, 
lastCommittedTimestamp, commitPartitionId);
+    }
+
+    public static ReadResult createFromCommitted(BinaryRow binaryRow) {
+        return new ReadResult(binaryRow, null, null, null, 
UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    public static ReadResult empty() {

Review Comment:
   I don't really understand why you need both a constant and a method.
   It reminds me of something like `emptyList`, but in that case the reason is 
clear.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -280,38 +302,129 @@ private void throwIfChainBelongsToAnotherTx(VersionChain 
versionChain, UUID txId
         assert transactionId != null ^ timestamp != null;
 
         if (transactionId != null) {
-            return findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+            ReadResult res = findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+
+            if (res == null) {
+                return null;
+            }
+
+            return res.binaryRow();
         } else {
-            ByteBufferRow row = findRowVersionByTimestamp(versionChain, 
timestamp);
+            ReadResult res = findRowVersionByTimestamp(versionChain, 
timestamp);
+
+            if (res == null) {
+                return null;
+            }
+
+            BinaryRow row = res.binaryRow();
 
             return keyFilter.test(row) ? row : null;
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain 
versionChain, HybridTimestamp timestamp) {
-        if (!versionChain.hasCommittedVersions()) {
-            return null;
+    private ReadResult findRowVersionByTimestamp(VersionChain versionChain, 
HybridTimestamp timestamp) {
+        long headLink = versionChain.headLink();
+
+        if (versionChain.isUncommitted()) {
+            if (!versionChain.hasCommittedVersions()) {
+                RowVersion rowVersion = readRowVersion(headLink, 
ALWAYS_LOAD_VALUE);
+
+                assert rowVersion.isUncommitted();
+
+                UUID transactionId = versionChain.transactionId();
+                UUID commitTableId = versionChain.commitTableId();
+                int commitPartitionId = versionChain.commitPartitionId();
+
+                BinaryRow row;
+
+                if (rowVersion.isTombstone()) {
+                    row = null;
+                } else {
+                    row = new ByteBufferRow(rowVersion.value());
+                }
+
+                return ReadResult.createFromWriteIntent(row, transactionId, 
commitTableId, null, commitPartitionId);
+            }
         }
 
-        long newestCommittedLink = versionChain.newestCommittedLink();
+        return walkVersionChain(versionChain, timestamp);
+    }
 
-        ScanVersionChainByTimestamp scanByTimestamp = new 
ScanVersionChainByTimestamp(partitionId);
+    private ReadResult walkVersionChain(VersionChain chainHead, 
HybridTimestamp timestamp) {
+        assert chainHead.hasCommittedVersions();
 
-        try {
-            rowVersionDataPageReader.traverse(newestCommittedLink, 
scanByTimestamp, timestamp);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot search for a row version", e);
+        boolean hasWriteIntent = chainHead.isUncommitted();
+
+        RowVersion firstCommit;
+
+        if (hasWriteIntent) {
+            firstCommit = readRowVersion(chainHead.nextLink(), rowTimestamp -> 
timestamp.compareTo(rowTimestamp) >= 0);

Review Comment:
   I also have bad feelings about this condition



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -280,38 +302,129 @@ private void throwIfChainBelongsToAnotherTx(VersionChain 
versionChain, UUID txId
         assert transactionId != null ^ timestamp != null;
 
         if (transactionId != null) {
-            return findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+            ReadResult res = findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+
+            if (res == null) {
+                return null;
+            }
+
+            return res.binaryRow();
         } else {
-            ByteBufferRow row = findRowVersionByTimestamp(versionChain, 
timestamp);
+            ReadResult res = findRowVersionByTimestamp(versionChain, 
timestamp);
+
+            if (res == null) {
+                return null;
+            }
+
+            BinaryRow row = res.binaryRow();
 
             return keyFilter.test(row) ? row : null;
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain 
versionChain, HybridTimestamp timestamp) {
-        if (!versionChain.hasCommittedVersions()) {
-            return null;
+    private ReadResult findRowVersionByTimestamp(VersionChain versionChain, 
HybridTimestamp timestamp) {
+        long headLink = versionChain.headLink();
+
+        if (versionChain.isUncommitted()) {
+            if (!versionChain.hasCommittedVersions()) {
+                RowVersion rowVersion = readRowVersion(headLink, 
ALWAYS_LOAD_VALUE);
+
+                assert rowVersion.isUncommitted();
+
+                UUID transactionId = versionChain.transactionId();
+                UUID commitTableId = versionChain.commitTableId();
+                int commitPartitionId = versionChain.commitPartitionId();
+
+                BinaryRow row;
+
+                if (rowVersion.isTombstone()) {
+                    row = null;
+                } else {
+                    row = new ByteBufferRow(rowVersion.value());
+                }
+
+                return ReadResult.createFromWriteIntent(row, transactionId, 
commitTableId, null, commitPartitionId);
+            }
         }
 
-        long newestCommittedLink = versionChain.newestCommittedLink();
+        return walkVersionChain(versionChain, timestamp);
+    }
 
-        ScanVersionChainByTimestamp scanByTimestamp = new 
ScanVersionChainByTimestamp(partitionId);
+    private ReadResult walkVersionChain(VersionChain chainHead, 
HybridTimestamp timestamp) {
+        assert chainHead.hasCommittedVersions();
 
-        try {
-            rowVersionDataPageReader.traverse(newestCommittedLink, 
scanByTimestamp, timestamp);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot search for a row version", e);
+        boolean hasWriteIntent = chainHead.isUncommitted();
+
+        RowVersion firstCommit;
+
+        if (hasWriteIntent) {
+            firstCommit = readRowVersion(chainHead.nextLink(), rowTimestamp -> 
timestamp.compareTo(rowTimestamp) >= 0);
+        } else {
+            firstCommit = readRowVersion(chainHead.headLink(), 
ALWAYS_LOAD_VALUE);

Review Comment:
   I don't know about ALWAYS_LOAD_VALUE, is this necessary?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -443,56 +487,166 @@ public void commitWrite(RowId rowId, HybridTimestamp 
timestamp) throws StorageEx
                         : baseIterator
         ) {
             if (timestamp == null) {
+                ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
                 // Seek to the first appearance of row id if timestamp isn't 
set.
-                // Since timestamps are sorted from newest to oldest, first 
occurance will always be the latest version.
+                // Since timestamps are sorted from newest to oldest, first 
occurrence will always be the latest version.
                 // Unfortunately, copy here is unavoidable with current API.
-                seekIterator.seek(copyOf(keyBuf.array(), keyBuf.position()));
-            } else {
-                // Put timestamp restriction according to N2O timestamps order.
-                putTimestamp(keyBuf, timestamp);
+                assert keyBuf.position() == ROW_PREFIX_SIZE;
+                seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+                if (invalid(seekIterator)) {
+                    // No data at all.
+                    return ReadResult.empty();
+                }
+
+                ByteBuffer readKeyBuf = 
MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+
+                int keyLength = seekIterator.key(readKeyBuf);
+
+                if (!matches(rowId, readKeyBuf)) {
+                    // Wrong row id.
+                    return ReadResult.empty();
+                }
+
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                // This seek will either find a key with timestamp that's less 
or equal than required value, or a different key whatsoever.
-                // It is guaranteed by descending order of timestamps.
-                seekIterator.seek(keyBuf.array());
+                byte[] valueBytes = seekIterator.value();

Review Comment:
   This value load might be unnecessary as well. Is it cheaper than having to 
do an extra seek? I don't know. Was that your reasoning?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -280,38 +302,129 @@ private void throwIfChainBelongsToAnotherTx(VersionChain 
versionChain, UUID txId
         assert transactionId != null ^ timestamp != null;
 
         if (transactionId != null) {
-            return findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+            ReadResult res = findLatestRowVersion(versionChain, transactionId, 
keyFilter);
+
+            if (res == null) {
+                return null;
+            }
+
+            return res.binaryRow();
         } else {
-            ByteBufferRow row = findRowVersionByTimestamp(versionChain, 
timestamp);
+            ReadResult res = findRowVersionByTimestamp(versionChain, 
timestamp);
+
+            if (res == null) {
+                return null;
+            }
+
+            BinaryRow row = res.binaryRow();
 
             return keyFilter.test(row) ? row : null;
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain 
versionChain, HybridTimestamp timestamp) {
-        if (!versionChain.hasCommittedVersions()) {
-            return null;
+    private ReadResult findRowVersionByTimestamp(VersionChain versionChain, 
HybridTimestamp timestamp) {
+        long headLink = versionChain.headLink();
+
+        if (versionChain.isUncommitted()) {
+            if (!versionChain.hasCommittedVersions()) {
+                RowVersion rowVersion = readRowVersion(headLink, 
ALWAYS_LOAD_VALUE);
+
+                assert rowVersion.isUncommitted();
+
+                UUID transactionId = versionChain.transactionId();
+                UUID commitTableId = versionChain.commitTableId();
+                int commitPartitionId = versionChain.commitPartitionId();
+
+                BinaryRow row;
+
+                if (rowVersion.isTombstone()) {
+                    row = null;
+                } else {
+                    row = new ByteBufferRow(rowVersion.value());
+                }
+
+                return ReadResult.createFromWriteIntent(row, transactionId, 
commitTableId, null, commitPartitionId);
+            }
         }
 
-        long newestCommittedLink = versionChain.newestCommittedLink();
+        return walkVersionChain(versionChain, timestamp);
+    }
 
-        ScanVersionChainByTimestamp scanByTimestamp = new 
ScanVersionChainByTimestamp(partitionId);
+    private ReadResult walkVersionChain(VersionChain chainHead, 
HybridTimestamp timestamp) {
+        assert chainHead.hasCommittedVersions();
 
-        try {
-            rowVersionDataPageReader.traverse(newestCommittedLink, 
scanByTimestamp, timestamp);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Cannot search for a row version", e);
+        boolean hasWriteIntent = chainHead.isUncommitted();
+
+        RowVersion firstCommit;
+
+        if (hasWriteIntent) {
+            firstCommit = readRowVersion(chainHead.nextLink(), rowTimestamp -> 
timestamp.compareTo(rowTimestamp) >= 0);
+        } else {
+            firstCommit = readRowVersion(chainHead.headLink(), 
ALWAYS_LOAD_VALUE);

Review Comment:
   I think you get it, there may be unnecessary value loads



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -822,17 +976,24 @@ private static void putTimestamp(ByteBuffer buf, 
HybridTimestamp ts) {
         buf.putInt(~ts.getLogical());
     }
 
-    private static void putTransactionId(byte[] array, int off, UUID txId) {
-        GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off, 
txId.getMostSignificantBits());
-        GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off + Long.BYTES, 
txId.getLeastSignificantBits());
+    private static HybridTimestamp readTimestamp(ByteBuffer buf, int off) {
+        assert buf.order() == BIG_ENDIAN;
+
+        long physical = ~buf.getLong(off);
+        int logical = ~buf.getInt(off + Long.BYTES);
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    private static void putShort(byte[] array, int off, short value) {
+        GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value);
     }
 
     private static void validateTxId(byte[] valueBytes, UUID txId) {
-        long msb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF);
-        long lsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + 
Long.BYTES);
+        UUID readTxId = bytesToUuid(valueBytes, 0);

Review Comment:
   Extra allocations, aaa



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -199,51 +218,103 @@ private static BinaryRow read(
         assert timestamp == null ^ txId == null;
 
         if (versionChain == null) {
-            return null;
+            return ReadResult.empty();
         }
 
         if (timestamp == null) {
             BinaryRow binaryRow = versionChain.row;
 
             if (filter != null && !filter.test(binaryRow)) {
-                return null;
+                return ReadResult.empty();
             }
 
             if (versionChain.txId != null && !versionChain.txId.equals(txId)) {
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : 
null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        if (cur.ts == null) {
+            if (cur.next == null) {
+                // We only have a write-intent.
+                BinaryRow binaryRow = cur.row;
+
+                if (filter != null && !filter.test(binaryRow)) {
+                    return ReadResult.empty();
+                }
+
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, 
cur.commitTableId, null,
+                        cur.commitPartitionId);
+            }
+
             cur = cur.next;
         }
 
+        return walkVersionChain(versionChain, timestamp, filter, cur);
+    }
+
+    private static ReadResult walkVersionChain(VersionChain chainHead, 
HybridTimestamp timestamp, @Nullable Predicate<BinaryRow> filter,
+            VersionChain firstCommit) {
+        boolean hasWriteIntent = chainHead.ts == null;
+
+        if (hasWriteIntent && timestamp.compareTo(firstCommit.ts) > 0) {
+            // It's the latest commit in chain, query ts is greater than 
commit ts and there is a write-intent.
+            // So we just return write-intent.
+            BinaryRow binaryRow = chainHead.row;
+
+            if (filter != null && !filter.test(binaryRow)) {
+                return ReadResult.empty();
+            }
+
+            HybridTimestamp latestCommitTs = chainHead.next != null ? 
firstCommit.ts : null;

Review Comment:
   Wait a minute, how can `chainHead.next` be null if it's equal to 
`firstCommit`? Very convoluted, please simplify this expression or inline 
`firstCommit.ts`, that would be even better



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -199,51 +218,103 @@ private static BinaryRow read(
         assert timestamp == null ^ txId == null;
 
         if (versionChain == null) {
-            return null;
+            return ReadResult.empty();
         }
 
         if (timestamp == null) {
             BinaryRow binaryRow = versionChain.row;
 
             if (filter != null && !filter.test(binaryRow)) {
-                return null;
+                return ReadResult.empty();
             }
 
             if (versionChain.txId != null && !versionChain.txId.equals(txId)) {
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : 
null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        if (cur.ts == null) {
+            if (cur.next == null) {
+                // We only have a write-intent.
+                BinaryRow binaryRow = cur.row;
+
+                if (filter != null && !filter.test(binaryRow)) {
+                    return ReadResult.empty();
+                }
+
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, 
cur.commitTableId, null,
+                        cur.commitPartitionId);
+            }
+
             cur = cur.next;
         }
 
+        return walkVersionChain(versionChain, timestamp, filter, cur);
+    }
+
+    private static ReadResult walkVersionChain(VersionChain chainHead, 
HybridTimestamp timestamp, @Nullable Predicate<BinaryRow> filter,
+            VersionChain firstCommit) {
+        boolean hasWriteIntent = chainHead.ts == null;
+
+        if (hasWriteIntent && timestamp.compareTo(firstCommit.ts) > 0) {
+            // It's the latest commit in chain, query ts is greater than 
commit ts and there is a write-intent.
+            // So we just return write-intent.
+            BinaryRow binaryRow = chainHead.row;
+
+            if (filter != null && !filter.test(binaryRow)) {
+                return ReadResult.empty();
+            }
+
+            HybridTimestamp latestCommitTs = chainHead.next != null ? 
firstCommit.ts : null;
+
+            return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, 
chainHead.commitTableId, latestCommitTs,
+                    chainHead.commitPartitionId);
+        }
+
+        VersionChain cur = firstCommit;
+
         while (cur != null) {
-            if (timestamp.compareTo(cur.begin) >= 0) {
+            int compareResult = timestamp.compareTo(cur.ts);

Review Comment:
   Might be picky, but I truly believe that inlining this expression would make 
the code simpler. When I see a pattern `x.compareTo(y) >= 0`, I subconsciously 
replace it with `x >= y`. Splitting evaluation and comparison makes it harder.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -678,9 +712,52 @@ void 
scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() {
     void 
readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, clock.now());
+        BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
 
-        assertRowMatches(foundRow, binaryRow);
+        // We see the uncommitted row.
+        assertRowMatches(foundRow, binaryRow3);
+    }
+
+    @Test
+    void readByTimestampBeforeAndAfterUncommittedWrite() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, UUID.randomUUID(), 
UUID.randomUUID(), 5);
+
+            storage.commitWrite(rowId, commitTs);
+            return null;
+        });
+
+        UUID txId2 = UUID.randomUUID();

Review Comment:
   I'm still waiting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to