This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 34354d2a59 IGNITE-21987 Optimize RO scan in sorted indexes (#3566)
34354d2a59 is described below

commit 34354d2a59caddafdc287a81848c4dde68bbcde0
Author: Phillippko <phillip...@gmail.com>
AuthorDate: Thu Apr 11 16:34:39 2024 +0400

    IGNITE-21987 Optimize RO scan in sorted indexes (#3566)
---
 .../processor/messages/MessageImplGenerator.java   |   1 -
 .../internal/storage/index/SortedIndexStorage.java |  23 ++-
 .../index/ThreadAssertingSortedIndexStorage.java   |   7 +
 .../storage/index/TestSortedIndexStorageTest.java  |   5 +
 .../index/AbstractSortedIndexStorageTest.java      | 187 +++++++++++++++------
 .../index/sorted/PageMemorySortedIndexStorage.java |  22 +++
 .../rocksdb/index/RocksDbSortedIndexStorage.java   | 117 ++++++++++---
 .../replicator/PartitionReplicaListener.java       |   2 +-
 8 files changed, 287 insertions(+), 77 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index c9c81509f7..a90eef9500 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -222,7 +222,6 @@ public class MessageImplGenerator {
 
         messageImpl.addMethod(groupTypeMethod);
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17591
         MethodSpec toStringMethod = MethodSpec.methodBuilder("toString")
                 .addAnnotation(Override.class)
                 .addModifiers(Modifier.PUBLIC)
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index d3dd0bf90f..16abcc684e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.index;
 
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.Cursor;
 import org.intellij.lang.annotations.MagicConstant;
 import org.jetbrains.annotations.Nullable;
 
@@ -47,7 +48,7 @@ public interface SortedIndexStorage extends IndexStorage {
     StorageSortedIndexDescriptor indexDescriptor();
 
     /**
-     * Returns a range of index values between the lower bound and the upper 
bound.
+     * Returns a range of updatable index values between the lower bound and 
the upper bound, supporting read-write transactions.
      *
      * @param lowerBound Lower bound. Exclusivity is controlled by a {@link 
#GREATER_OR_EQUAL} or {@link #GREATER} flag.
      *      {@code null} means unbounded.
@@ -63,4 +64,24 @@ public interface SortedIndexStorage extends IndexStorage {
             @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
     );
+
+    /**
+     * Returns a range of index values between the lower bound and the upper bound, for use in read-only transactions.
+     *
+     * @param lowerBound Lower bound. Exclusivity is controlled by a {@link 
#GREATER_OR_EQUAL} or {@link #GREATER} flag.
+     *      {@code null} means unbounded.
+     * @param upperBound Upper bound. Exclusivity is controlled by a {@link 
#LESS} or {@link #LESS_OR_EQUAL} flag.
+     *      {@code null} means unbounded.
+     * @param flags Control flags. {@link #GREATER} | {@link #LESS} by 
default. Other available values
+     *      are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}.
+     * @return Cursor with fetched index rows.
+     * @throws IllegalArgumentException If backwards flag is passed and 
backwards iteration is not supported by the storage.
+     */
+    default Cursor<IndexRow> readOnlyScan(
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
+    ) {
+        return scan(lowerBound, upperBound, flags);
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
index 0b61260cfc..7b202b6f36 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.storage.index;
 import static 
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
 
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertingCursor;
 import org.apache.ignite.internal.worker.ThreadAssertions;
 import org.jetbrains.annotations.Nullable;
 
@@ -49,4 +51,9 @@ public class ThreadAssertingSortedIndexStorage extends 
ThreadAssertingIndexStora
 
         return new ThreadAssertingPeekCursor<>(indexStorage.scan(lowerBound, 
upperBound, flags));
     }
+
+    @Override
+    public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix 
lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        return new 
ThreadAssertingCursor<>(indexStorage.readOnlyScan(lowerBound, upperBound, 
flags));
+    }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
index aeeec2311b..03c24d5d48 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
@@ -36,4 +36,9 @@ public class TestSortedIndexStorageTest extends 
AbstractSortedIndexStorageTest {
 
         initialize(storage);
     }
+
+    // Not valid for test storage.
+    @Override
+    void testReadOnlyScanContractUpdateAfterScan() {
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 10e0d198b0..12f1751934 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -81,6 +81,9 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Base class for Sorted Index storage tests.
@@ -166,11 +169,12 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         assertThat(actual, is(equalTo(columns)));
     }
 
-    @Test
-    void testEmpty() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testEmpty(boolean readOnly) {
         SortedIndexStorage index = createIndexStorage(INDEX_NAME, 
shuffledRandomColumnParams());
 
-        assertThat(scan(index, null, null, 0), is(empty()));
+        assertThat(scan(index, null, null, 0, readOnly), is(empty()));
     }
 
     /**
@@ -205,7 +209,7 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         put(index, row2);
         put(index, row3);
 
-        List<Object[]> actualColumns = scan(index, null, null, 0);
+        List<Object[]> actualColumns = scan(index, null, null, 0, false);
 
         assertThat(actualColumns, contains(columnValues2, columnValues1, 
columnValues1));
     }
@@ -233,28 +237,28 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         put(index, row2);
         put(index, row3);
 
-        List<Object[]> actualColumns = scan(index, null, null, 0);
+        List<Object[]> actualColumns = scan(index, null, null, 0, false);
 
         assertThat(actualColumns, contains(columnValues2, columnValues1, 
columnValues1));
 
         // Test that rows with the same indexed columns can be removed 
individually
         remove(index, row2);
 
-        actualColumns = scan(index, null, null, 0);
+        actualColumns = scan(index, null, null, 0, false);
 
         assertThat(actualColumns, contains(columnValues2, columnValues1));
 
         // Test that removing a non-existent row does nothing
         remove(index, row2);
 
-        actualColumns = scan(index, null, null, 0);
+        actualColumns = scan(index, null, null, 0, false);
 
         assertThat(actualColumns, contains(columnValues2, columnValues1));
 
         // Test that the first row can be actually removed
         remove(index, row1);
 
-        actualColumns = scan(index, null, null, 0);
+        actualColumns = scan(index, null, null, 0, false);
 
         assertThat(actualColumns, contains((Object) columnValues2));
     }
@@ -271,7 +275,19 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
      * Tests the happy case of the {@link SortedIndexStorage#scan} method.
      */
     @RepeatedTest(5)
-    void testScan() {
+    void testReadWriteScan() {
+        testScan(false);
+    }
+
+    /**
+     * Tests the happy case of the {@link SortedIndexStorage#readOnlyScan} 
method.
+     */
+    @RepeatedTest(5)
+    void testReadOnlyScan() {
+        testScan(true);
+    }
+
+    private void testScan(boolean readOnly) {
         SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, 
shuffledColumnParams());
 
         List<TestIndexRow> entries = IntStream.range(0, 10)
@@ -297,7 +313,10 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
 
         assertThat(expected, hasSize(greaterThanOrEqualTo(lastIndex - 
firstIndex + 1)));
 
-        try (Cursor<IndexRow> cursor = indexStorage.scan(first.prefix(), 
last.prefix(), GREATER_OR_EQUAL | LESS_OR_EQUAL)) {
+        try (Cursor<IndexRow> cursor = readOnly
+                ? indexStorage.readOnlyScan(first.prefix(), last.prefix(), 
GREATER_OR_EQUAL | LESS_OR_EQUAL)
+                : indexStorage.scan(first.prefix(), last.prefix(), 
GREATER_OR_EQUAL | LESS_OR_EQUAL)
+        ) {
             List<IndexRow> actual = cursor.stream().collect(toList());
 
             assertThat(actual, hasSize(expected.size()));
@@ -308,8 +327,9 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         }
     }
 
-    @Test
-    public void testBoundsAndOrder() {
+    @ParameterizedTest()
+    @ValueSource(booleans = {true, false})
+    public void testBoundsAndOrder(boolean readOnly) {
         ColumnType string = ColumnType.STRING;
         ColumnType int32 = ColumnType.INT32;
 
@@ -341,65 +361,67 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
 
         // Test without bounds.
         assertThat(
-                scan(index1, null, null, 0),
+                scan(index1, null, null, 0, readOnly),
                 contains(val1080, val1090, val2080, val2090)
         );
 
         assertThat(
-                scan(index2, null, null, 0),
+                scan(index2, null, null, 0, readOnly),
                 contains(val1090, val1080, val2090, val2080)
         );
 
         // Lower bound exclusive.
         assertThat(
-                scan(index1, prefix(index1, "10"), null, GREATER),
+                scan(index1, prefix(index1, "10"), null, GREATER, readOnly),
                 contains(val2080, val2090)
         );
 
         assertThat(
-                scan(index2, prefix(index2, "10"), null, GREATER),
+                scan(index2, prefix(index2, "10"), null, GREATER, readOnly),
                 contains(val2090, val2080)
         );
 
         // Lower bound inclusive.
         assertThat(
-                scan(index1, prefix(index1, "10"), null, GREATER_OR_EQUAL),
+                scan(index1, prefix(index1, "10"), null, GREATER_OR_EQUAL, 
readOnly),
                 contains(val1080, val1090, val2080, val2090)
         );
 
         assertThat(
-                scan(index2, prefix(index2, "10"), null, GREATER_OR_EQUAL),
+                scan(index2, prefix(index2, "10"), null, GREATER_OR_EQUAL, 
readOnly),
                 contains(val1090, val1080, val2090, val2080)
         );
 
         // Upper bound exclusive.
         assertThat(
-                scan(index1, null, prefix(index1, "20"), LESS),
+                scan(index1, null, prefix(index1, "20"), LESS, readOnly),
                 contains(val1080, val1090)
         );
 
         assertThat(
-                scan(index2, null, prefix(index2, "20"), LESS),
+                scan(index2, null, prefix(index2, "20"), LESS, readOnly),
                 contains(val1090, val1080)
         );
 
         // Upper bound inclusive.
         assertThat(
-                scan(index1, null, prefix(index1, "20"), LESS_OR_EQUAL),
+                scan(index1, null, prefix(index1, "20"), LESS_OR_EQUAL, 
readOnly),
                 contains(val1080, val1090, val2080, val2090)
         );
 
         assertThat(
-                scan(index2, null, prefix(index2, "20"), LESS_OR_EQUAL),
+                scan(index2, null, prefix(index2, "20"), LESS_OR_EQUAL, 
readOnly),
                 contains(val1090, val1080, val2090, val2080)
         );
     }
 
     /**
-     * Tests that an empty range is returned if {@link 
SortedIndexStorage#scan} method is called using overlapping keys.
+     * Tests that an empty range is returned if {@link 
SortedIndexStorage#scan} and {@link SortedIndexStorage#readOnlyScan} methods
+     * are called using overlapping keys.
      */
-    @Test
-    void testEmptyRange() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testEmptyRange(boolean readOnly) {
         List<ColumnParams> indexSchema = shuffledRandomColumnParams();
 
         SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, 
indexSchema);
@@ -416,18 +438,20 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         put(indexStorage, entry1);
         put(indexStorage, entry2);
 
-        try (Cursor<IndexRow> cursor = indexStorage.scan(
+        try (Cursor<IndexRow> cursor = getIndexRowCursor(
+                indexStorage,
                 entry2.prefix(indexSchema.size()).prefix(),
                 entry1.prefix(indexSchema.size()).prefix(),
-                0
+                0,
+                readOnly
         )) {
             assertThat(cursor.stream().collect(toList()), is(empty()));
         }
     }
 
     @ParameterizedTest
-    @VariableSource("ALL_TYPES_COLUMN_PARAMS")
-    void testNullValues(ColumnParams columnParams) {
+    @MethodSource("allTypesColumnParamsAndReadOnly")
+    void testNullValues(ColumnParams columnParams, boolean readOnly) {
         SortedIndexStorage storage = createIndexStorage(INDEX_NAME, 
List.of(columnParams));
 
         TestIndexRow entry1 = TestIndexRow.randomRow(storage, TEST_PARTITION);
@@ -449,10 +473,12 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
             entry1 = t;
         }
 
-        try (Cursor<IndexRow> cursor = storage.scan(
+        try (Cursor<IndexRow> cursor = getIndexRowCursor(
+                storage,
                 entry1.prefix(1).prefix(),
                 entry2.prefix(1).prefix(),
-                GREATER_OR_EQUAL | LESS_OR_EQUAL
+                GREATER_OR_EQUAL | LESS_OR_EQUAL,
+                readOnly
         )) {
             assertThat(
                     cursor.stream().map(row -> 
row.indexColumns().byteBuffer()).collect(toList()),
@@ -464,8 +490,9 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
     /**
      * Checks simple scenarios for a scanning cursor.
      */
-    @Test
-    void testScanSimple() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testScanSimple(boolean readOnly) {
         SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, 
ColumnType.INT32);
 
         BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
@@ -476,7 +503,7 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
 
         // Checking without borders.
         assertThat(
-                scan(indexStorage, null, null, 0, 
AbstractSortedIndexStorageTest::firstArrayElement),
+                scan(indexStorage, null, null, 0, 
AbstractSortedIndexStorageTest::firstArrayElement, readOnly),
                 contains(0, 1, 2, 3, 4)
         );
 
@@ -487,7 +514,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(0),
                         serializer.serializeRowPrefix(4),
                         (GREATER_OR_EQUAL | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2, 3, 4)
         );
@@ -498,7 +526,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(0),
                         serializer.serializeRowPrefix(4),
                         (GREATER_OR_EQUAL | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2, 3)
         );
@@ -509,7 +538,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(0),
                         serializer.serializeRowPrefix(4),
                         (GREATER | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(1, 2, 3, 4)
         );
@@ -520,7 +550,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(0),
                         serializer.serializeRowPrefix(4),
                         (GREATER | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(1, 2, 3)
         );
@@ -532,7 +563,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(1),
                         null,
                         (GREATER_OR_EQUAL | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(1, 2, 3, 4)
         );
@@ -543,7 +575,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(1),
                         null,
                         (GREATER_OR_EQUAL | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(1, 2, 3, 4)
         );
@@ -554,7 +587,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(1),
                         null,
                         (GREATER | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(2, 3, 4)
         );
@@ -565,7 +599,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         serializer.serializeRowPrefix(1),
                         null,
                         (GREATER | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(2, 3, 4)
         );
@@ -577,7 +612,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         null,
                         serializer.serializeRowPrefix(3),
                         (GREATER_OR_EQUAL | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2, 3)
         );
@@ -588,7 +624,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         null,
                         serializer.serializeRowPrefix(3),
                         (GREATER_OR_EQUAL | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2)
         );
@@ -599,7 +636,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         null,
                         serializer.serializeRowPrefix(3),
                         (GREATER | LESS_OR_EQUAL),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2, 3)
         );
@@ -610,7 +648,8 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
                         null,
                         serializer.serializeRowPrefix(3),
                         (GREATER | LESS),
-                        AbstractSortedIndexStorageTest::firstArrayElement
+                        AbstractSortedIndexStorageTest::firstArrayElement,
+                        readOnly
                 ),
                 contains(0, 1, 2)
         );
@@ -639,6 +678,24 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         assertThrows(NoSuchElementException.class, scan::next);
     }
 
+    @Test
+    void testReadOnlyScanContractUpdateAfterScan() {
+        SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, 
ColumnType.INT32);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        // Put before scan, should be visible.
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION)));
+
+        Cursor<IndexRow> scan = indexStorage.readOnlyScan(null, null, 0);
+
+        // Put after scan, should not be visible.
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, new 
RowId(TEST_PARTITION)));
+
+        assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+        assertFalse(scan::hasNext);
+    }
+
     @Test
     void testScanContractAddRowAfterInvokeHasNext() {
         SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, 
ColumnType.INT32);
@@ -1458,21 +1515,26 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
             SortedIndexStorage index,
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
-            @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
+            @MagicConstant(flagsFromClass = SortedIndexStorage.class) int 
flags,
+            boolean readOnly
     ) {
-        return scan(index, lowerBound, upperBound, flags, identity());
+        return scan(index, lowerBound, upperBound, flags, identity(), 
readOnly);
     }
 
     private static <T> List<T> scan(
-            SortedIndexStorage index,
+            SortedIndexStorage storage,
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int 
flags,
-            Function<Object[], T> mapper
+            Function<Object[], T> mapper,
+            boolean readOnly
     ) {
-        var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
+        var serializer = new 
BinaryTupleRowSerializer(storage.indexDescriptor());
 
-        try (Cursor<IndexRow> cursor = index.scan(lowerBound, upperBound, 
flags)) {
+        try (Cursor<IndexRow> cursor = readOnly
+                ? storage.readOnlyScan(lowerBound, upperBound, flags)
+                : storage.scan(lowerBound, upperBound, flags)
+        ) {
             return cursor.stream()
                     .map(serializer::deserializeColumns)
                     .map(mapper)
@@ -1480,6 +1542,27 @@ public abstract class AbstractSortedIndexStorageTest 
extends AbstractIndexStorag
         }
     }
 
+    private static Cursor<IndexRow> getIndexRowCursor(
+            SortedIndexStorage indexStorage,
+            BinaryTuplePrefix lowerBound,
+            BinaryTuplePrefix upperBound,
+            int flags,
+            boolean readOnly
+    ) {
+        return readOnly
+                ? indexStorage.readOnlyScan(lowerBound, upperBound, flags)
+                : indexStorage.scan(lowerBound, upperBound, flags);
+    }
+
+    private static Stream<Arguments> allTypesColumnParamsAndReadOnly() {
+        return Stream.concat(
+                ALL_TYPES_COLUMN_PARAMS.stream()
+                        .map(param -> Arguments.of(param, false)),
+                ALL_TYPES_COLUMN_PARAMS.stream()
+                        .map(param -> Arguments.of(param, true))
+        );
+    }
+
     private static <T> Function<IndexRow, T> 
firstColumn(BinaryTupleRowSerializer serializer) {
         return indexRow -> (T) serializer.deserializeColumns(indexRow)[0];
     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 07ef21ce25..912da5301e 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -178,6 +179,27 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
         });
     }
 
