rpuch commented on code in PR #3414:
URL: https://github.com/apache/ignite-3/pull/3414#discussion_r1528587642


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
     /**
      * Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow, 
UUID, int, int, int)} (write intent).
      *
+     * <p>NOTE: When updating a low watermark, the index storages that were 
returned from the method may begin to be destroyed, such a
+     * situation should occur by the calling code.</p>

Review Comment:
   ```suggestion
        * situation should be handled by the calling code.</p>
   ```



##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -350,7 +350,7 @@ public class IgniteImpl implements Ignite {
 
     private final ClockWaiter clockWaiter;
 
-    private final LowWatermark lowWatermark;
+    private final LowWatermarkImpl lowWatermarkImpl;

Review Comment:
   ```suggestion
       private final LowWatermarkImpl lowWatermark;
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandler.java:
##########
@@ -247,13 +249,52 @@ private TableSchemaAwareIndexStorage[] 
indexesSnapshot(@Nullable List<Integer> i
      * @throws StorageException If failed to set the row ID.
      */
     public void setNextRowIdToBuildIndex(int indexId, @Nullable RowId rowId) {
+        TableSchemaAwareIndexStorage indexStorage = 
indexStorageById().get(indexId);
+
+        // We assume that if the index is missing in indexStorageById, then it 
has begun to be destroyed and we do not need it.
+        if (indexStorage != null) {
+            setNextRowIdToBuildToIndex(indexStorage, rowId);
+        }
+    }
+
+    private Map<Integer, TableSchemaAwareIndexStorage> indexStorageById() {
         Map<Integer, TableSchemaAwareIndexStorage> indexStorageById = 
indexes.get();
 
-        TableSchemaAwareIndexStorage indexStorage = 
indexStorageById.get(indexId);
+        assert !indexStorageById.isEmpty();
 
-        if (indexStorage != null) {
-            // TODO: IGNITE-21514 Handle index destruction
-            indexStorage.storage().setNextRowIdToBuild(rowId);
+        return indexStorageById;
+    }
+
+    private static void putToIndex(TableSchemaAwareIndexStorage indexStorage, 
BinaryRow binaryRow, RowId rowId) {
+        try {
+            indexStorage.put(binaryRow, rowId);
+        } catch (StorageDestroyedException ignore) {
+            // Index is in the process of being destroyed, which means there 
is no need to write to it.

Review Comment:
   How about logging this attempt to write to a destroyed index at debug()? 
Might be useful.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -17,264 +17,29 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.hlc.HybridClock;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import 
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Class to manage the low watermark.
- *
- * <p>Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
+ * Low watermark is the node's local time, which ensures that read-only 
transactions have completed by this time, and new read-only
  * transactions will only be created after this time, and we can safely delete 
obsolete/garbage data such as: obsolete versions of table
  * rows, remote indexes, remote tables, etc.
  *
  * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol";>IEP-91</a>
  */
-public class LowWatermark implements IgniteComponent {
-    private static final IgniteLogger LOG = 
Loggers.forClass(LowWatermark.class);
-
-    static final ByteArray LOW_WATERMARK_VAULT_KEY = new 
ByteArray("low-watermark");
-
-    private final LowWatermarkConfiguration lowWatermarkConfig;
-
-    private final HybridClock clock;
-
-    private final TxManager txManager;
-
-    private final VaultManager vaultManager;
-
-    private final List<LowWatermarkChangedListener> updateListeners = new 
CopyOnWriteArrayList<>();
-
-    private final ScheduledExecutorService scheduledThreadPool;
-
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    private volatile @Nullable HybridTimestamp lowWatermark;
-
-    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture 
= new AtomicReference<>();
-
-    private final FailureProcessor failureProcessor;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeName Node name.
-     * @param lowWatermarkConfig Low watermark configuration.
-     * @param clock A hybrid logical clock.
-     * @param txManager Transaction manager.
-     * @param vaultManager Vault manager.
-     * @param failureProcessor Failure processor tha is used to handle 
critical errors.
-     */
-    public LowWatermark(
-            String nodeName,
-            LowWatermarkConfiguration lowWatermarkConfig,
-            HybridClock clock,
-            TxManager txManager,
-            VaultManager vaultManager,
-            FailureProcessor failureProcessor
-    ) {
-        this.lowWatermarkConfig = lowWatermarkConfig;
-        this.clock = clock;
-        this.txManager = txManager;
-        this.vaultManager = vaultManager;
-        this.failureProcessor = failureProcessor;
-
-        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
-                NamedThreadFactory.create(nodeName, "low-watermark-updater", 
LOG)
-        );
-    }
-
-    /**
-     * Starts the watermark manager.
-     */
-    @Override
-    public CompletableFuture<Void> start() {
-        inBusyLock(busyLock, () -> {
-            lowWatermark = readLowWatermarkFromVault();
-        });
-
-        return nullCompletedFuture();
-    }
+public interface LowWatermark {
+    /** Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet. */
+    @Nullable HybridTimestamp getLowWatermark();
 
     /**
-     * Schedule watermark updates.
+     * Gets a low watermark that will not change until the consumer is 
executed, {@code null} means no low watermark has been assigned yet.
      */
-    public void scheduleUpdates() {
-        inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = lowWatermark;
-
-            if (lowWatermarkCandidate == null) {
-                LOG.info("Previous value of the low watermark was not found, 
will schedule to update it");
-
-                scheduleUpdateLowWatermarkBusy();
-
-                return;
-            }
-
-            LOG.info("Low watermark has been scheduled to be updated: {}", 
lowWatermarkCandidate);
-
-            txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable == null) {
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        } else if (!(throwable instanceof 
NodeStoppingException)) {
-                            LOG.error("Error during the Watermark manager 
start", throwable);
-
-                            failureProcessor.process(new 
FailureContext(CRITICAL_ERROR, throwable));
-
-                            inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                        }
-                    });
-        });
-    }
-
-    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
-        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
-
-        return vaultEntry == null ? null : 
ByteUtils.fromBytes(vaultEntry.value());
-    }
-
-    @Override
-    public void stop() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
-        }
-
-        busyLock.block();
-
-        ScheduledFuture<?> lastScheduledTaskFuture = 
this.lastScheduledTaskFuture.get();
-
-        if (lastScheduledTaskFuture != null) {
-            lastScheduledTaskFuture.cancel(true);
-        }
-
-        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, 
TimeUnit.SECONDS);
-    }
-
-    /**
-     * Returns the current low watermark, {@code null} means no low watermark 
has been assigned yet.
-     */
-    public @Nullable HybridTimestamp getLowWatermark() {
-        return lowWatermark;
-    }
-
-    CompletableFuture<Void> updateLowWatermark() {
-        return inBusyLock(busyLock, () -> {
-            HybridTimestamp lowWatermarkCandidate = 
createNewLowWatermarkCandidate();
-
-            // Wait until all the read-only transactions are finished before 
the new candidate, since no new RO transactions could be
-            // created, then we can safely promote the candidate as a new low 
watermark, store it in vault, and we can safely start cleaning
-            // up the stale/junk data in the tables.
-            return txManager.updateLowWatermark(lowWatermarkCandidate)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
-                        vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(lowWatermarkCandidate));
-
-                        lowWatermark = lowWatermarkCandidate;
-
-                        return notifyListeners(lowWatermarkCandidate);
-                    }), scheduledThreadPool)
-                    .whenComplete((unused, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) 
{
-                                LOG.error("Failed to update low watermark, 
will schedule again: {}", throwable, lowWatermarkCandidate);
-
-                                inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                            }
-                        } else {
-                            LOG.info("Successful low watermark update: {}", 
lowWatermarkCandidate);
-
-                            scheduleUpdateLowWatermarkBusy();
-                        }
-                    });
-        });
-    }
-
-    public void addUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.add(listener);
-    }
-
-    public void removeUpdateListener(LowWatermarkChangedListener listener) {
-        updateListeners.remove(listener);
-    }
-
-    private CompletableFuture<Void> notifyListeners(HybridTimestamp 
lowWatermark) {
-        if (updateListeners.isEmpty()) {
-            return nullCompletedFuture();
-        }
-
-        ArrayList<CompletableFuture<?>> res = new ArrayList<>();
-        for (LowWatermarkChangedListener updateListener : updateListeners) {
-            res.add(updateListener.onLwmChanged(lowWatermark));
-        }
-
-        return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
-    }
-
-    private void scheduleUpdateLowWatermarkBusy() {
-        ScheduledFuture<?> previousScheduledFuture = 
this.lastScheduledTaskFuture.get();
-
-        assert previousScheduledFuture == null || 
previousScheduledFuture.isDone() : "previous scheduled task has not finished";
-
-        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                this::updateLowWatermark,
-                lowWatermarkConfig.updateFrequency().value(),
-                TimeUnit.MILLISECONDS
-        );
-
-        boolean casResult = 
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, 
newScheduledFuture);
-
-        assert casResult : "only one scheduled task is expected";
-    }
-
-    HybridTimestamp createNewLowWatermarkCandidate() {
-        HybridTimestamp now = clock.now();
-
-        HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime(
-                -lowWatermarkConfig.dataAvailabilityTime().value() - 
getMaxClockSkew()
-        );
-
-        HybridTimestamp lowWatermark = this.lowWatermark;
-
-        assert lowWatermark == null || 
lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
-                "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" + 
lowWatermarkCandidate;
+    void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer);

