This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 34354d2a59 IGNITE-21987 Optimize RO scan in sorted indexes (#3566) 34354d2a59 is described below commit 34354d2a59caddafdc287a81848c4dde68bbcde0 Author: Phillippko <phillip...@gmail.com> AuthorDate: Thu Apr 11 16:34:39 2024 +0400 IGNITE-21987 Optimize RO scan in sorted indexes (#3566) --- .../processor/messages/MessageImplGenerator.java | 1 - .../internal/storage/index/SortedIndexStorage.java | 23 ++- .../index/ThreadAssertingSortedIndexStorage.java | 7 + .../storage/index/TestSortedIndexStorageTest.java | 5 + .../index/AbstractSortedIndexStorageTest.java | 187 +++++++++++++++------ .../index/sorted/PageMemorySortedIndexStorage.java | 22 +++ .../rocksdb/index/RocksDbSortedIndexStorage.java | 117 ++++++++++--- .../replicator/PartitionReplicaListener.java | 2 +- 8 files changed, 287 insertions(+), 77 deletions(-) diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java index c9c81509f7..a90eef9500 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java @@ -222,7 +222,6 @@ public class MessageImplGenerator { messageImpl.addMethod(groupTypeMethod); - // TODO: https://issues.apache.org/jira/browse/IGNITE-17591 MethodSpec toStringMethod = MethodSpec.methodBuilder("toString") .addAnnotation(Override.class) .addModifiers(Modifier.PUBLIC) diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java index d3dd0bf90f..16abcc684e 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.index; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.util.Cursor; import org.intellij.lang.annotations.MagicConstant; import org.jetbrains.annotations.Nullable; @@ -47,7 +48,7 @@ public interface SortedIndexStorage extends IndexStorage { StorageSortedIndexDescriptor indexDescriptor(); /** - * Returns a range of index values between the lower bound and the upper bound. + * Returns a range of updatable index values between the lower bound and the upper bound, supporting read-write transactions. * * @param lowerBound Lower bound. Exclusivity is controlled by a {@link #GREATER_OR_EQUAL} or {@link #GREATER} flag. * {@code null} means unbounded. @@ -63,4 +64,24 @@ public interface SortedIndexStorage extends IndexStorage { @Nullable BinaryTuplePrefix upperBound, @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags ); + + /** + * Returns a range of index values between the lower bound and the upper bound, use in read-only transactions. + * + * @param lowerBound Lower bound. Exclusivity is controlled by a {@link #GREATER_OR_EQUAL} or {@link #GREATER} flag. + * {@code null} means unbounded. + * @param upperBound Upper bound. Exclusivity is controlled by a {@link #LESS} or {@link #LESS_OR_EQUAL} flag. + * {@code null} means unbounded. + * @param flags Control flags. {@link #GREATER} | {@link #LESS} by default. Other available values + * are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}. + * @return Cursor with fetched index rows. + * @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage. + */ + default Cursor<IndexRow> readOnlyScan( + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags + ) { + return scan(lowerBound, upperBound, flags); + } } diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java index 0b61260cfc..7b202b6f36 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.storage.index; import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead; import org.apache.ignite.internal.schema.BinaryTuplePrefix; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.worker.ThreadAssertingCursor; import org.apache.ignite.internal.worker.ThreadAssertions; import org.jetbrains.annotations.Nullable; @@ -49,4 +51,9 @@ public class ThreadAssertingSortedIndexStorage extends ThreadAssertingIndexStora return new ThreadAssertingPeekCursor<>(indexStorage.scan(lowerBound, upperBound, flags)); } + + @Override + public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) { + return new ThreadAssertingCursor<>(indexStorage.readOnlyScan(lowerBound, upperBound, flags)); + } } diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java index aeeec2311b..03c24d5d48 100644 --- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java +++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java @@ -36,4 +36,9 @@ public class TestSortedIndexStorageTest extends AbstractSortedIndexStorageTest { initialize(storage); } + + // Not valid for test storage. + @Override + void testReadOnlyScanContractUpdateAfterScan() { + } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java index 10e0d198b0..12f1751934 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java @@ -81,6 +81,9 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; /** * Base class for Sorted Index storage tests. @@ -166,11 +169,12 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag assertThat(actual, is(equalTo(columns))); } - @Test - void testEmpty() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testEmpty(boolean readOnly) { SortedIndexStorage index = createIndexStorage(INDEX_NAME, shuffledRandomColumnParams()); - assertThat(scan(index, null, null, 0), is(empty())); + assertThat(scan(index, null, null, 0, readOnly), is(empty())); } /** @@ -205,7 +209,7 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag put(index, row2); put(index, row3); - List<Object[]> actualColumns = scan(index, null, null, 0); + List<Object[]> actualColumns = scan(index, null, null, 0, false); assertThat(actualColumns, contains(columnValues2, columnValues1, columnValues1)); } @@ -233,28 +237,28 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag put(index, row2); put(index, row3); - List<Object[]> actualColumns = scan(index, null, null, 0); + List<Object[]> actualColumns = scan(index, null, null, 0, false); assertThat(actualColumns, contains(columnValues2, columnValues1, columnValues1)); // Test that rows with the same indexed columns can be removed individually remove(index, row2); - actualColumns = scan(index, null, null, 0); + actualColumns = scan(index, null, null, 0, false); assertThat(actualColumns, contains(columnValues2, columnValues1)); // Test that removing a non-existent row does nothing remove(index, row2); - actualColumns = scan(index, null, null, 0); + actualColumns = scan(index, null, null, 0, false); assertThat(actualColumns, contains(columnValues2, columnValues1)); // Test that the first row can be actually removed remove(index, row1); - actualColumns = scan(index, null, null, 0); + actualColumns = scan(index, null, null, 0, false); assertThat(actualColumns, contains((Object) columnValues2)); } @@ -271,7 +275,19 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag * Tests the happy case of the {@link SortedIndexStorage#scan} method. */ @RepeatedTest(5) - void testScan() { + void testReadWriteScan() { + testScan(false); + } + + /** + * Tests the happy case of the {@link SortedIndexStorage#readOnlyScan} method. + */ + @RepeatedTest(5) + void testReadOnlyScan() { + testScan(true); + } + + private void testScan(boolean readOnly) { SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, shuffledColumnParams()); List<TestIndexRow> entries = IntStream.range(0, 10) @@ -297,7 +313,10 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag assertThat(expected, hasSize(greaterThanOrEqualTo(lastIndex - firstIndex + 1))); - try (Cursor<IndexRow> cursor = indexStorage.scan(first.prefix(), last.prefix(), GREATER_OR_EQUAL | LESS_OR_EQUAL)) { + try (Cursor<IndexRow> cursor = readOnly + ? indexStorage.readOnlyScan(first.prefix(), last.prefix(), GREATER_OR_EQUAL | LESS_OR_EQUAL) + : indexStorage.scan(first.prefix(), last.prefix(), GREATER_OR_EQUAL | LESS_OR_EQUAL) + ) { List<IndexRow> actual = cursor.stream().collect(toList()); assertThat(actual, hasSize(expected.size())); @@ -308,8 +327,9 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag } } - @Test - public void testBoundsAndOrder() { + @ParameterizedTest() + @ValueSource(booleans = {true, false}) + public void testBoundsAndOrder(boolean readOnly) { ColumnType string = ColumnType.STRING; ColumnType int32 = ColumnType.INT32; @@ -341,65 +361,67 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag // Test without bounds. assertThat( - scan(index1, null, null, 0), + scan(index1, null, null, 0, readOnly), contains(val1080, val1090, val2080, val2090) ); assertThat( - scan(index2, null, null, 0), + scan(index2, null, null, 0, readOnly), contains(val1090, val1080, val2090, val2080) ); // Lower bound exclusive. assertThat( - scan(index1, prefix(index1, "10"), null, GREATER), + scan(index1, prefix(index1, "10"), null, GREATER, readOnly), contains(val2080, val2090) ); assertThat( - scan(index2, prefix(index2, "10"), null, GREATER), + scan(index2, prefix(index2, "10"), null, GREATER, readOnly), contains(val2090, val2080) ); // Lower bound inclusive. assertThat( - scan(index1, prefix(index1, "10"), null, GREATER_OR_EQUAL), + scan(index1, prefix(index1, "10"), null, GREATER_OR_EQUAL, readOnly), contains(val1080, val1090, val2080, val2090) ); assertThat( - scan(index2, prefix(index2, "10"), null, GREATER_OR_EQUAL), + scan(index2, prefix(index2, "10"), null, GREATER_OR_EQUAL, readOnly), contains(val1090, val1080, val2090, val2080) ); // Upper bound exclusive. assertThat( - scan(index1, null, prefix(index1, "20"), LESS), + scan(index1, null, prefix(index1, "20"), LESS, readOnly), contains(val1080, val1090) ); assertThat( - scan(index2, null, prefix(index2, "20"), LESS), + scan(index2, null, prefix(index2, "20"), LESS, readOnly), contains(val1090, val1080) ); // Upper bound inclusive. assertThat( - scan(index1, null, prefix(index1, "20"), LESS_OR_EQUAL), + scan(index1, null, prefix(index1, "20"), LESS_OR_EQUAL, readOnly), contains(val1080, val1090, val2080, val2090) ); assertThat( - scan(index2, null, prefix(index2, "20"), LESS_OR_EQUAL), + scan(index2, null, prefix(index2, "20"), LESS_OR_EQUAL, readOnly), contains(val1090, val1080, val2090, val2080) ); } /** - * Tests that an empty range is returned if {@link SortedIndexStorage#scan} method is called using overlapping keys. + * Tests that an empty range is returned if {@link SortedIndexStorage#scan} and {@link SortedIndexStorage#readOnlyScan} methods + * are called using overlapping keys. */ - @Test - void testEmptyRange() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testEmptyRange(boolean readOnly) { List<ColumnParams> indexSchema = shuffledRandomColumnParams(); SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, indexSchema); @@ -416,18 +438,20 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag put(indexStorage, entry1); put(indexStorage, entry2); - try (Cursor<IndexRow> cursor = indexStorage.scan( + try (Cursor<IndexRow> cursor = getIndexRowCursor( + indexStorage, entry2.prefix(indexSchema.size()).prefix(), entry1.prefix(indexSchema.size()).prefix(), - 0 + 0, + readOnly )) { assertThat(cursor.stream().collect(toList()), is(empty())); } } @ParameterizedTest - @VariableSource("ALL_TYPES_COLUMN_PARAMS") - void testNullValues(ColumnParams columnParams) { + @MethodSource("allTypesColumnParamsAndReadOnly") + void testNullValues(ColumnParams columnParams, boolean readOnly) { SortedIndexStorage storage = createIndexStorage(INDEX_NAME, List.of(columnParams)); TestIndexRow entry1 = TestIndexRow.randomRow(storage, TEST_PARTITION); @@ -449,10 +473,12 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag entry1 = t; } - try (Cursor<IndexRow> cursor = storage.scan( + try (Cursor<IndexRow> cursor = getIndexRowCursor( + storage, entry1.prefix(1).prefix(), entry2.prefix(1).prefix(), - GREATER_OR_EQUAL | LESS_OR_EQUAL + GREATER_OR_EQUAL | LESS_OR_EQUAL, + readOnly )) { assertThat( cursor.stream().map(row -> row.indexColumns().byteBuffer()).collect(toList()), @@ -464,8 +490,9 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag /** * Checks simple scenarios for a scanning cursor. */ - @Test - void testScanSimple() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testScanSimple(boolean readOnly) { SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, ColumnType.INT32); BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); @@ -476,7 +503,7 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag // Checking without borders. assertThat( - scan(indexStorage, null, null, 0, AbstractSortedIndexStorageTest::firstArrayElement), + scan(indexStorage, null, null, 0, AbstractSortedIndexStorageTest::firstArrayElement, readOnly), contains(0, 1, 2, 3, 4) ); @@ -487,7 +514,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(0), serializer.serializeRowPrefix(4), (GREATER_OR_EQUAL | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2, 3, 4) ); @@ -498,7 +526,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(0), serializer.serializeRowPrefix(4), (GREATER_OR_EQUAL | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2, 3) ); @@ -509,7 +538,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(0), serializer.serializeRowPrefix(4), (GREATER | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(1, 2, 3, 4) ); @@ -520,7 +550,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(0), serializer.serializeRowPrefix(4), (GREATER | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(1, 2, 3) ); @@ -532,7 +563,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(1), null, (GREATER_OR_EQUAL | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(1, 2, 3, 4) ); @@ -543,7 +575,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(1), null, (GREATER_OR_EQUAL | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(1, 2, 3, 4) ); @@ -554,7 +587,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(1), null, (GREATER | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(2, 3, 4) ); @@ -565,7 +599,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag serializer.serializeRowPrefix(1), null, (GREATER | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(2, 3, 4) ); @@ -577,7 +612,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag null, serializer.serializeRowPrefix(3), (GREATER_OR_EQUAL | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2, 3) ); @@ -588,7 +624,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag null, serializer.serializeRowPrefix(3), (GREATER_OR_EQUAL | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2) ); @@ -599,7 +636,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag null, serializer.serializeRowPrefix(3), (GREATER | LESS_OR_EQUAL), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2, 3) ); @@ -610,7 +648,8 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag null, serializer.serializeRowPrefix(3), (GREATER | LESS), - AbstractSortedIndexStorageTest::firstArrayElement + AbstractSortedIndexStorageTest::firstArrayElement, + readOnly ), contains(0, 1, 2) ); @@ -639,6 +678,24 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag assertThrows(NoSuchElementException.class, scan::next); } + @Test + void testReadOnlyScanContractUpdateAfterScan() { + SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, ColumnType.INT32); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + // Put before scan, should be visible. + put(indexStorage, serializer.serializeRow(new Object[]{0}, new RowId(TEST_PARTITION))); + + Cursor<IndexRow> scan = indexStorage.readOnlyScan(null, null, 0); + + // Put after scan, should not be visible. + put(indexStorage, serializer.serializeRow(new Object[]{1}, new RowId(TEST_PARTITION))); + + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + assertFalse(scan::hasNext); + } + @Test void testScanContractAddRowAfterInvokeHasNext() { SortedIndexStorage indexStorage = createIndexStorage(INDEX_NAME, ColumnType.INT32); @@ -1458,21 +1515,26 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag SortedIndexStorage index, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, - @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags + @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags, + boolean readOnly ) { - return scan(index, lowerBound, upperBound, flags, identity()); + return scan(index, lowerBound, upperBound, flags, identity(), readOnly); } private static <T> List<T> scan( - SortedIndexStorage index, + SortedIndexStorage storage, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags, - Function<Object[], T> mapper + Function<Object[], T> mapper, + boolean readOnly ) { - var serializer = new BinaryTupleRowSerializer(index.indexDescriptor()); + var serializer = new BinaryTupleRowSerializer(storage.indexDescriptor()); - try (Cursor<IndexRow> cursor = index.scan(lowerBound, upperBound, flags)) { + try (Cursor<IndexRow> cursor = readOnly + ? storage.readOnlyScan(lowerBound, upperBound, flags) + : storage.scan(lowerBound, upperBound, flags) + ) { return cursor.stream() .map(serializer::deserializeColumns) .map(mapper) @@ -1480,6 +1542,27 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag } } + private static Cursor<IndexRow> getIndexRowCursor( + SortedIndexStorage indexStorage, + BinaryTuplePrefix lowerBound, + BinaryTuplePrefix upperBound, + int flags, + boolean readOnly + ) { + return readOnly + ? indexStorage.readOnlyScan(lowerBound, upperBound, flags) + : indexStorage.scan(lowerBound, upperBound, flags); + } + + private static Stream<Arguments> allTypesColumnParamsAndReadOnly() { + return Stream.concat( + ALL_TYPES_COLUMN_PARAMS.stream() + .map(param -> Arguments.of(param, false)), + ALL_TYPES_COLUMN_PARAMS.stream() + .map(param -> Arguments.of(param, true)) + ); + } + private static <T> Function<IndexRow, T> firstColumn(BinaryTupleRowSerializer serializer) { return indexRow -> (T) serializer.deserializeColumns(indexRow)[0]; } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java index 07ef21ce25..912da5301e 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta; import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.CursorUtils; import org.jetbrains.annotations.Nullable; /** @@ -178,6 +179,27 @@ public class PageMemorySortedIndexStorage extends AbstractPageMemoryIndexStorage }); } + @Override + public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) { + return busyDataRead(() -> { + throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo); + + boolean includeLower = (flags & GREATER_OR_EQUAL) != 0; + boolean includeUpper = (flags & LESS_OR_EQUAL) != 0; + + SortedIndexRowKey lower = createBound(lowerBound, !includeLower); + SortedIndexRowKey upper = createBound(upperBound, includeUpper); + + try { + Cursor<SortedIndexRow> cursor = indexTree.find(lower, upper); + + return CursorUtils.map(cursor, this::toIndexRowImpl); + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Couldn't get index tree cursor", e); + } + }); + } + private @Nullable SortedIndexRowKey createBound(@Nullable BinaryTuplePrefix bound, boolean setEqualityFlag) { if (bound == null) { return null; diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java index 0b4ca1ed02..07ab97f036 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java @@ -23,8 +23,10 @@ import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE; import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import java.nio.ByteBuffer; +import java.util.NoSuchElementException; import java.util.function.Function; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.rocksdb.ColumnFamily; @@ -41,7 +43,10 @@ import org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper; import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage; import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; import org.rocksdb.WriteBatch; import org.rocksdb.WriteBatchWithIndex; @@ -164,38 +169,106 @@ public class RocksDbSortedIndexStorage extends AbstractRocksDbIndexStorage imple boolean includeUpper, Function<ByteBuffer, T> mapper ) { - byte[] lowerBoundBytes; + byte[] lowerBoundBytes = getBound(lowerBound, partitionStartPrefix, !includeLower); - if (lowerBound == null) { - lowerBoundBytes = partitionStartPrefix; - } else { - lowerBoundBytes = rocksPrefix(lowerBound); + byte[] upperBoundBytes = getBound(upperBound, partitionEndPrefix, includeUpper); - // Skip the lower bound, if needed (RocksDB includes the lower bound by default). - if (!includeLower) { - setEqualityFlag(lowerBoundBytes); + return new UpToDatePeekCursor<>(upperBoundBytes, indexCf, lowerBoundBytes) { + @Override + protected T map(ByteBuffer byteBuffer) { + return mapper.apply(byteBuffer); } - } + }; + } + + @Override + public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) { + return busyDataRead(() -> { + throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo); + + boolean includeLower = (flags & GREATER_OR_EQUAL) != 0; + boolean includeUpper = (flags & LESS_OR_EQUAL) != 0; + + byte[] lowerBoundBytes = getBound(lowerBound, partitionStartPrefix, !includeLower); + byte[] upperBoundBytes = getBound(upperBound, partitionEndPrefix, includeUpper); + + Slice upperBoundSlice = new Slice(upperBoundBytes); + + ReadOptions readOptions = new ReadOptions() + .setIterateUpperBound(upperBoundSlice); + + RocksIterator iterator = indexCf.newIterator(readOptions); + iterator.seek(lowerBoundBytes); + + return new Cursor<IndexRow>() { + private final RocksIterator it = iterator; - byte[] upperBoundBytes; + private byte[] key; - if (upperBound == null) { - upperBoundBytes = partitionEndPrefix; + private boolean advance; + + @Override + public void close() { + try { + closeAll(it, readOptions, upperBoundSlice); + } catch (Exception e) { + throw new StorageException("Error closing RocksDB RO cursor", e); + } + } + + @Override + public boolean hasNext() { + return busyDataRead(this::advanceIfNeededBusy); + } + + @Override + public IndexRow next() { + return busyDataRead(() -> { + if (!advanceIfNeededBusy()) { + throw new NoSuchElementException(); + } + + advance = true; + + return decodeRow((ByteBuffer.wrap(key).order(KEY_BYTE_ORDER))); + }); + } + + private boolean advanceIfNeededBusy() throws StorageException { + throwExceptionIfStorageInProgressOfRebalance(state.get(), () -> createStorageInfo()); + + if (advance) { + it.next(); + advance = false; + } + + if (!it.isValid()) { + return false; + } + + key = it.key(); + + return true; + } + }; + }); + } + + private byte[] getBound(@Nullable BinaryTuplePrefix bound, byte[] partitionPrefix, boolean changeBoundIncluded) { + byte[] boundBytes; + + if (bound == null) { + boundBytes = partitionPrefix; } else { - upperBoundBytes = rocksPrefix(upperBound); + boundBytes = rocksPrefix(bound); - // Include the upper bound, if needed (RocksDB excludes the upper bound by default). - if (includeUpper) { - setEqualityFlag(upperBoundBytes); + // RocksDB excludes upper and includes lower by default), set flag to change. + if (changeBoundIncluded) { + setEqualityFlag(boundBytes); } } - return new UpToDatePeekCursor<>(upperBoundBytes, indexCf, lowerBoundBytes) { - @Override - protected T map(ByteBuffer byteBuffer) { - return mapper.apply(byteBuffer); - } - }; + return boundBytes; } private static void setEqualityFlag(byte[] prefix) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 0950952adf..f8a9090320 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1370,7 +1370,7 @@ public class PartitionReplicaListener implements ReplicaListener { int flags = request.flags(); Cursor<IndexRow> cursor = remotelyTriggeredResourceRegistry.<CursorResource>register(cursorId, request.coordinatorId(), - () -> new CursorResource(indexStorage.scan( + () -> new CursorResource(indexStorage.readOnlyScan( lowerBound, upperBound, flags