rpuch commented on code in PR #3223:
URL: https://github.com/apache/ignite-3/pull/3223#discussion_r1495370676


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java:
##########
@@ -60,7 +61,11 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent
 
     Collection<CatalogTableDescriptor> tables(int catalogVersion);
 
-    @Nullable CatalogIndexDescriptor index(String indexName, long timestamp);
+    /**
+     * Returns an <em>alive</em> index with the given name, that is an index which exists and is not in the
+     * {@link CatalogIndexStatus#STOPPING} state in the Catalog at a given point in time.

Review Comment:
   Should an alive index be defined as an index that has not yet been dropped? 
Then we can clarify that this means it is not in the `STOPPING` state.



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java:
##########
@@ -98,24 +96,39 @@ public List<UpdateEntry> get(Catalog catalog) {
             }
 
             if (indexedColumns.contains(columnName)) {
-                List<String> indexesNames = Arrays.stream(schema.indexes())
-                        .filter(index -> index.tableId() == table.id())
-                        .filter(index -> index instanceof CatalogHashIndexDescriptor
-                                ? ((CatalogHashIndexDescriptor) index).columns().contains(columnName)
-                                : ((CatalogSortedIndexDescriptor) index).columns().stream().map(CatalogIndexColumnDescriptor::name)
-                                        .anyMatch(column -> column.equals(columnName))
-                ).map(CatalogIndexDescriptor::name).collect(Collectors.toList());
+                List<String> indexesNames = indexesForTable(schema, table)
+                        .filter(index -> indexColumnNames(index).anyMatch(columnName::equals))
+                        .map(CatalogIndexDescriptor::name)
+                        .collect(Collectors.toList());
 
                 throw new CatalogValidationException(format(
                         "Deleting column '{}' used by index(es) {}, it is not allowed", columnName, indexesNames));
             }
-        }
+        });
 
         return List.of(
                 new DropColumnsEntry(table.id(), columns, schemaName)
         );
     }
 
+    private static Stream<CatalogIndexDescriptor> indexesForTable(CatalogSchemaDescriptor schema, CatalogTableDescriptor table) {

Review Comment:
   ```suggestion
       private static Stream<CatalogIndexDescriptor> aliveIndexesForTable(CatalogSchemaDescriptor schema, CatalogTableDescriptor table) {
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java:
##########
@@ -98,24 +96,39 @@ public List<UpdateEntry> get(Catalog catalog) {
             }
 
             if (indexedColumns.contains(columnName)) {
-                List<String> indexesNames = Arrays.stream(schema.indexes())
-                        .filter(index -> index.tableId() == table.id())
-                        .filter(index -> index instanceof CatalogHashIndexDescriptor
-                                ? ((CatalogHashIndexDescriptor) index).columns().contains(columnName)
-                                : ((CatalogSortedIndexDescriptor) index).columns().stream().map(CatalogIndexColumnDescriptor::name)
-                                        .anyMatch(column -> column.equals(columnName))
-                ).map(CatalogIndexDescriptor::name).collect(Collectors.toList());
+                List<String> indexesNames = indexesForTable(schema, table)
+                        .filter(index -> indexColumnNames(index).anyMatch(columnName::equals))
+                        .map(CatalogIndexDescriptor::name)
+                        .collect(Collectors.toList());
 
                 throw new CatalogValidationException(format(
                         "Deleting column '{}' used by index(es) {}, it is not allowed", columnName, indexesNames));
             }
-        }
+        });
 
         return List.of(
                 new DropColumnsEntry(table.id(), columns, schemaName)
         );
     }
 
+    private static Stream<CatalogIndexDescriptor> indexesForTable(CatalogSchemaDescriptor schema, CatalogTableDescriptor table) {
+        return Arrays.stream(schema.indexes())
+                .filter(index -> index.tableId() == table.id() && index.status() != CatalogIndexStatus.STOPPING);

Review Comment:
   Maybe we could even add an `alive()` method to `CatalogIndexStatus` to 
centralize all these checks in the code?
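
A minimal sketch of what such a centralized check could look like. The enum below is a hypothetical stand-in, not the real `CatalogIndexStatus`: only `AVAILABLE` and `STOPPING` appear in this PR, and the `REGISTERED` and `BUILDING` states are assumptions added for illustration.

```java
/**
 * Hypothetical stand-in for CatalogIndexStatus.
 * REGISTERED and BUILDING are assumed states; only AVAILABLE and STOPPING appear in the PR.
 */
enum IndexStatusSketch {
    REGISTERED,
    BUILDING,
    AVAILABLE,
    STOPPING;

    /** Returns true unless the index has been dropped and is waiting to be removed. */
    boolean alive() {
        return this != STOPPING;
    }
}
```

With something like this, a filter such as `index.status() != CatalogIndexStatus.STOPPING` could become `index.status().alive()`, so the definition of "alive" lives in one place.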



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java:
##########
@@ -78,6 +80,18 @@ public CatalogIndexDescriptor[] indexes() {
         return indexes;
     }
 
+    /**
+     * Returns a list of indexes that have the given name.
+     *
+     * <p>There can be multiple indexes with the same name in a {@link CatalogIndexStatus#STOPPING} state, but at most one <em>alive</em>

Review Comment:
   Do we really need those STOPPING indexes here?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java:
##########
@@ -65,7 +65,18 @@ private DropIndexCommand(String schemaName, String indexName) throws CatalogVali
     public List<UpdateEntry> get(Catalog catalog) {
         CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
 
-        CatalogIndexDescriptor index = indexOrThrow(schema, indexName);
+        List<CatalogIndexDescriptor> indexes = schema.indexes(indexName);

Review Comment:
   Why don't you change the `indexes()` method analogously, filtering out 
STOPPING indexes inside it? It looks weird that we don't see STOPPING indexes 
when looking up a single index by name, but we do see them when getting all 
indexes by name.
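
To illustrate the consistency being asked for, here is a rough sketch in which both the single-index and the multi-index lookup apply the same filter. All types and method names (`SketchStatus`, `SketchIndex`, `SketchSchema`, `aliveIndexes`, `aliveIndex`) are hypothetical and only loosely modeled on the descriptors in this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical, reduced set of index states. */
enum SketchStatus { AVAILABLE, STOPPING }

/** Hypothetical index descriptor holding only a name and a status. */
final class SketchIndex {
    final String name;
    final SketchStatus status;

    SketchIndex(String name, SketchStatus status) {
        this.name = name;
        this.status = status;
    }
}

/** Hypothetical schema descriptor whose lookups never expose STOPPING indexes. */
final class SketchSchema {
    private final SketchIndex[] indexes;

    SketchSchema(SketchIndex... indexes) {
        this.indexes = indexes;
    }

    /** Returns all indexes with the given name that are not STOPPING. */
    List<SketchIndex> aliveIndexes(String name) {
        return Arrays.stream(indexes)
                .filter(index -> index.name.equals(name) && index.status != SketchStatus.STOPPING)
                .collect(Collectors.toList());
    }

    /** Returns the first non-STOPPING index with the given name, or null if there is none. */
    SketchIndex aliveIndex(String name) {
        return aliveIndexes(name).stream().findFirst().orElse(null);
    }
}
```

Both lookups go through the same filter, so neither of them can return a STOPPING index that the other one hides.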



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexMultipleNodesTest.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for scenarios related to dropping of indices, executed on a multiple node cluster.
+ */
+public class ItDropIndexMultipleNodesTest extends BaseSqlIntegrationTest {
+    private static final String TABLE_NAME = "TEST";
+
+    private static final String INDEX_NAME = "TEST_IDX";
+
+    @BeforeEach
+    void createTable() {
+        int partitions = initialNodes();
+
+        int replicas = initialNodes();
+
+        createTable(TABLE_NAME, replicas, partitions);
+    }
+
+    @AfterEach
+    void cleanup() {
+        CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+
+        dropAllTables();
+    }
+
+    @Test
+    void testDropIndexDuringTransaction() {
+        int indexId = createIndex();
+
+        CompletableFuture<Void> indexRemovedFuture = indexRemovedFuture();
+
+        IgniteImpl node = CLUSTER.aliveNode();
+
+        // Start a transaction. We expect that the index will not be removed until this transaction completes.
+        runInRwTransaction(node, tx -> {
+            dropIndex();
+
+            CatalogIndexDescriptor indexDescriptor = node.catalogManager().index(indexId, node.clock().nowLong());
+
+            assertThat(indexDescriptor, is(notNullValue()));
+            assertThat(indexDescriptor.status(), is(CatalogIndexStatus.STOPPING));
+            assertThat(indexRemovedFuture, willTimeoutFast());
+        });
+
+        assertThat(indexRemovedFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testWritingIntoStoppingIndex() {
+        int indexId = createIndex();
+
+        IgniteImpl node = CLUSTER.aliveNode();
+
+        // Latch for waiting for the RW transaction to start before dropping the index.
+        var startTransactionLatch = new CountDownLatch(1);
+        // Latch for waiting for the index to be dropped, before inserting data in the transaction.
+        var dropIndexLatch = new CountDownLatch(1);
+
+        CompletableFuture<Void> dropIndexFuture = runAsync(() -> {
+            try {
+                startTransactionLatch.await(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                throw new CompletionException(e);
+            }
+
+            dropIndex();
+
+            CatalogIndexDescriptor indexDescriptor = node.catalogManager().index(indexId, node.clock().nowLong());
+
+            assertThat(indexDescriptor, is(notNullValue()));
+            assertThat(indexDescriptor.status(), is(CatalogIndexStatus.STOPPING));
+
+            dropIndexLatch.countDown();
+        });
+
+        CompletableFuture<Void> insertDataIntoIndexTransaction = runAsync(() -> {
+            runInRwTransaction(node, tx -> {
+                startTransactionLatch.countDown();
+
+                try {
+                    dropIndexLatch.await(1, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new CompletionException(e);
+                }
+
+                // Insert data into a STOPPING index. We expect it to be inserted.
+                insertPeople(tx, TABLE_NAME, new Person(239, "foo", 0));
+
+                assertQuery((InternalTransaction) tx, String.format("SELECT id FROM %s WHERE NAME > 'a'", TABLE_NAME))
+                        .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
+                        .returns(239)
+                        .check();
+            });
+        });
+
+        assertThat(dropIndexFuture, willCompleteSuccessfully());
+        assertThat(insertDataIntoIndexTransaction, willCompleteSuccessfully());
+    }
+
+    /**
+     * Tests the following scenario.
+     *
+     * <ol>
+     *     <li>Transaction A is started that is expected to observe an index in the {@link CatalogIndexStatus#AVAILABLE} state;</li>
+     *     <li>The index is dropped;</li>
+     *     <li>Transaction B is started that is expected to observe the index in the {@link CatalogIndexStatus#STOPPING} state;</li>
+     *     <li>Transaction B inserts data into the table and, therefore, into the index;</li>
+     *     <li>Transaction A performs a scan that utilizes the index over the table and is expected to see the data written by
+     *     Transaction B.</li>
+     * </ol>
+     */
+    @Test
+    void testWritingIntoStoppingIndexInDifferentTransactions() {
+        int indexId = createIndex();
+
+        IgniteImpl node = CLUSTER.aliveNode();
+
+        // Latch that will be released when a transaction expected to observe the index in the AVAILABLE state is started.
+        var startAvailableTransactionLatch = new CountDownLatch(1);
+        // Latch that will be released when a transaction expected to observe the index in the STOPPING state is started.
+        var insertDataTransactionLatch = new CountDownLatch(1);
+        // Latch for waiting for the index to be dropped.
+        var dropIndexLatch = new CountDownLatch(1);
+
+        CompletableFuture<Void> dropIndexFuture = runAsync(() -> {
+            // Wait for a transaction to start before dropping the index.
+            try {
+                startAvailableTransactionLatch.await(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                throw new CompletionException(e);
+            }
+
+            // Drop the index inside a transaction. Until it finishes the index will be stuck in the STOPPING state.
+            runInRwTransaction(node, tx -> {

Review Comment:
   Why do we start an RW transaction here? There is already the 'old reader RW 
tx' that starts before the index gets dropped and hence holds it in the 
`STOPPING` state.



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java:
##########
@@ -65,7 +65,18 @@ private DropIndexCommand(String schemaName, String indexName) throws CatalogVali
     public List<UpdateEntry> get(Catalog catalog) {
         CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
 
-        CatalogIndexDescriptor index = indexOrThrow(schema, indexName);
+        List<CatalogIndexDescriptor> indexes = schema.indexes(indexName);

Review Comment:
   If it's because there is a separate case for handling STOPPING in the 
preceding code, I think it can be removed: we can just throw the same exception 
as is thrown when no such index exists.
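
A rough sketch of that simplification: a STOPPING index is treated exactly like a missing one, so a single lookup with one exception path is enough. The names below (`DropSketchStatus`, `SketchIndexNotFoundException`, `DropIndexSketch.indexOrThrow`) and the exception message are hypothetical and do not reflect the actual Catalog API.

```java
import java.util.Map;

/** Hypothetical, reduced set of index states. */
enum DropSketchStatus { AVAILABLE, STOPPING }

/** Hypothetical exception standing in for whatever is thrown when an index is not found. */
class SketchIndexNotFoundException extends RuntimeException {
    SketchIndexNotFoundException(String indexName) {
        super("Index with name '" + indexName + "' not found");
    }
}

/** Hypothetical lookup helper used by a drop-index command. */
final class DropIndexSketch {
    private DropIndexSketch() {
    }

    /** Returns the status of an alive index; fails identically for missing and STOPPING indexes. */
    static DropSketchStatus indexOrThrow(Map<String, DropSketchStatus> indexesByName, String indexName) {
        DropSketchStatus status = indexesByName.get(indexName);

        if (status == null || status == DropSketchStatus.STOPPING) {
            throw new SketchIndexNotFoundException(indexName);
        }

        return status;
    }
}
```

From the caller's point of view, dropping an index that is already being dropped then looks the same as dropping one that never existed.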



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexMultipleNodesTest.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for scenarios related to dropping of indices, executed on a multiple node cluster.
+ */
+public class ItDropIndexMultipleNodesTest extends BaseSqlIntegrationTest {
+    private static final String TABLE_NAME = "TEST";
+
+    private static final String INDEX_NAME = "TEST_IDX";
+
+    @BeforeEach
+    void createTable() {
+        int partitions = initialNodes();
+
+        int replicas = initialNodes();
+
+        createTable(TABLE_NAME, replicas, partitions);
+    }
+
+    @AfterEach
+    void cleanup() {
+        CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+
+        dropAllTables();
+    }
+
+    @Test
+    void testDropIndexDuringTransaction() {

Review Comment:
   ```suggestion
       void testActiveRwTransactionPreventsStoppingIndexFromBeingRemoved() {
   ```
   Maybe my suggestion is a bit too long, but the idea is that it better 
describes the effect we want to test for (if I read the test correctly).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