Review Comment:
   I think a better name can be invented. This `safe` is pretty vague. How 
about `withPinnedLowWatermark(Consumer<...>)`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
     /**
      * Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow, 
UUID, int, int, int)} (write intent).
      *
+     * <p>NOTE: When updating a low watermark, the index storages that were 
returned from the method may begin to be destroyed, such a
+     * situation should occur by the calling code.</p>
+     *
      * <p>Index selection algorithm:</p>
      * <ul>
      *     <li>If the index in the snapshot catalog version is in status 
{@link CatalogIndexStatus#BUILDING},
-     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING}.</li>
+     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>
      *     <li>If the index in status {@link CatalogIndexStatus#REGISTERED} 
and it is in this status on the active version of the catalog
-     *     for {@code beginTs}.</li>
-     *     <li>For a read-only index, if {@code beginTs} is strictly less than 
the activation time of dropping the index.</li>
+     *     for {@code beginTs} and not removed in latest catalog version.</li>

Review Comment:
   ```suggestion
        *     for {@code beginTs} and not removed in the latest catalog 
version.</li>
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId, 
List.of(INDEX_ID, INDEX_ID + 1)));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexesWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(
+                StorageException.class,
+                () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class), 
new RowId(PARTITION_ID), List.of(INDEX_ID))
+        );
+    }
+
+    @Test
+    void testAddToIndexOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID));
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID + 1));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID), 
INDEX_ID));
+    }
+
+    @Test
+    void testBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+        
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID, 
Stream.of(binaryRowAndRowId), rowId));
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1, 
Stream.of(binaryRowAndRowId), rowId));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+        verify(storage).setNextRowIdToBuild(eq(rowId));
+    }
+
+    @Test
+    void testBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class), 
rowId);
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+        IndexStorage storage = indexStorage.storage();
+
+        doNothing().when(indexStorage).put(any(), any());
+        
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+    }
+
+    @Test
+    void testGetNextRowIdToBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        
doThrow(StorageDestroyedException.class).when(storage).getNextRowIdToBuild();
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+        assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID + 1));
+
+        verify(storage).getNextRowIdToBuild();
+    }
+
+    @Test
+    void testGetNextRowIdToBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        doThrow(StorageException.class).when(storage).getNextRowIdToBuild();
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+    }
+
+    @Test
+    void testSetNextRowIdToBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+        var rowId = new RowId(PARTITION_ID);
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertDoesNotThrow(() -> 
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID, rowId));
+        assertDoesNotThrow(() -> 
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID + 1, rowId));
+
+        verify(storage).setNextRowIdToBuild(eq(rowId));
+    }
+
+    @Test
+    void testSetNextRowIdToBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+        var rowId = new RowId(PARTITION_ID);
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID, rowId));
+    }
+
+    @Test
+    void testTryRemoveFromIndexesOnDestroyIndex() {

Review Comment:
   ```suggestion
       void testTryRemoveFromIndexesOnDestroyedIndex() {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -238,21 +260,36 @@ private static List<Integer> 
mergeWithoutDuplicates(List<Integer> l0, List<Integ
     private void addListenersBusy() {
         catalogService.listen(INDEX_CREATE, 
fromConsumer(this::onIndexCreated));
         catalogService.listen(INDEX_REMOVED, 
fromConsumer(this::onIndexRemoved));
+
+        lowWatermark.addUpdateListener(this::onLwmChanged);
     }
 
     private void onIndexRemoved(RemoveIndexEventParameters parameters) {
         inBusyLock(busyLock, () -> {
             int indexId = parameters.indexId();
             int catalogVersion = parameters.catalogVersion();
 
-            CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion - 
1);
+            lowWatermark.getLowWatermarkSafe(lwm -> {
+                int lwmCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
 
-            if (index.status() == AVAILABLE) {
-                // On drop table event.
-                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, 
catalogActivationTimestampBusy(catalogVersion)));
-            } else if (index.status() == STOPPING) {
-                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, 
findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
-            }
+                if (catalogVersion <= lwmCatalogVersion) {
+                    // There is no need to add a read-only indexes, since the 
index should be destroyed under the updated low watermark.
+                    tableVersionByIndexId.remove(indexId);
+                } else {
+                    CatalogIndexDescriptor index = indexBusy(indexId, 
catalogVersion - 1);
+
+                    if (index.status() == AVAILABLE) {
+                        // On drop table event.
+                        readOnlyIndexes.add(new ReadOnlyIndexInfo(index, 
catalogActivationTimestampBusy(catalogVersion), catalogVersion));
+                    } else if (index.status() == STOPPING) {
+                        readOnlyIndexes.add(
+                                new ReadOnlyIndexInfo(index, 
findStoppingActivationTsBusy(indexId, catalogVersion - 1), catalogVersion)
+                        );
+                    } else {
+                        tableVersionByIndexId.remove(indexId);

Review Comment:
   Please add a comment explaining what this case means (probably it's about 
an index that is dropped before even becoming available?)



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -138,11 +156,15 @@ public List<IndexIdAndTableVersion> chooseForAddWrite(int 
catalogVersion, int ta
     /**
      * Collect indexes for {@link PartitionAccess#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp, int)} (write committed only).
      *
+     * <p>NOTE: When updating a low watermark, the index storages that were 
returned from the method may begin to be destroyed, such a
+     * situation should occur by the calling code.</p>

Review Comment:
   ```suggestion
        * situation should be handled by the calling code.</p>
   ```



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/distributed/TestLowWatermarkImpl.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/** Test implementation. */
+public class TestLowWatermarkImpl implements LowWatermark {

Review Comment:
   ```suggestion
   public class TestLowWatermark implements LowWatermark {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -274,41 +311,56 @@ private long catalogActivationTimestampBusy(int 
catalogVersion) {
         return catalog.time();
     }
 
+    // TODO: IGNITE-21771 Deal with catalog compaction
     private void recoverStructuresBusy() {
         int earliestCatalogVersion = catalogService.earliestCatalogVersion();
         int latestCatalogVersion = catalogService.latestCatalogVersion();
+        int lwnCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
 
-        var readOnlyIndexById = new HashMap<Integer, ReadOnlyIndexInfo>();
-        var previousCatalogVersionTableIds = Set.<Integer>of();
-        var tableVersionById = new HashMap<Integer, Integer>();
+        var tableVersionByIndexId = new HashMap<Integer, Integer>();
+        var readOnlyIndexes = new HashSet<ReadOnlyIndexInfo>();
+
+        var stoppingActivationTsByIndexId = new HashMap<Integer, Long>();
+        var previousCatalogVersionIndexIds = Set.<Integer>of();
 
-        // TODO: IGNITE-21514 Deal with catalog compaction
         for (int catalogVersion = earliestCatalogVersion; catalogVersion <= 
latestCatalogVersion; catalogVersion++) {
-            long activationTs = catalogActivationTimestampBusy(catalogVersion);
             int finalCatalogVersion = catalogVersion;
 
-            for (CatalogIndexDescriptor index : 
catalogService.indexes(catalogVersion)) {
-                tableVersionById.computeIfAbsent(index.id(), i -> 
tableVersionBusy(index, finalCatalogVersion));
+            var indexIds = new HashSet<Integer>();
+
+            catalogService.indexes(finalCatalogVersion).forEach(index -> {
+                tableVersionByIndexId.computeIfAbsent(index.id(), i -> 
tableVersionBusy(index, finalCatalogVersion));
 
                 if (index.status() == STOPPING) {
-                    readOnlyIndexById.computeIfAbsent(index.id(), i -> new 
ReadOnlyIndexInfo(index, activationTs));
+                    stoppingActivationTsByIndexId.computeIfAbsent(index.id(), 
i -> catalogActivationTimestampBusy(finalCatalogVersion));
                 }
-            }
 
-            Set<Integer> currentCatalogVersionTableIds = 
tableIds(catalogVersion);
-
-            // Here we look for indices that transitioned directly from 
AVAILABLE to [deleted] (corresponding to the logical READ_ONLY
-            // state) as such transitions only happen when a table is dropped.
-            difference(previousCatalogVersionTableIds, 
currentCatalogVersionTableIds).stream()
-                    .flatMap(droppedTableId -> 
catalogService.indexes(finalCatalogVersion - 1, droppedTableId).stream())
-                    .filter(index -> index.status() == AVAILABLE)
-                    .forEach(index -> 
readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index, 
activationTs)));
+                indexIds.add(index.id());
+            });
 
-            previousCatalogVersionTableIds = currentCatalogVersionTableIds;
+            // We are looking for removed indexes.
+            difference(previousCatalogVersionIndexIds, indexIds).stream()
+                    .map(index -> catalogService.index(index, 
finalCatalogVersion - 1))

Review Comment:
   ```suggestion
                       .map(indexId -> catalogService.index(indexId, 
finalCatalogVersion - 1))
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -274,41 +311,56 @@ private long catalogActivationTimestampBusy(int 
catalogVersion) {
         return catalog.time();
     }
 
+    // TODO: IGNITE-21771 Deal with catalog compaction
     private void recoverStructuresBusy() {
         int earliestCatalogVersion = catalogService.earliestCatalogVersion();
         int latestCatalogVersion = catalogService.latestCatalogVersion();
+        int lwnCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));

