sashapolo commented on code in PR #1124:
URL: https://github.com/apache/ignite-3/pull/1124#discussion_r983608669


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -327,14 +329,72 @@ public Cursor<BinaryRow> scan(Predicate<BinaryRow> 
filter, UUID txId) {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp 
timestamp) {
-        Iterator<BinaryRow> iterator = map.values().stream()
-                .map(versionChain -> read(versionChain, timestamp, null, 
filter))
-                .map(ReadResult::binaryRow)
-                .filter(Objects::nonNull)
-                .iterator();
+    public PartitionTimestampCursor scan(Predicate<BinaryRow> filter, 
HybridTimestamp timestamp) {
+        Iterator<VersionChain> iterator = map.values().iterator();
 
-        return Cursor.fromIterator(iterator);
+        return new PartitionTimestampCursor() {
+
+            private VersionChain currentChain;

Review Comment:
   This field should be annotated as `@Nullable`



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionTimestampCursor.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor of values at the given timestamp.
+ */
+public interface PartitionTimestampCursor extends Cursor<ReadResult> {
+    /**
+     * Returns a committed row within the current row id that conform the 
given timestamp.

Review Comment:
   ```suggestion
        * Returns a committed row within the current row id that is associated 
with the given timestamp.
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -712,29 +695,132 @@ public void destroy() {
         // TODO: IGNITE-17132 Implement it
     }
 
-    private class ScanCursor implements Cursor<BinaryRow> {
+    private class TimestampCursor implements PartitionTimestampCursor {

Review Comment:
   Please add a javadoc



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -712,29 +695,132 @@ public void destroy() {
         // TODO: IGNITE-17132 Implement it
     }
 
-    private class ScanCursor implements Cursor<BinaryRow> {
+    private class TimestampCursor implements PartitionTimestampCursor {
         private final IgniteCursor<VersionChain> treeCursor;
 
         private final Predicate<BinaryRow> keyFilter;
 
-        private final @Nullable UUID transactionId;
+        private final HybridTimestamp timestamp;
 
-        private final @Nullable HybridTimestamp timestamp;
+        private ReadResult nextRead = null;

Review Comment:
   These two fields should be marked as `@Nullable`



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -684,47 +690,24 @@ public void close() throws Exception {
     /** {@inheritDoc} */
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) 
throws TxIdMismatchException, StorageException {
-        return scan(keyFilter, null, txId);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, 
HybridTimestamp timestamp) throws StorageException {
-        return scan(keyFilter, timestamp, null);
-    }
-
-    private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable 
HybridTimestamp timestamp, @Nullable UUID txId)
-            throws TxIdMismatchException, StorageException {
-        assert timestamp == null ^ txId == null;
+        assert txId != null;
 
         // Set next partition as an upper bound.
-        var options = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+        ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
 
         RocksIterator it = db.newIterator(cf, options);
 
         // Seek iterator to the beginning of the partition.
         it.seek(partitionStartPrefix());
 
-        // Size of the buffer to seek values. Should fit partition id, row id 
and maybe timestamp, if it's not null.
-        int seekKeyBufSize = ROW_PREFIX_SIZE + (timestamp == null ? 0 : 
HYBRID_TIMESTAMP_SIZE);
-
         // Here's seek buffer itself. Originally it contains a valid partition 
id, row id payload that's filled with zeroes, and maybe
         // a timestamp value. Zero row id guarantees that it's 
lexicographically less than or equal to any other row id stored in the
         // partition.
         // Byte buffer from a thread-local field can't be used here, because 
of two reasons:
         //  - no one guarantees that there will only be a single cursor;
         //  - no one guarantees that returned cursor will not be used by other 
threads.
         // The thing is, we need this buffer to preserve its content between 
invocations of "hasNext" method.
-        ByteBuffer seekKeyBuf = 
ByteBuffer.allocate(seekKeyBufSize).order(BIG_ENDIAN).putShort((short) 
partitionId);
-
-        if (timestamp != null) {
-            putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
-        }
-
-        // Version without timestamp to compare row ids.
-        ByteBuffer seekKeyBufSliceWithoutTimestamp = timestamp == null
-                ? seekKeyBuf
-                : 
seekKeyBuf.position(0).limit(ROW_PREFIX_SIZE).slice().order(BIG_ENDIAN);
+        ByteBuffer seekKeyBuf = 
ByteBuffer.allocate(ROW_PREFIX_SIZE).order(BIG_ENDIAN).putShort((short) 
partitionId);

Review Comment:
   Well well well. Isn't that a byte order not extracted into a constant?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -780,40 +760,12 @@ public boolean hasNext() {
 
                     directBuffer.limit(ROW_PREFIX_SIZE);
 
-                    boolean wrongRowIdWasFound = 
!directBuffer.equals(seekKeyBufSliceWithoutTimestamp);
-
-                    // To understand this condition please read that huge 
comment above.
-                    if (timestamp == null || wrongRowIdWasFound) {
-                        // Copy actual row id into a "seekKeyBuf" buffer.
-                        GridUnsafe.copyMemory(
-                                null, GridUnsafe.bufferAddress(directBuffer) + 
ROW_ID_OFFSET,
-                                seekKeyBuf.array(), GridUnsafe.BYTE_ARR_OFF + 
ROW_ID_OFFSET,
-                                ROW_ID_SIZE
-                        );
-                    }
-
-                    // Perform additional "seek" if timestamp is not null. 
Motivation for it is described in comments above.
-                    if (timestamp != null && wrongRowIdWasFound) {
-                        // At this point, "seekKeyBuf" has row id that exists 
in partition.
-                        it.seek(seekKeyBuf.array());
-
-                        // Iterator may not be valid if that row was created 
after required timestamp.
-                        if (invalid(it)) {
-                            return false;
-                        }
-
-                        // Or iterator may still be valid even if there's no 
version for required timestamp. In this case row id
-                        // itself will be different, and we must check it.
-                        keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
-
-                        isWriteIntent = keyLength == ROW_PREFIX_SIZE;
-
-                        directBuffer.limit(ROW_PREFIX_SIZE);
-
-                        if 
(!directBuffer.equals(seekKeyBufSliceWithoutTimestamp)) {
-                            found = false;
-                        }
-                    }
+                    // Copy actual row id into a "seekKeyBuf" buffer.
+                    GridUnsafe.copyMemory(

Review Comment:
   Why do we need to use unsafe here? What's wrong with `directBuffer.put`?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -684,47 +690,24 @@ public void close() throws Exception {
     /** {@inheritDoc} */
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) 
throws TxIdMismatchException, StorageException {
-        return scan(keyFilter, null, txId);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, 
HybridTimestamp timestamp) throws StorageException {
-        return scan(keyFilter, timestamp, null);
-    }
-
-    private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable 
HybridTimestamp timestamp, @Nullable UUID txId)
-            throws TxIdMismatchException, StorageException {
-        assert timestamp == null ^ txId == null;
+        assert txId != null;
 
         // Set next partition as an upper bound.
-        var options = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+        ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);

Review Comment:
   Why do we need the `totalOrderSeek` option here? I believe it's needed when 
a prefix is configured for a Column Family, but I couldn't find such a 
configuration here.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java:
##########
@@ -327,14 +329,72 @@ public Cursor<BinaryRow> scan(Predicate<BinaryRow> 
filter, UUID txId) {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp 
timestamp) {
-        Iterator<BinaryRow> iterator = map.values().stream()
-                .map(versionChain -> read(versionChain, timestamp, null, 
filter))
-                .map(ReadResult::binaryRow)
-                .filter(Objects::nonNull)
-                .iterator();
+    public PartitionTimestampCursor scan(Predicate<BinaryRow> filter, 
HybridTimestamp timestamp) {
+        Iterator<VersionChain> iterator = map.values().iterator();
 
-        return Cursor.fromIterator(iterator);
+        return new PartitionTimestampCursor() {
+
+            private VersionChain currentChain;
+
+            private ReadResult currentReadResult;

Review Comment:
   Same here: this field should also be annotated as `@Nullable`



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -712,29 +695,132 @@ public void destroy() {
         // TODO: IGNITE-17132 Implement it
     }
 
-    private class ScanCursor implements Cursor<BinaryRow> {
+    private class TimestampCursor implements PartitionTimestampCursor {
         private final IgniteCursor<VersionChain> treeCursor;
 
         private final Predicate<BinaryRow> keyFilter;
 
-        private final @Nullable UUID transactionId;
+        private final HybridTimestamp timestamp;
 
-        private final @Nullable HybridTimestamp timestamp;
+        private ReadResult nextRead = null;
+
+        private VersionChain currentChain = null;
+
+        private boolean iterationExhausted = false;
+
+        public TimestampCursor(IgniteCursor<VersionChain> treeCursor, 
Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) {
+            this.treeCursor = treeCursor;
+            this.keyFilter = keyFilter;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextRead != null) {
+                return true;
+            }
+
+            if (iterationExhausted) {
+                return false;
+            }
+
+            currentChain = null;
+
+            while (true) {
+                boolean positionedToNext = tryAdvanceTreeCursor();
+
+                if (!positionedToNext) {
+                    iterationExhausted = true;
+
+                    return false;
+                }
+
+                VersionChain chain = getCurrentChainFromTreeCursor();
+                ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+                if (res.isEmpty()) {
+                    continue;
+                }
+
+                if (!keyFilter.test(res.binaryRow())) {
+                    continue;
+                }
+
+                nextRead = res;
+                currentChain = chain;
+
+                return true;
+            }
+        }
+
+        private boolean tryAdvanceTreeCursor() {
+            try {
+                return treeCursor.next();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error when trying to advance tree 
cursor", e);
+            }
+        }
+
+        private VersionChain getCurrentChainFromTreeCursor() {
+            try {
+                return treeCursor.get();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to get element from tree 
cursor", e);
+            }
+        }
+
+        @Override
+        public ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+
+            nextRead = null;
+
+            return res;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+            if (currentChain == null) {
+                throw new IllegalStateException();
+            }
+
+            ReadResult res = findRowVersionByTimestamp(currentChain, 
timestamp);
+
+            // We don't check if row conforms the key filter here, because 
we've already checked it.
+            return res.binaryRow();
+        }
+    }
+
+    private class TransactionIdCursor implements Cursor<BinaryRow> {

Review Comment:
   Please add a javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to