sanpwc commented on code in PR #1572:
URL: https://github.com/apache/ignite-3/pull/1572#discussion_r1092413754
##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java:
##########
@@ -38,8 +37,8 @@ public class AffinityUtils {
* @param replicas Replicas count.
Review Comment:
The Javadoc lacks the baselineNodes param. And it's no longer baselineNodes; I'd
rather name it dataNodes.
##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java:
##########
@@ -48,32 +47,32 @@ public static List<Set<Assignment>>
calculateAssignments(Collection<ClusterNode>
HashSet::new
);
- return
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
+ return
affinityNodes.stream().map(AffinityUtils::consistentIdsToAssignments).collect(toList());
}
/**
* Calculates affinity assignments for a single partition.
*
- * @param baselineNodes Nodes.
+ * @param consistentIds Consistent ids of nodes.
* @param partition Partition id.
* @param replicas Replicas count.
- * @return List of assignments.
+ * @return Set of assignments.
*/
- public static Set<Assignment>
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int
partition, int replicas) {
- Set<ClusterNode> affinityNodes =
RendezvousAffinityFunction.assignPartition(
+ public static Set<Assignment>
calculateAssignmentForPartition(Collection<String> consistentIds, int
partition, int replicas) {
Review Comment:
From my point of view, consistentIds is a bit confusing. I'd rather use
dataNodes with a more meaningful explanation in the @param section that
explicitly denotes that it's a set of data node identifiers that is
**consistent** between restarts.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
Review Comment:
It's actually an integration rebalance test and not a distributionZones one,
is that correct?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java:
##########
@@ -61,10 +62,27 @@ class DistributionZonesUtil {
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
- static ByteArray zoneDataNodesKey(int zoneId) {
+ public static ByteArray zoneDataNodesKey(int zoneId) {
return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
}
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
Review Comment:
Is it really valid to have such one-liners for methods?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItDistributionZonesTest.class);
+
+ /**
+ * Nodes bootstrap configuration pattern.
+ *
+ * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic
snapshot installation failures still
+ * allow tests pass thanks to retries.
+ */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network: {\n"
+ + " port:{},\n"
+ + " nodeFinder:{\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " raft.rpcInstallSnapshotTimeout: 10000"
+ + "}";
+
+ @WorkDirectory
+ private Path workDir;
+
+ private Cluster cluster;
+
+ @BeforeEach
+ void createCluster(TestInfo testInfo) {
+ cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+ }
+
+ @AfterEach
+ @Timeout(60)
+ void shutdownCluster() {
+ cluster.shutdown();
+ }
+
+ @Test
+ @Disabled
+ void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+ cluster.startAndInit(4);
+
+ createTestTable();
Review Comment:
Could you please add a comment that we are creating the table with one
partition and three replicas? Minor, though.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItDistributionZonesTest.class);
+
+ /**
+ * Nodes bootstrap configuration pattern.
+ *
+ * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic
snapshot installation failures still
+ * allow tests pass thanks to retries.
+ */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network: {\n"
+ + " port:{},\n"
+ + " nodeFinder:{\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " raft.rpcInstallSnapshotTimeout: 10000"
+ + "}";
+
+ @WorkDirectory
+ private Path workDir;
+
+ private Cluster cluster;
+
+ @BeforeEach
+ void createCluster(TestInfo testInfo) {
+ cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+ }
+
+ @AfterEach
+ @Timeout(60)
Review Comment:
And what about the timeout values, both 60 and 90? Are they also just a copy-paste
from ItTableRaftSnapshotsTest, or do we really expect the given test to run for up to 90
seconds?
##########
modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java:
##########
@@ -104,29 +102,26 @@ public void testPartitionDistribution() {
}
@NotNull
- private List<ClusterNode> prepareNetworkTopology(int nodes) {
- var addr = new NetworkAddress("127.0.0.1", 121212);
-
+ private List<String> prepareNetworkTopology(int nodes) {
return IntStream.range(0, nodes)
.mapToObj(i -> "Node " + i)
- .map(name -> new ClusterNode(UUID.randomUUID().toString(),
name, addr))
.collect(Collectors.toUnmodifiableList());
}
@Test
public void serializeAssignment() {
- int nodes = 50;
+ int nodeCount = 50;
int parts = 10_000;
int replicas = 4;
- List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+ List<String> nodes = prepareNetworkTopology(nodeCount);
- assertTrue(parts > nodes, "Partitions should be more that nodes");
+ assertTrue(parts > nodeCount, "Partitions should be more that nodes");
Review Comment:
Same as mentioned above by senior @alievmirza - "that" should be "than".
##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java:
##########
@@ -48,32 +47,32 @@ public static List<Set<Assignment>>
calculateAssignments(Collection<ClusterNode>
HashSet::new
);
- return
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
+ return
affinityNodes.stream().map(AffinityUtils::consistentIdsToAssignments).collect(toList());
}
/**
* Calculates affinity assignments for a single partition.
*
- * @param baselineNodes Nodes.
+ * @param consistentIds Consistent ids of nodes.
* @param partition Partition id.
* @param replicas Replicas count.
- * @return List of assignments.
+ * @return Set of assignments.
*/
- public static Set<Assignment>
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int
partition, int replicas) {
- Set<ClusterNode> affinityNodes =
RendezvousAffinityFunction.assignPartition(
+ public static Set<Assignment>
calculateAssignmentForPartition(Collection<String> consistentIds, int
partition, int replicas) {
+ Set<String> affinityNodes = RendezvousAffinityFunction.assignPartition(
partition,
- new ArrayList<>(baselineNodes),
+ new ArrayList<>(consistentIds),
replicas,
null,
false,
null,
HashSet::new
);
- return clusterNodesToAssignments(affinityNodes);
+ return consistentIdsToAssignments(affinityNodes);
}
- private static Set<Assignment>
clusterNodesToAssignments(Collection<ClusterNode> nodes) {
- return nodes.stream().map(node ->
Assignment.forPeer(node.name())).collect(toSet());
+ private static Set<Assignment>
consistentIdsToAssignments(Collection<String> nodes) {
+ return nodes.stream().map(node ->
Assignment.forPeer(node)).collect(toSet());
Review Comment:
Let's use method reference here ;)
##########
modules/runner/build.gradle:
##########
@@ -114,6 +114,7 @@ dependencies {
integrationTestImplementation project(':ignite-metastorage')
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-transactions')
+ integrationTestImplementation project(':ignite-distribution-zones')
Review Comment:
Do we really use distribution zones module within runner tests?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItDistributionZonesTest.class);
+
+ /**
+ * Nodes bootstrap configuration pattern.
+ *
+ * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic
snapshot installation failures still
Review Comment:
It looks like a copy-paste from ItTableRaftSnapshotsTest. Do we really need
the 10-second adjustment here?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItDistributionZonesTest.class);
+
+ /**
+ * Nodes bootstrap configuration pattern.
+ *
+ * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic
snapshot installation failures still
+ * allow tests pass thanks to retries.
+ */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network: {\n"
+ + " port:{},\n"
+ + " nodeFinder:{\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " raft.rpcInstallSnapshotTimeout: 10000"
+ + "}";
+
+ @WorkDirectory
+ private Path workDir;
+
+ private Cluster cluster;
+
+ @BeforeEach
+ void createCluster(TestInfo testInfo) {
+ cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+ }
+
+ @AfterEach
+ @Timeout(60)
+ void shutdownCluster() {
+ cluster.shutdown();
+ }
+
+ @Test
+ @Disabled
+ void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+ cluster.startAndInit(4);
+
+ createTestTable();
+
+ assertTrue(waitAssingments(List.of(
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2)
+ )));
+
+ TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+ BinaryRowEx key = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(0).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(1).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(2).node()).get());
+
+ putData();
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+ try {
+ table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+
+ fail();
+ } catch (Exception e) {
+ assertInstanceOf(ExecutionException.class, e);
+
+ assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+ }
+
+ cluster.knockOutNode(2, PARTITION_NETWORK);
+
+ assertTrue(waitAssingments(List.of(
+ Set.of(0, 1, 3),
+ Set.of(0, 1, 3),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 3)
+ )));
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(3).node()).get());
+
+ cluster.reanimateNode(2, PARTITION_NETWORK);
+
+ assertTrue(waitAssingments(List.of(
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2)
+ )));
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+ try {
+ table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+
+ fail();
+ } catch (Exception e) {
+ assertInstanceOf(ExecutionException.class, e);
+
+ assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+ }
+ }
+
+ private boolean waitAssingments(List<Set<Integer>> nodes) throws
InterruptedException {
+ return waitForCondition(() -> {
+ for (int i = 0; i < nodes.size(); i++) {
+ Set<Integer> excpectedAssignments = nodes.get(i);
+
+ ExtendedTableConfiguration table =
+ (ExtendedTableConfiguration) cluster.node(i)
+
.clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST");
+
+ byte[] assignmentsBytes = table.assignments().value();
+
+ Set<String> assignments;
+
+ if (assignmentsBytes != null) {
+ assignments = ((List<Set<Assignment>>)
ByteUtils.fromBytes(assignmentsBytes)).get(0)
+ .stream().map(assignment ->
assignment.consistentId()).collect(Collectors.toSet());
+ } else {
+ assignments = Collections.emptySet();
+ }
+
+ LOG.info("Assignments for node " + i + ": " + assignments);
+
+ if (!(excpectedAssignments.size() == assignments.size())
+ || !excpectedAssignments.stream().allMatch(node ->
assignments.contains(cluster.node(node).name()))) {
Review Comment:
It's not a good idea to check whether the node name contains the number and
treat that as an equality relation. I'd use full node names for the expected
value.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItDistributionZonesTest.class);
+
+ /**
+ * Nodes bootstrap configuration pattern.
+ *
+ * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic
snapshot installation failures still
+ * allow tests pass thanks to retries.
+ */
+ private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network: {\n"
+ + " port:{},\n"
+ + " nodeFinder:{\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " raft.rpcInstallSnapshotTimeout: 10000"
+ + "}";
+
+ @WorkDirectory
+ private Path workDir;
+
+ private Cluster cluster;
+
+ @BeforeEach
+ void createCluster(TestInfo testInfo) {
+ cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+ }
+
+ @AfterEach
+ @Timeout(60)
+ void shutdownCluster() {
+ cluster.shutdown();
+ }
+
+ @Test
+ @Disabled
+ void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+ cluster.startAndInit(4);
+
+ createTestTable();
+
+ assertTrue(waitAssingments(List.of(
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2)
+ )));
+
+ TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+ BinaryRowEx key = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(0).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(1).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(2).node()).get());
+
+ putData();
Review Comment:
It's a bit confusing that, on the one hand, we expose the key in order to
retrieve the data and, on the other hand, encapsulate it in putData(). Let's be
consistent ;)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]