rpuch commented on code in PR #2859:
URL: https://github.com/apache/ignite-3/pull/2859#discussion_r1401915135


##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {

Review Comment:
   'Collector' might be something that collects something to come kind of a 
container, like Collectors related to Java Streams. Here, it seems that we 
rather choose indices. Would it make sense to rename this to `IndexChooser`?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes

Review Comment:
   Do we need some specific methods defined on `ConcurrentSkipListMap`? 
Otherwise, it'd suggest to specify an interface here.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);

Review Comment:
   ```suggestion
       private static final Comparator<CatalogIndexDescriptor> COMPARE_BY_ID = 
comparingInt(CatalogObjectDescriptor::id);
   ```



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes
+     * (available and registered) on the requested catalog version, as well as 
all dropped available indexes from previous catalog versions.
+     *
+     * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. It 
is expected that the table exists at the time the method is
+     * called.</p>
+     *
+     * @param catalogVersion Catalog version.
+     * @param tableId Table ID.
+     */
+    List<CatalogIndexDescriptor> collectForRwTxOperation(int catalogVersion, 
int tableId) {
+        return inBusyLock(busyLock, () -> {
+            List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+            assert !tableIndexes.isEmpty() : "catalogVersion=" + 
catalogVersion + ", tableId=" + tableId;
+
+            List<CatalogIndexDescriptor> droppedAvailableTableIndexes = 
getDroppedAvailableIndexes(catalogVersion, tableId);
+
+            if (droppedAvailableTableIndexes.isEmpty()) {
+                return tableIndexes;
+            }
+
+            return unmodifiableList(merge(tableIndexes, 
droppedAvailableTableIndexes));
+        });
+    }
+
+    private void addListeners() {
+        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onDropIndex((DropIndexEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onTableDrop((DropTableEventParameters) 
parameters).thenApply(unused -> false);
+        });
+    }
+
+    private CompletableFuture<?> onDropIndex(DropIndexEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int previousCatalogVersion = parameters.catalogVersion() - 1;
+
+            CatalogIndexDescriptor droppedIndexDescriptor = 
catalogService.index(parameters.indexId(), previousCatalogVersion);
+
+            assert droppedIndexDescriptor != null : "indexId=" + 
parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
+
+            if (!droppedIndexDescriptor.available()) {
+                return completedFuture(null);
+            }
+
+            addDroppedAvailableIndex(droppedIndexDescriptor, 
parameters.catalogVersion());
+
+            return completedFuture(null);
+        });
+    }
+
+    private CompletableFuture<?> onTableDrop(DropTableEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            droppedAvailableTableIndexes.entrySet().removeIf(entry -> 
parameters.tableId() == entry.getKey().tableId);
+
+            return completedFuture(null);
+        });
+    }
+
+    /**
+     * Returns a list of dropped available indexes (sorted by {@link 
#INDEX_COMPARATOR}) for the catalog version of interest from
+     * {@link #droppedAvailableTableIndexes}. If there is no list for the 
requested catalog version, the closest previous catalog version
+     * will be returned.
+     */
+    private List<CatalogIndexDescriptor> getDroppedAvailableIndexes(int 
catalogVersion, int tableId) {
+        var key = new TableIdCatalogVersion(tableId, catalogVersion);
+
+        Entry<TableIdCatalogVersion, List<CatalogIndexDescriptor>> entry = 
droppedAvailableTableIndexes.floorEntry(key);
+
+        return entry != null && tableId == entry.getKey().tableId ? 
entry.getValue() : List.of();
+    }
+
+    /**
+     * Adds the dropped available index to {@link 
#droppedAvailableTableIndexes}.
+     *
+     * <p>If the list is missing for the catalog version from the arguments, 
then we create it by merging the indexes from the previous
+     * catalog version and the new index. Otherwise, we simply add to the 
existing list. List are sorted by {@link #INDEX_COMPARATOR}</p>
+     *
+     * @param droppedIndex Drooped index.
+     * @param catalogVersion Catalog version on which the index was dropped.
+     */
+    private void addDroppedAvailableIndex(CatalogIndexDescriptor droppedIndex, 
int catalogVersion) {
+        assert droppedIndex.available() : droppedIndex.id();
+
+        int tableId = droppedIndex.tableId();
+
+        // For now, there is no need to worry about parallel changes to the 
map, it will change on recovery and in catalog event listeners

Review Comment:
   'For now' kinda means that this can change. Is it planned to change the 
behavior (so we'll have to worry about concurrency)?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes
+     * (available and registered) on the requested catalog version, as well as 
all dropped available indexes from previous catalog versions.
+     *
+     * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. It 
is expected that the table exists at the time the method is
+     * called.</p>
+     *
+     * @param catalogVersion Catalog version.
+     * @param tableId Table ID.
+     */
+    List<CatalogIndexDescriptor> collectForRwTxOperation(int catalogVersion, 
int tableId) {
+        return inBusyLock(busyLock, () -> {
+            List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+            assert !tableIndexes.isEmpty() : "catalogVersion=" + 
catalogVersion + ", tableId=" + tableId;
+
+            List<CatalogIndexDescriptor> droppedAvailableTableIndexes = 
getDroppedAvailableIndexes(catalogVersion, tableId);
+
+            if (droppedAvailableTableIndexes.isEmpty()) {
+                return tableIndexes;
+            }
+
+            return unmodifiableList(merge(tableIndexes, 
droppedAvailableTableIndexes));
+        });
+    }
+
+    private void addListeners() {
+        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onDropIndex((DropIndexEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onTableDrop((DropTableEventParameters) 
parameters).thenApply(unused -> false);
+        });
+    }
+
+    private CompletableFuture<?> onDropIndex(DropIndexEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int previousCatalogVersion = parameters.catalogVersion() - 1;
+
+            CatalogIndexDescriptor droppedIndexDescriptor = 
catalogService.index(parameters.indexId(), previousCatalogVersion);
+
+            assert droppedIndexDescriptor != null : "indexId=" + 
parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
+
+            if (!droppedIndexDescriptor.available()) {
+                return completedFuture(null);
+            }
+
+            addDroppedAvailableIndex(droppedIndexDescriptor, 
parameters.catalogVersion());
+
+            return completedFuture(null);
+        });
+    }
+
+    private CompletableFuture<?> onTableDrop(DropTableEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            droppedAvailableTableIndexes.entrySet().removeIf(entry -> 
parameters.tableId() == entry.getKey().tableId);
+
+            return completedFuture(null);
+        });
+    }
+
+    /**
+     * Returns a list of dropped available indexes (sorted by {@link 
#INDEX_COMPARATOR}) for the catalog version of interest from
+     * {@link #droppedAvailableTableIndexes}. If there is no list for the 
requested catalog version, the closest previous catalog version
+     * will be returned.
+     */
+    private List<CatalogIndexDescriptor> getDroppedAvailableIndexes(int 
catalogVersion, int tableId) {
+        var key = new TableIdCatalogVersion(tableId, catalogVersion);
+
+        Entry<TableIdCatalogVersion, List<CatalogIndexDescriptor>> entry = 
droppedAvailableTableIndexes.floorEntry(key);
+
+        return entry != null && tableId == entry.getKey().tableId ? 
entry.getValue() : List.of();
+    }
+
+    /**
+     * Adds the dropped available index to {@link 
#droppedAvailableTableIndexes}.
+     *
+     * <p>If the list is missing for the catalog version from the arguments, 
then we create it by merging the indexes from the previous
+     * catalog version and the new index. Otherwise, we simply add to the 
existing list. List are sorted by {@link #INDEX_COMPARATOR}</p>

Review Comment:
   ```suggestion
        * catalog version and the new index. Otherwise, we simply add to the 
existing list. Lists are sorted by {@link #INDEX_COMPARATOR}</p>
   ```



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes
+     * (available and registered) on the requested catalog version, as well as 
all dropped available indexes from previous catalog versions.
+     *
+     * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. It 
is expected that the table exists at the time the method is
+     * called.</p>
+     *
+     * @param catalogVersion Catalog version.
+     * @param tableId Table ID.
+     */
+    List<CatalogIndexDescriptor> collectForRwTxOperation(int catalogVersion, 
int tableId) {
+        return inBusyLock(busyLock, () -> {
+            List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+            assert !tableIndexes.isEmpty() : "catalogVersion=" + 
catalogVersion + ", tableId=" + tableId;
+
+            List<CatalogIndexDescriptor> droppedAvailableTableIndexes = 
getDroppedAvailableIndexes(catalogVersion, tableId);
+
+            if (droppedAvailableTableIndexes.isEmpty()) {
+                return tableIndexes;
+            }
+
+            return unmodifiableList(merge(tableIndexes, 
droppedAvailableTableIndexes));
+        });
+    }
+
+    private void addListeners() {
+        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onDropIndex((DropIndexEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onTableDrop((DropTableEventParameters) 
parameters).thenApply(unused -> false);
+        });
+    }
+
+    private CompletableFuture<?> onDropIndex(DropIndexEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int previousCatalogVersion = parameters.catalogVersion() - 1;
+
+            CatalogIndexDescriptor droppedIndexDescriptor = 
catalogService.index(parameters.indexId(), previousCatalogVersion);
+
+            assert droppedIndexDescriptor != null : "indexId=" + 
parameters.indexId() + ", catalogVersion=" + previousCatalogVersion;
+
+            if (!droppedIndexDescriptor.available()) {
+                return completedFuture(null);
+            }
+
+            addDroppedAvailableIndex(droppedIndexDescriptor, 
parameters.catalogVersion());
+
+            return completedFuture(null);
+        });
+    }
+
+    private CompletableFuture<?> onTableDrop(DropTableEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            droppedAvailableTableIndexes.entrySet().removeIf(entry -> 
parameters.tableId() == entry.getKey().tableId);

Review Comment:
   How about adding a comment that we can remove dropped indexes on table drop 
as we need such indexes only for writing, and write operations will be denied 
right after a table drop has been activated?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes
+     * (available and registered) on the requested catalog version, as well as 
all dropped available indexes from previous catalog versions.
+     *
+     * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. It 
is expected that the table exists at the time the method is
+     * called.</p>
+     *
+     * @param catalogVersion Catalog version.
+     * @param tableId Table ID.
+     */
+    List<CatalogIndexDescriptor> collectForRwTxOperation(int catalogVersion, 
int tableId) {

Review Comment:
   This should choose indexes for writes, so it probably makes sense to include 
this information in the method name, like ...`ForRwWriteOperation`



##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexCollectorTest.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.PK_INDEX_NAME;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.indexId;
+import static 
org.apache.ignite.internal.index.TestIndexManagementUtils.tableId;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
+import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** For {@link IndexCollector} testing. */
+public class IndexCollectorTest extends BaseIgniteAbstractTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final CatalogManager catalogManager = 
createTestCatalogManager(NODE_NAME, clock);
+
+    private IndexCollector indexCollector = new IndexCollector(catalogManager);
+
+    private int tableId;
+
+    private int catalogVersionAfterCreateTable;
+
+    @BeforeEach
+    void setUp() {
+        catalogManager.start();
+
+        createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
+
+        tableId = tableId(catalogManager, TABLE_NAME, clock);

Review Comment:
   It's strange we need clock for getting a table ID. The clock is probably 
used to get the 'latest' version, but this can be done without any clock.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes
+     * (available and registered) on the requested catalog version, as well as 
all dropped available indexes from previous catalog versions.
+     *
+     * <p>Returned list is sorted by {@link CatalogObjectDescriptor#id()}. It 
is expected that the table exists at the time the method is

Review Comment:
   What does 'at the time' mean? Is it about the table existing in the 
specified catalog version?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);
+
+                    for (CatalogIndexDescriptor tableIndex : tableIndexes) {
+                        if (tableIndex.available() && 
!contains(nextCatalogVersionTableIndexes, tableIndex)) {
+                            addDroppedAvailableIndex(tableIndex, 
nextCatalogVersion);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        droppedAvailableTableIndexes.clear();
+    }
+
+    /**
+     * Collects a list of table indexes that will need to be used for an 
update operation in RW a transaction. List consists of all indexes

Review Comment:
   ```suggestion
        * Collects a list of table indexes that will need to be used for an 
update operation in an RW transaction. The list consists of all indexes
   ```



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();
+                    int nextCatalogVersion = catalogVersion + 1;
+
+                    List<CatalogIndexDescriptor> tableIndexes = 
catalogService.indexes(catalogVersion, tableId);
+
+                    if (tableIndexes.isEmpty()) {
+                        // Table does not exist yet.
+                        continue;
+                    }
+
+                    List<CatalogIndexDescriptor> 
nextCatalogVersionTableIndexes = catalogService.indexes(nextCatalogVersion, 
tableId);
+
+                    assert !nextCatalogVersionTableIndexes.isEmpty()
+                            : String.format("Table should not be dropped: 
[catalogVersion=%s, tableId=%s]", nextCatalogVersion, tableId);

Review Comment:
   Why should not it be dropped here? Is it because `catalogService.tables()` 
only returns non-dropped tables?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {
+        inBusyLock(busyLock, () -> {
+            // It is expected that the methods will be called only on 
recovery, when the deploy of metastore watches has not yet occurred.
+            int earliestCatalogVersion = 
catalogService.earliestCatalogVersion();
+            int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+            // At the moment, we will only use tables from the latest version, 
since so far only replicas for them are started on the node.
+            for (CatalogTableDescriptor table : 
catalogService.tables(latestCatalogVersion)) {
+                for (int catalogVersion = earliestCatalogVersion; 
catalogVersion < latestCatalogVersion; catalogVersion++) {
+                    int tableId = table.id();

Review Comment:
   This seems to belong to the outer loop



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>

Review Comment:
   Please say in a comment that the example talks about indexes of the same 
table. Also, I suggest changing the table ID in the example from 0 to something 
different (that is not repeated anywhere), like 15. Then you can say something 
like this: 'here is an example (all indexes belong to the same table with 
ID=15)...
   
   (15, 1) -> [I1(A)]
   ...
   
   And so on.
   
   Otherwise, 0 might mean the table ID, an index number, a catalog version, 
which makes it more difficult to comprehend what represents what.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexCollector.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.binarySearch;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Comparator.comparingInt;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index collector for various operations, for example for RW transactions. */
+class IndexCollector implements ManuallyCloseable {
+    private static final Comparator<CatalogIndexDescriptor> INDEX_COMPARATOR = 
comparingInt(CatalogObjectDescriptor::id);
+
+    private final CatalogService catalogService;
+
+    /**
+     * Map that, for each key, contains a list of all dropped available table 
indexes (sorted by {@link #INDEX_COMPARATOR}) for all known
+     * catalog versions.
+     *
+     * <p>Let's look at an example, let's say we have the following versions 
of a catalog with indexes:</p>
+     * <pre>
+     *     0: I0(A) I1(A)
+     *     1: IO(A)
+     *     2: I0(A) I2(R) I3(A)
+     *     3: I0(A)
+     *     4: I0(A)
+     * </pre>
+     *
+     * <p>Then the map will have the following values:</p>
+     * <pre>
+     *     (0, 1) -> [I1(A)]
+     *     (0, 3) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Then, when {@link #getDroppedAvailableIndexes(int, int) getting 
dropped available indexes}, we will return the following:</p>
+     * <pre>
+     *     (0, 0) -> []
+     *     (1, 0) -> [I1(A)]
+     *     (2, 0) -> [I1(A)]
+     *     (3, 0) -> [I1(A), I3(A)]
+     *     (4, 0) -> [I1(A), I3(A)]
+     * </pre>
+     *
+     * <p>Updated on {@link #recover() node recovery} and a catalog events 
processing.</p>
+     */
+    private final ConcurrentSkipListMap<TableIdCatalogVersion, 
List<CatalogIndexDescriptor>> droppedAvailableTableIndexes
+            = new ConcurrentSkipListMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    IndexCollector(CatalogService catalogService) {
+        this.catalogService = catalogService;
+
+        addListeners();
+    }
+
+    /** Recovers internal structures on node recovery. */
+    void recover() {

Review Comment:
   The method collects all indexes, including the ones that are dropped (and 
destroyed) long time ago (so we don't need them). Is this going to be addressed 
somehow?
   
   Also, there is no way to remove an index from `roppedAvailableIndexes` after 
it gets destroyed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to