tkalkirill commented on code in PR #3424:
URL: https://github.com/apache/ignite-3/pull/3424#discussion_r1527924003


##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java:
##########
@@ -93,7 +87,7 @@ void testRecoverBuildingIndex() throws Exception {
             return false;
         });
 
-        createIndex(TABLE_NAME, INDEX_NAME, "ID");
+        runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, "ID"));

Review Comment:
   Maybe it would be better to wait for the futures to finish executing?



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java:
##########
@@ -285,13 +279,10 @@ void testBuildingIndexWithUpdateSchemaAfterCreateIndex() 
throws Exception {
         Transaction rwTx = node().transactions().begin(new 
TransactionOptions().readOnly(false));
 
         try {
-            setAwaitIndexAvailability(false);
-
-            createIndex(TABLE_NAME, INDEX_NAME, columName);
+            runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, columName));

Review Comment:
   Same



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexMultipleNodesTest.java:
##########
@@ -264,7 +269,7 @@ void testDropIndexDuringBuilding() {
 
         CompletableFuture<Void> indexRemovedFuture = indexRemovedFuture();
 
-        createIndex();
+        runAsync(() -> createIndex());

Review Comment:
   Same



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexMultipleNodesTest.java:
##########
@@ -239,7 +242,12 @@ void testDropIndexAfterRegistering() {
         runInRwTransaction(CLUSTER.aliveNode(), tx -> {
             // Create an index inside a transaction, this will prevent the 
index from building.
             try {
-                createIndex();
+                runAsync(() -> createIndex());

Review Comment:
   Same here: maybe wait for the future to complete.



##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java:
##########
@@ -217,6 +217,22 @@ public void testEmptyCatalog() {
         assertThat(manager.latestCatalogVersion(), is(0));
     }
 
+    @Test
+    public void assignsSuccessiveCatalogVersions() {
+        CompletableFuture<Integer> version1Future = 
manager.execute(simpleTable(TABLE_NAME));
+        assertThat(version1Future, willCompleteSuccessfully());

Review Comment:
   Can you use `willBe(1)`, etc.?



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -62,16 +66,23 @@ void tearDown() {
         CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
     }
 
+    @SuppressWarnings("resource")
     @Test
-    void testCreateIndexInsideRwTransaction() {
+    void testCreateIndexInsideRwTransaction() throws Exception {
+        CatalogManager catalogManager = node().catalogManager();
+
         TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, 
TABLE_NAME, 1, 1, ENGINE_NAME);
 
-        setAwaitIndexAvailability(false);
         dropAnyBuildIndexMessages();
 
         Transaction rwTx = beginRwTransaction();
 
-        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+        runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME));

Review Comment:
   Same



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexStatus.java:
##########
@@ -80,6 +80,19 @@ public boolean isAlive() {
         return this != STOPPING;
     }
 
+    /**
+     * Returns {@code true} if the index is {@link #AVAILABLE} or follows it.
+     */
+    public boolean isAvailableOrLater() {

Review Comment:
   Maybe move to `CatalogUtils`?



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexOneNodeTest.java:
##########
@@ -65,14 +69,23 @@ void testCreateIndexAfterDrop() {
 
     @Test
     void testCreateIndexAfterDropWhileTransactionInProgress() {
+        CatalogManager catalogManager = CLUSTER.aliveNode().catalogManager();
+
         runInRwTransaction(tx -> {
             dropIndex(INDEX_NAME);
 
-            // The new index will not become available, since we are inside a 
transaction that has been started before this index was
-            // created.
-            setAwaitIndexAvailability(false);
+            runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME));

Review Comment:
   same



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java:
##########
@@ -169,9 +191,74 @@ private static BiFunction<Object, Throwable, Boolean> 
handleModificationResult(b
     /** Handles create index command. */
     private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand 
cmd) {
         return 
catalogManager.execute(DdlToCatalogCommandConverter.convert(cmd))
+                .thenCompose(catalogVersion -> 
waitTillIndexBecomesAvailableOrRemoved(cmd, catalogVersion))
                 .handle(handleModificationResult(cmd.ifNotExists(), 
IndexExistsValidationException.class));
     }
 
+    private CompletionStage<Void> 
waitTillIndexBecomesAvailableOrRemoved(CreateIndexCommand cmd, Integer 
creationCatalogVersion) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        Catalog catalog = catalogManager.catalog(creationCatalogVersion);
+        assert catalog != null : creationCatalogVersion;
+
+        CatalogSchemaDescriptor schema = catalog.schema(cmd.schemaName());
+        assert schema != null : cmd.schemaName();
+
+        CatalogIndexDescriptor index = schema.aliveIndex(cmd.indexName());
+        assert index != null : cmd.indexName();
+
+        EventListener<CatalogEventParameters> availabilityListener = event -> {
+            if (((MakeIndexAvailableEventParameters) event).indexId() == 
index.id()) {

Review Comment:
   You can use `EventListener#fromConsumer`.



##########
modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java:
##########
@@ -83,10 +95,12 @@ void testDropIndex() {
      * @param failIfExist Throw an exception if the index exist.
      */
     private static void tryToCreateIndex(String tableName, String indexName, 
boolean failIfExist) {
+        Loggers.forClass(ItIndexDdlTest.class).info("XXX starting index 
creation");

Review Comment:
   Let's move the logger to the class and not use "XXX".



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java:
##########
@@ -198,18 +198,14 @@ public void queryWaitAppropriateSchema() {
 
         createTable(ignite0, TABLE_NAME);
 
-        WatchListenerInhibitor listenerInhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
-
-        listenerInhibitor.startInhibit();
+        
WatchListenerInhibitor.metastorageEventsInhibitor(ignite1).withInhibition(() -> 
{
+            runAsync(() -> sql(ignite0, "CREATE INDEX idx1 ON " + TABLE_NAME + 
"(valint)"));

Review Comment:
   same



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java:
##########
@@ -169,9 +191,74 @@ private static BiFunction<Object, Throwable, Boolean> 
handleModificationResult(b
     /** Handles create index command. */
     private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand 
cmd) {
         return 
catalogManager.execute(DdlToCatalogCommandConverter.convert(cmd))
+                .thenCompose(catalogVersion -> 
waitTillIndexBecomesAvailableOrRemoved(cmd, catalogVersion))
                 .handle(handleModificationResult(cmd.ifNotExists(), 
IndexExistsValidationException.class));
     }
 
+    private CompletionStage<Void> 
waitTillIndexBecomesAvailableOrRemoved(CreateIndexCommand cmd, Integer 
creationCatalogVersion) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        Catalog catalog = catalogManager.catalog(creationCatalogVersion);
+        assert catalog != null : creationCatalogVersion;
+
+        CatalogSchemaDescriptor schema = catalog.schema(cmd.schemaName());
+        assert schema != null : cmd.schemaName();
+
+        CatalogIndexDescriptor index = schema.aliveIndex(cmd.indexName());
+        assert index != null : cmd.indexName();
+
+        EventListener<CatalogEventParameters> availabilityListener = event -> {
+            if (((MakeIndexAvailableEventParameters) event).indexId() == 
index.id()) {
+                completeFutureWhenEventVersionActivates(future, event);
+            }
+
+            return falseCompletedFuture();
+        };
+        catalogManager.listen(CatalogEvent.INDEX_AVAILABLE, 
availabilityListener);
+
+        EventListener<CatalogEventParameters> removalListener = event -> {
+            if (((RemoveIndexEventParameters) event).indexId() == index.id()) {

Review Comment:
   Same about `EventListener#fromConsumer`.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java:
##########
@@ -169,9 +191,74 @@ private static BiFunction<Object, Throwable, Boolean> 
handleModificationResult(b
     /** Handles create index command. */
     private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand 
cmd) {
         return 
catalogManager.execute(DdlToCatalogCommandConverter.convert(cmd))
+                .thenCompose(catalogVersion -> 
waitTillIndexBecomesAvailableOrRemoved(cmd, catalogVersion))
                 .handle(handleModificationResult(cmd.ifNotExists(), 
IndexExistsValidationException.class));
     }
 
+    private CompletionStage<Void> 
waitTillIndexBecomesAvailableOrRemoved(CreateIndexCommand cmd, Integer 
creationCatalogVersion) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        Catalog catalog = catalogManager.catalog(creationCatalogVersion);
+        assert catalog != null : creationCatalogVersion;
+
+        CatalogSchemaDescriptor schema = catalog.schema(cmd.schemaName());
+        assert schema != null : cmd.schemaName();

Review Comment:
   I think it would be good to indicate the catalog version in the error 
message.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java:
##########
@@ -169,9 +191,74 @@ private static BiFunction<Object, Throwable, Boolean> 
handleModificationResult(b
     /** Handles create index command. */
     private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand 
cmd) {
         return 
catalogManager.execute(DdlToCatalogCommandConverter.convert(cmd))
+                .thenCompose(catalogVersion -> 
waitTillIndexBecomesAvailableOrRemoved(cmd, catalogVersion))
                 .handle(handleModificationResult(cmd.ifNotExists(), 
IndexExistsValidationException.class));
     }
 
+    private CompletionStage<Void> 
waitTillIndexBecomesAvailableOrRemoved(CreateIndexCommand cmd, Integer 
creationCatalogVersion) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        Catalog catalog = catalogManager.catalog(creationCatalogVersion);
+        assert catalog != null : creationCatalogVersion;
+
+        CatalogSchemaDescriptor schema = catalog.schema(cmd.schemaName());
+        assert schema != null : cmd.schemaName();
+
+        CatalogIndexDescriptor index = schema.aliveIndex(cmd.indexName());
+        assert index != null : cmd.indexName();

Review Comment:
   same



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItDropIndexOneNodeTest.java:
##########
@@ -65,14 +69,23 @@ void testCreateIndexAfterDrop() {
 
     @Test
     void testCreateIndexAfterDropWhileTransactionInProgress() {
+        CatalogManager catalogManager = CLUSTER.aliveNode().catalogManager();
+
         runInRwTransaction(tx -> {
             dropIndex(INDEX_NAME);
 
-            // The new index will not become available, since we are inside a 
transaction that has been started before this index was
-            // created.
-            setAwaitIndexAvailability(false);
+            runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME));
+
+            try {
+                assertTrue(waitForCondition(

Review Comment:
   Let's add it as a helper method to the test code?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java:
##########
@@ -256,6 +263,13 @@ public void testAddIndex() {
         tryToCreateIndex(ignite0, TABLE_NAME, false);
     }
 
+    private static void waitForIndexToAppearInAnyState(IgniteImpl ignite0) 
throws InterruptedException {

Review Comment:
   same



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -62,16 +66,23 @@ void tearDown() {
         CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
     }
 
+    @SuppressWarnings("resource")
     @Test
-    void testCreateIndexInsideRwTransaction() {
+    void testCreateIndexInsideRwTransaction() throws Exception {
+        CatalogManager catalogManager = node().catalogManager();
+
         TableImpl table = (TableImpl) createZoneAndTable(ZONE_NAME, 
TABLE_NAME, 1, 1, ENGINE_NAME);
 
-        setAwaitIndexAvailability(false);
         dropAnyBuildIndexMessages();
 
         Transaction rwTx = beginRwTransaction();
 
-        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+        runAsync(() -> createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME));
+
+        assertTrue(waitForCondition(

Review Comment:
   Same



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java:
##########
@@ -29,16 +29,16 @@ public interface CatalogManager extends IgniteComponent, 
CatalogService {
      * Executes given command.
      *
      * @param command Command to execute.
-     * @return Future representing result of execution.
+     * @return Future representing result of execution (it will be completed 
with the created catalog version).
      */
-    CompletableFuture<Void> execute(CatalogCommand command);
+    CompletableFuture<Integer> execute(CatalogCommand command);

Review Comment:
   I didn't notice where you are using the result of the future, other than in 
the test code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to