ibessonov commented on code in PR #1471:
URL: https://github.com/apache/ignite-3/pull/1471#discussion_r1056289362


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -165,43 +167,65 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for a full rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#FULL_REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link StorageFullRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, only write methods will be available, while read and

Review Comment:
   Do all write methods need to be functional? I assume there are only two of 
them that are actually useful



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));

Review Comment:
   `getOutConfigRangePartitionId` is a very confusing name. Maybe you can 
rename it to something like `getPartitionIdOutOfRange`?
   My problem with it is that a method name usually reads as "verb + object + 
something at the end", and "get out" just feels like an inappropriately (and 
incorrectly) used phrasal verb. I blame that on the word order.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {
+            @Nullable
+            private Boolean hasNext;
+
+            @Nullable
+            private VersionChain versionChain;
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                checkStorageClosedOrInProcessFullRebalance();
 
-        return Cursor.fromBareIterator(
-                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
-                        .peek(versionChain -> checkClosed())
-                        .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
-                        .iterator()
-        );
+                advanceIfNeeded();
+
+                return hasNext;
+            }
+
+            @Override
+            public ReadResult next() {
+                checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Why don't you move this call inside of `advanceIfNeeded()`?
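
   To illustrate the suggestion, here is a minimal sketch of a cursor whose 
availability check lives only in `advanceIfNeeded()`. `SketchStorage`, 
`checkAvailable()` and the `String` payload are hypothetical stand-ins, not the 
PR's classes; the check sits before the early return so that every `hasNext()` 
and `next()` call still performs it.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the test storage; only the flag the cursor checks is modeled. */
final class SketchStorage {
    volatile boolean rebalanceInProgress;

    /** Assumed equivalent of the PR's "closed or rebalancing" check. */
    void checkAvailable() {
        if (rebalanceInProgress) {
            throw new IllegalStateException("Storage is in the process of a rebalance");
        }
    }

    Iterator<String> scan(List<String> versions) {
        Iterator<String> source = versions.iterator();

        return new Iterator<String>() {
            /** {@code null} means the cursor has not been advanced yet. */
            private Boolean hasNext;

            private String current;

            @Override
            public boolean hasNext() {
                advanceIfNeeded();

                return hasNext;
            }

            @Override
            public String next() {
                advanceIfNeeded();

                if (!hasNext) {
                    throw new NoSuchElementException();
                }

                // Force the following call to advance again.
                hasNext = null;

                return current;
            }

            private void advanceIfNeeded() {
                // The single place where the storage state is checked.
                checkAvailable();

                if (hasNext != null) {
                    return;
                }

                hasNext = source.hasNext();
                current = hasNext ? source.next() : null;
            }
        };
    }
}
```

   With this shape neither `hasNext()` nor `next()` needs its own check, so a 
new public method on the cursor cannot forget it.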



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+            return null;
+        });
 
-        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        assertSame(partitionStorage, tableStorage.getMvPartition(PARTITION_ID));
+        MvPartitionStorage newMvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+        assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
     }
 
     @Test
-    public void testFinishRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testSuccessFullRebalance() throws Exception {

Review Comment:
   Damn, this test is long. Is there a way to simplify it? It's too much, I 
won't even try to follow it



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -537,9 +584,50 @@ public synchronized void clear() {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessFullRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+
+        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
+    }
+
+    void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishFullRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   I see no such assertion in "abort" method, why?
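
   One way to read the question is that both terminal operations could 
validate the in-progress flag in the same way. Below is a minimal sketch under 
stated assumptions: `RebalanceState` is a hypothetical class, not the PR's 
storage; `-1` is a placeholder for `FULL_REBALANCE_IN_PROGRESS`, whose real 
value is not shown in this diff; and the body of `finish` is a guess, since the 
diff is cut off after the assertion. The sketch throws instead of using 
`assert`, because Java assertions are disabled unless the JVM runs with `-ea`.

```java
/** Hypothetical holder for the rebalance flag and the applied index/term; not the PR's class. */
final class RebalanceState {
    /** Placeholder value; the real constant is not shown in this diff. */
    static final long FULL_REBALANCE_IN_PROGRESS = -1;

    private boolean rebalanceInProgress;

    private long lastAppliedIndex;

    private long lastAppliedTerm;

    void start() {
        rebalanceInProgress = true;

        lastAppliedIndex = FULL_REBALANCE_IN_PROGRESS;
        lastAppliedTerm = FULL_REBALANCE_IN_PROGRESS;
    }

    void abort() {
        // Same precondition as in finish(), so both terminal operations are symmetric.
        requireInProgress("abort");

        rebalanceInProgress = false;

        lastAppliedIndex = 0;
        lastAppliedTerm = 0;
    }

    void finish(long index, long term) {
        requireInProgress("finish");

        rebalanceInProgress = false;

        lastAppliedIndex = index;
        lastAppliedTerm = term;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long lastAppliedTerm() {
        return lastAppliedTerm;
    }

    private void requireInProgress(String operation) {
        if (!rebalanceInProgress) {
            throw new IllegalStateException("Cannot " + operation + ": no rebalance in progress");
        }
    }
}
```

   If abort is meant to stay callable without a preceding start, as the removed 
`testAbortRebalanceMvPartition` exercised, it would return early instead of 
throwing, and that difference from finish would be worth a sentence in the 
javadoc.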



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));

Review Comment:
   Well, how about using 
`assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());`? We already have this thing in our test framework.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -561,4 +800,61 @@ private void checkStorageDestroyed(IndexStorage storage) {
 
         assertThrows(StorageClosedException.class, () -> storage.remove(indexRow));
     }
+
+    private int getOutConfigRangePartitionId() {

Review Comment:
   Same comment about the name



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -464,13 +507,15 @@ public ReadResult next() {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();

Review Comment:
   Again, word usage raises some questions. Maybe "in process **of** full 
rebalance"?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -229,34 +236,90 @@ public CompletableFuture<Void> destroy() {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startFullRebalancePartition(int partitionId) {

Review Comment:
   Why do we need to add "full" everywhere? As if we have a dedicated process 
for non-full rebalance, that is known to the storage? No, we don't have it



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -314,154 +315,392 @@ public void testMisconfiguredIndices() {
     }
 
     @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+    public void testDestroyPartition() throws Exception {
+        assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getOutConfigRangePartitionId()));
 
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+        MvPartitionStorage mvPartitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));
+
+        IndexRow indexRow = indexRow(binaryRow, rowId);
 
-            partitionStorage.lastApplied(100, 10);
+        mvPartitionStorage.runConsistently(() -> {
+            mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
+
+            hashIndexStorage.put(indexRow);
 
-            partitionStorage.committedGroupConfiguration(new RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
+            sortedIndexStorage.put(indexRow);
 
             return null;
         });
 
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+        Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.scanVersions(rowId);
+        PartitionTimestampCursor scanTimestampCursor = mvPartitionStorage.scan(clock.now());
+
+        Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(indexRow.indexColumns());
+
+        Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
-        MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        // Let's check that we won't get destroyed storages.
+        assertNull(tableStorage.getMvPartition(PARTITION_ID));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
+        checkStorageDestroyed(mvPartitionStorage);
+        checkStorageDestroyed(hashIndexStorage);
+        checkStorageDestroyed(sortedIndexStorage);
 
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
+        assertThrows(StorageClosedException.class, () -> getAll(scanVersionsCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanTimestampCursor));
 
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromHashIndexCursor));
 
-        MvPartitionStorage newPartitionStorage1 = tableStorage.getMvPartition(PARTITION_ID);
+        assertThrows(StorageClosedException.class, () -> getAll(getFromSortedIndexCursor));
+        assertThrows(StorageClosedException.class, () -> getAll(scanFromSortedIndexCursor));
 
-        assertSame(newPartitionStorage0, newPartitionStorage1);
+        // Let's check that nothing will happen if we try to destroy a non-existing partition.
+        assertDoesNotThrow(() -> tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS));
     }
 
     @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    public void testReCreatePartition() throws Exception {

Review Comment:
   I would add an explicit assumption here that the storage is not volatile, 
otherwise it makes no sense.
   (By that I mean the `assumeTrue` or `assumeThat` methods.)



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -315,4 +314,51 @@ private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
             return rowIdEntry == null ? null : rowIdEntry.getKey();
         }
     }
+
+    private void checkStorageClosed() {
+        if (closed) {
+            throw new StorageClosedException("Storage is already closed");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (fullRebalance) {
+            throw new StorageFullRebalanceException("Storage in the process of a full rebalancing");
+        }
+    }
+
+    /**
+     * Starts a full rebalancing for the storage.
+     */
+    public void startFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts a full rebalance of the storage.
+     */
+    public void abortFullRebalance() {
+        checkStorageClosed();
+
+        fullRebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes a full rebalance of the storage.
+     */
+    public void finishFullRebalance() {
+        checkStorageClosed();
+
+        assert fullRebalance;

Review Comment:
   Same question here



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -110,6 +113,10 @@ public MvPartitionStorage getMvPartition(int partitionId) {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        if (fullRebalanceFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageException("Partition in the process of full rebalancing: " + partitionId);

Review Comment:
   Wait, I thought that we discussed it. How is it possible that there's a 
rebalance during destruction? This should not happen.
   Also, where's the documentation of this behavior?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java:
##########
@@ -378,19 +381,59 @@ private static ReadResult walkVersionChain(VersionChain chainHead, HybridTimesta
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessFullRebalance();
+
+        return new Cursor<>() {

Review Comment:
   Can it be an inner class, not anonymous?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