+    @Override
+    public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix 
lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        return busyDataRead(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+            boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+            SortedIndexRowKey lower = createBound(lowerBound, !includeLower);
+            SortedIndexRowKey upper = createBound(upperBound, includeUpper);
+
+            try {
+                Cursor<SortedIndexRow> cursor = indexTree.find(lower, upper);
+
+                return CursorUtils.map(cursor, this::toIndexRowImpl);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Couldn't get index tree cursor", 
e);
+            }
+        });
+    }
+
     private @Nullable SortedIndexRowKey createBound(@Nullable 
BinaryTuplePrefix bound, boolean setEqualityFlag) {
         if (bound == null) {
             return null;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 0b4ca1ed02..07ab97f036 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -23,8 +23,10 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
 import java.util.function.Function;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -41,7 +43,10 @@ import 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 
@@ -164,38 +169,106 @@ public class RocksDbSortedIndexStorage extends 
AbstractRocksDbIndexStorage imple
             boolean includeUpper,
             Function<ByteBuffer, T> mapper
     ) {
-        byte[] lowerBoundBytes;
+        byte[] lowerBoundBytes = getBound(lowerBound, partitionStartPrefix, 
!includeLower);
 
-        if (lowerBound == null) {
-            lowerBoundBytes = partitionStartPrefix;
-        } else {
-            lowerBoundBytes = rocksPrefix(lowerBound);
+        byte[] upperBoundBytes = getBound(upperBound, partitionEndPrefix, 
includeUpper);
 
-            // Skip the lower bound, if needed (RocksDB includes the lower 
bound by default).
-            if (!includeLower) {
-                setEqualityFlag(lowerBoundBytes);
+        return new UpToDatePeekCursor<>(upperBoundBytes, indexCf, 
lowerBoundBytes) {
+            @Override
+            protected T map(ByteBuffer byteBuffer) {
+                return mapper.apply(byteBuffer);
             }
-        }
+        };
+    }
+
+    @Override
+    public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix 
lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        return busyDataRead(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+            boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+            byte[] lowerBoundBytes = getBound(lowerBound, 
partitionStartPrefix, !includeLower);
+            byte[] upperBoundBytes = getBound(upperBound, partitionEndPrefix, 
includeUpper);
+
+            Slice upperBoundSlice = new Slice(upperBoundBytes);
+
+            ReadOptions readOptions = new ReadOptions()
+                    .setIterateUpperBound(upperBoundSlice);
+
+            RocksIterator iterator = indexCf.newIterator(readOptions);
+            iterator.seek(lowerBoundBytes);
+
+            return new Cursor<IndexRow>() {
+                private final RocksIterator it = iterator;
 
-        byte[] upperBoundBytes;
+                private byte[] key;
 
-        if (upperBound == null) {
-            upperBoundBytes = partitionEndPrefix;
+                private boolean advance;
+
+                @Override
+                public void close() {
+                    try {
+                        closeAll(it, readOptions, upperBoundSlice);
+                    } catch (Exception e) {
+                        throw new StorageException("Error closing RocksDB RO 
cursor", e);
+                    }
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return busyDataRead(this::advanceIfNeededBusy);
+                }
+
+                @Override
+                public IndexRow next() {
+                    return busyDataRead(() -> {
+                        if (!advanceIfNeededBusy()) {
+                            throw new NoSuchElementException();
+                        }
+
+                        advance = true;
+
+                        return 
decodeRow((ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
+                    });
+                }
+
+                private boolean advanceIfNeededBusy() throws StorageException {
+                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
() -> createStorageInfo());
+
+                    if (advance) {
+                        it.next();
+                        advance = false;
+                    }
+
+                    if (!it.isValid()) {
+                        return false;
+                    }
+
+                    key = it.key();
+
+                    return true;
+                }
+            };
+        });
+    }
+
+    private byte[] getBound(@Nullable BinaryTuplePrefix bound, byte[] 
partitionPrefix, boolean changeBoundIncluded) {
+        byte[] boundBytes;
+
+        if (bound == null) {
+            boundBytes = partitionPrefix;
         } else {
-            upperBoundBytes = rocksPrefix(upperBound);
+            boundBytes = rocksPrefix(bound);
 
-            // Include the upper bound, if needed (RocksDB excludes the upper 
bound by default).
-            if (includeUpper) {
-                setEqualityFlag(upperBoundBytes);
+            // RocksDB excludes the upper bound and includes the lower one by 
default; set the equality flag to change that.
+            if (changeBoundIncluded) {
+                setEqualityFlag(boundBytes);
             }
         }
 
-        return new UpToDatePeekCursor<>(upperBoundBytes, indexCf, 
lowerBoundBytes) {
-            @Override
-            protected T map(ByteBuffer byteBuffer) {
-                return mapper.apply(byteBuffer);
-            }
-        };
+        return boundBytes;
     }
 
     private static void setEqualityFlag(byte[] prefix) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0950952adf..f8a9090320 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1370,7 +1370,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         int flags = request.flags();
 
         Cursor<IndexRow> cursor = 
remotelyTriggeredResourceRegistry.<CursorResource>register(cursorId, 
request.coordinatorId(),
-                () -> new CursorResource(indexStorage.scan(
+                () -> new CursorResource(indexStorage.readOnlyScan(
                         lowerBound,
                         upperBound,
                         flags


Reply via email to