Review Comment:
   ```suggestion
           int lwmCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -138,11 +156,15 @@ public List<IndexIdAndTableVersion> chooseForAddWrite(int 
catalogVersion, int ta
     /**
      * Collect indexes for {@link PartitionAccess#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp, int)} (write committed only).
      *
+     * <p>NOTE: When updating a low watermark, the index storages that were 
returned from the method may begin to be destroyed, such a
+     * situation should occur by the calling code.</p>
+     *
      * <p>Index selection algorithm:</p>
      * <ul>
      *     <li>If the index in the snapshot catalog version is in status 
{@link CatalogIndexStatus#BUILDING},
-     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING}.</li>
-     *     <li>For a read-only index, if {@code commitTs} is strictly less 
than the activation time of dropping the index.</li>
+     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>

Review Comment:
   ```suggestion
        *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING} and not removed in the latest catalog version.</li>
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
     /**
      * Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow, 
UUID, int, int, int)} (write intent).
      *
+     * <p>NOTE: When updating a low watermark, the index storages that were 
returned from the method may begin to be destroyed, such a
+     * situation should occur by the calling code.</p>
+     *
      * <p>Index selection algorithm:</p>
      * <ul>
      *     <li>If the index in the snapshot catalog version is in status 
{@link CatalogIndexStatus#BUILDING},
-     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING}.</li>
+     *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>

Review Comment:
   ```suggestion
        *     {@link CatalogIndexStatus#AVAILABLE} or {@link 
CatalogIndexStatus#STOPPING} and not removed in the latest catalog version.</li>
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -238,21 +260,36 @@ private static List<Integer> 
mergeWithoutDuplicates(List<Integer> l0, List<Integ
     private void addListenersBusy() {
         catalogService.listen(INDEX_CREATE, 
fromConsumer(this::onIndexCreated));
         catalogService.listen(INDEX_REMOVED, 
fromConsumer(this::onIndexRemoved));
+
+        lowWatermark.addUpdateListener(this::onLwmChanged);
     }
 
     private void onIndexRemoved(RemoveIndexEventParameters parameters) {
         inBusyLock(busyLock, () -> {
             int indexId = parameters.indexId();
             int catalogVersion = parameters.catalogVersion();
 
-            CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion - 
1);
+            lowWatermark.getLowWatermarkSafe(lwm -> {
+                int lwmCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
 
-            if (index.status() == AVAILABLE) {
-                // On drop table event.
-                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, 
catalogActivationTimestampBusy(catalogVersion)));
-            } else if (index.status() == STOPPING) {
-                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, 
findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
-            }
+                if (catalogVersion <= lwmCatalogVersion) {

Review Comment:
   Here, an index is not tracked anymore if the moment of its **removal** is 
below the LWM. This seems correct, but we can stop tracking the index earlier: 
when it's **dropped**. If it's dropped with its table, it's the same as 
removal; but if it's dropped individually, then the moment of its drop is the 
moment when it becomes STOPPING.
   
   Same thing applies to the recovery logic and the LWM listener.
   
   I'm not sure if this needs to be fixed now. Maybe just create a ticket and 
add TODOs?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId, 
List.of(INDEX_ID, INDEX_ID + 1)));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexesWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(
+                StorageException.class,
+                () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class), 
new RowId(PARTITION_ID), List.of(INDEX_ID))
+        );
+    }
+
+    @Test
+    void testAddToIndexOnDestroyIndexes() {

Review Comment:
   ```suggestion
       void testAddToIndexOnDestroyedIndexes() {
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {

Review Comment:
   ```suggestion
       void testAddToIndexesOnDestroyedIndexes() {
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId, 
List.of(INDEX_ID, INDEX_ID + 1)));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexesWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(
+                StorageException.class,
+                () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class), 
new RowId(PARTITION_ID), List.of(INDEX_ID))
+        );
+    }
+
+    @Test
+    void testAddToIndexOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID));
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID + 1));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID), 
INDEX_ID));
+    }
+
+    @Test
+    void testBuildIndexOnDestroyIndex() {

Review Comment:
   ```suggestion
       void testBuildIndexOnDestroyedIndex() {
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId, 
List.of(INDEX_ID, INDEX_ID + 1)));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexesWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(
+                StorageException.class,
+                () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class), 
new RowId(PARTITION_ID), List.of(INDEX_ID))
+        );
+    }
+
+    @Test
+    void testAddToIndexOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID));
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID + 1));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID), 
INDEX_ID));
+    }
+
+    @Test
+    void testBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+        
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID, 
Stream.of(binaryRowAndRowId), rowId));
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1, 
Stream.of(binaryRowAndRowId), rowId));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+        verify(storage).setNextRowIdToBuild(eq(rowId));
+    }
+
+    @Test
+    void testBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class), 
rowId);
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+        IndexStorage storage = indexStorage.storage();
+
+        doNothing().when(indexStorage).put(any(), any());
+        
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+    }
+
+    @Test
+    void testGetNextRowIdToBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        
doThrow(StorageDestroyedException.class).when(storage).getNextRowIdToBuild();
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+        assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID + 1));
+
+        verify(storage).getNextRowIdToBuild();
+    }
+
+    @Test
+    void testGetNextRowIdToBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        doThrow(StorageException.class).when(storage).getNextRowIdToBuild();
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+    }
+
+    @Test
+    void testSetNextRowIdToBuildIndexOnDestroyIndex() {

Review Comment:
   ```suggestion
       void testSetNextRowIdToBuildIndexOnDestroyedIndex() {
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
 public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 0;
 
+    private static final int INDEX_ID = 1;
+
     @Test
     void testBuildIndex() {
         TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
 
-        int indexId = 1;
-
-        TableIndexStoragesSupplier indexes = 
mock(TableIndexStoragesSupplier.class);
-
-        when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
-        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexes);
+        IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
 
         BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
         BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1), 
row1.rowId().increment());
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1), 
row1.rowId().increment());
 
         verify(indexStorage).put(row0.binaryRow(), row0.rowId());
         verify(indexStorage).put(row1.binaryRow(), row1.rowId());
 
         
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
 
         // Let's check one more batch - it will be the finishing one.
-        clearInvocations(indexes, indexStorage);
+        clearInvocations(indexStorage);
 
         BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class), 
new RowId(PARTITION_ID));
 
-        indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+        indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
 
         verify(indexStorage).put(row2.binaryRow(), row2.rowId());
 
         verify(indexStorage.storage()).setNextRowIdToBuild(null);
     }
 
+    @Test
+    void testAddToIndexesOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId, 
List.of(INDEX_ID, INDEX_ID + 1)));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexesWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(
+                StorageException.class,
+                () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class), 
new RowId(PARTITION_ID), List.of(INDEX_ID))
+        );
+    }
+
+    @Test
+    void testAddToIndexOnDestroyIndexes() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID));
+        assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId, 
INDEX_ID + 1));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+    }
+
+    @Test
+    void testAddToIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID), 
INDEX_ID));
+    }
+
+    @Test
+    void testBuildIndexOnDestroyIndex() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+        IndexStorage storage = indexStorage.storage();
+
+        doThrow(StorageDestroyedException.class).when(indexStorage).put(any(), 
any());
+        
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var row = mock(BinaryRow.class);
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID, 
Stream.of(binaryRowAndRowId), rowId));
+        assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1, 
Stream.of(binaryRowAndRowId), rowId));
+
+        verify(indexStorage).put(eq(row), eq(rowId));
+        verify(storage).setNextRowIdToBuild(eq(rowId));
+    }
+
+    @Test
+    void testBuildIndexWithStorageException() {
+        TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+        doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+        var indexUpdateHandler = new 
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+        var rowId = new RowId(PARTITION_ID);
+        var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class), 
rowId);
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+        IndexStorage storage = indexStorage.storage();
+
+        doNothing().when(indexStorage).put(any(), any());
+        
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+        assertThrows(StorageException.class, () -> 
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+    }
+
+    @Test
+    void testGetNextRowIdToBuildIndexOnDestroyIndex() {

Review Comment:
   ```suggestion
       void testGetNextRowIdToBuildIndexOnDestroyedIndex() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to