ibessonov commented on code in PR #3369:
URL: https://github.com/apache/ignite-3/pull/3369#discussion_r1515964879


##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** Integration test to check the work with indexes on rebalancing. */
+public class ItIndexAndRebalanceTest extends BaseSqlIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private static final String ZONE_NAME = zoneName(TABLE_NAME);
+
+    private static final String INDEX_NAME = "TEST_INDEX";
+
+    private static final String COLUMN_NAME = "SALARY";
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @BeforeEach
+    void setUp() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21710";)
+    @Test
+    void testChangeReplicaCountWithoutRestartNodes() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 2, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        changeZoneReplicas(ZONE_NAME, 1);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 1, 0);
+        insertPeople(TABLE_NAME, new Person(1, "1", 11.0));
+
+        changeZoneReplicas(ZONE_NAME, 2);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 2, 0);
+        insertPeople(TABLE_NAME, new Person(2, "2", 12.0));
+
+        List<IgniteImpl> nodes = CLUSTER.runningNodes().collect(toList());
+
+        assertThat(nodes, hasSize(2));
+
+        for (IgniteImpl node : nodes) {
+            assertQuery(node, format("SELECT * FROM {} WHERE {} > 0.0", 
TABLE_NAME, COLUMN_NAME))
+                    .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, 
TABLE_NAME, INDEX_NAME))
+                    .returnRowCount(3)
+                    .check();
+        }
+    }
+
+    private static void changeZoneReplicas(String zoneName, int replicas) {
+        sql(format("ALTER ZONE {} SET REPLICAS={}", zoneName, replicas));
+    }
+
+    private static void waitForStableAssignmentsChangeInMetastore(
+            String tableName,
+            int expReplicaCount,
+            int partitionId
+    ) throws Exception {
+        IgniteImpl node = CLUSTER.aliveNode();
+
+        int tableId = getTableIdStrict(node.catalogManager(), tableName, 
node.clock().nowLong());
+
+        Set<Assignment>[] actualAssignmentsHolder = new Set[]{Set.of()};

Review Comment:
   I don't see where you use this variable after assignment. Maybe you forgot 
to delete it



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** Integration test to check the work with indexes on rebalancing. */
+public class ItIndexAndRebalanceTest extends BaseSqlIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private static final String ZONE_NAME = zoneName(TABLE_NAME);
+
+    private static final String INDEX_NAME = "TEST_INDEX";
+
+    private static final String COLUMN_NAME = "SALARY";
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @BeforeEach
+    void setUp() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21710";)
+    @Test
+    void testChangeReplicaCountWithoutRestartNodes() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 2, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        changeZoneReplicas(ZONE_NAME, 1);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 1, 0);
+        insertPeople(TABLE_NAME, new Person(1, "1", 11.0));
+
+        changeZoneReplicas(ZONE_NAME, 2);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 2, 0);
+        insertPeople(TABLE_NAME, new Person(2, "2", 12.0));
+
+        List<IgniteImpl> nodes = CLUSTER.runningNodes().collect(toList());
+
+        assertThat(nodes, hasSize(2));

Review Comment:
   What's the purpose of this assertion? You don't stop nodes in this test



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** Integration test to check the work with indexes on rebalancing. */
+public class ItIndexAndRebalanceTest extends BaseSqlIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private static final String ZONE_NAME = zoneName(TABLE_NAME);
+
+    private static final String INDEX_NAME = "TEST_INDEX";
+
+    private static final String COLUMN_NAME = "SALARY";
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @BeforeEach
+    void setUp() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21710";)
+    @Test
+    void testChangeReplicaCountWithoutRestartNodes() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 2, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        changeZoneReplicas(ZONE_NAME, 1);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 1, 0);
+        insertPeople(TABLE_NAME, new Person(1, "1", 11.0));
+
+        changeZoneReplicas(ZONE_NAME, 2);
+        waitForStableAssignmentsChangeInMetastore(TABLE_NAME, 2, 0);
+        insertPeople(TABLE_NAME, new Person(2, "2", 12.0));
+
+        List<IgniteImpl> nodes = CLUSTER.runningNodes().collect(toList());
+
+        assertThat(nodes, hasSize(2));
+
+        for (IgniteImpl node : nodes) {
+            assertQuery(node, format("SELECT * FROM {} WHERE {} > 0.0", 
TABLE_NAME, COLUMN_NAME))

Review Comment:
   What's the point of running the query from all nodes? SQL engine will 
execute query distributively anyway. Please add clarifying comment, I don't 
understand your intentions



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1809,21 +1809,31 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
 
         Assignments nonStableNodeAssignmentsFinal = nonStableNodeAssignments;
 
+        int partitionId = replicaGrpId.partitionId();
+
         if (shouldStartLocalGroupNode) {
+            var singlePartitionIdSet = PartitionSet.of(partitionId);

Review Comment:
   "var" here violates codestyle



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java:
##########
@@ -19,12 +19,13 @@
 
 import java.util.BitSet;
 import java.util.stream.IntStream;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
 
-/**
- * {@link BitSet} implementation of the {@link PartitionSet}.
- */
+/** {@link BitSet} implementation of the {@link PartitionSet}. */

Review Comment:
   Please revert it back. I don't get all these useless re-formattings that you 
do.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java:
##########
@@ -71,14 +71,14 @@ public static void registerIndexToTable(
     }
 
     /**
-     * Registers indexes to a table on node recovery..
+     * Registers indexes to a table on node recovery or rebalance.

Review Comment:
   This comment explains nothing.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1809,21 +1809,31 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
 
         Assignments nonStableNodeAssignmentsFinal = nonStableNodeAssignments;
 
+        int partitionId = replicaGrpId.partitionId();
+
         if (shouldStartLocalGroupNode) {
+            var singlePartitionIdSet = PartitionSet.of(partitionId);
+
             localServicesStartFuture = localPartitionsVv.get(revision)
                     // TODO https://issues.apache.org/jira/browse/IGNITE-20957 
Revisit this code
-                    .thenApply(unused -> 
localPartsByTableId.get(tableId).copy())
-                    .thenComposeAsync(partitionSet -> inBusyLock(busyLock,
-                            () -> getOrCreatePartitionStorages(tbl, 
partitionSet)
-                    ), ioExecutor)
-                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
startPartitionAndStartClient(
-                            tbl,
-                            replicaGrpId.partitionId(),
-                            pendingAssignments,
-                            nonStableNodeAssignmentsFinal,
-                            zoneId,
-                            isRecovery
-                    )), ioExecutor);
+                    .thenComposeAsync(
+                            partitionSet -> inBusyLock(busyLock, () -> 
getOrCreatePartitionStorages(tbl, singlePartitionIdSet)),
+                            ioExecutor
+                    )
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+                        if (!isRecovery) {
+                            
registerIndexesToTableOnNodeRecoveryOrRebalance(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView());
+                        }

Review Comment:
   I don't understand what's happening. Please comment.
   This code will probably only be executed during a rebalance. By that time, 
table should already be created and indexes should already be registered. Am I 
missing something?
   
   Maybe this method should be renamed, because "register ... to table" is 
misleading. What you really do is you do something with individual partitions. 
Right?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -50,7 +50,7 @@
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
-import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTableOnNodeRecovery;
+import static 
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTableOnNodeRecoveryOrRebalance;

Review Comment:
   I don't like this name. Why not just call it "registerIndexesToTable"?
   Should name and implementation be limited to two specific usages? I don't 
think so



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to