ibessonov commented on code in PR #3945:
URL: https://github.com/apache/ignite-3/pull/3945#discussion_r1647487957
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -239,10 +270,10 @@ void testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
// Should fail because majority was lost.
List<Throwable> fixingPartErrorsBeforeReset = insertValues(table, fixingPartId, 0);
- assertThat(fixingPartErrorsBeforeReset, Matchers.not(empty()));
+ assertThat(fixingPartErrorsBeforeReset, not(empty()));
Review Comment:
What do you mean? That's the result of automatic refactoring by IDEA. I used the method `not` in my test, specifically in the line `assertThat(localStates, is(not(anEmptyMap())));`, and added a static import to make that code cleaner. Other lines were changed as a side effect; that happens sometimes. I didn't deliberately change the code you're pointing to.
I see what you're trying to do, but I'm not convinced it's a comparable situation.
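For context, the refactoring being discussed is Java's single-static-import mechanism. Here is a minimal, self-contained sketch of the same pattern using only the standard library (the class name below is hypothetical; the actual change involves Hamcrest's `Matchers.not`):

```java
// With a plain import, the call site must spell out the owning class: Math.max(1, 2).
// A static import lets the method be used by its bare name, which is the same
// cleanup IDEA applies when `Matchers.not(...)` becomes `not(...)`.
import static java.lang.Math.max;

public class StaticImportDemo {
    public static void main(String[] args) {
        System.out.println(max(1, 2)); // prints 2
    }
}
```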
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -299,6 +330,158 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
assertThat(errors, is(empty()));
}
+ /**
+ * Tests a scenario where all stable nodes are lost, yet we have data on one of the pending nodes and perform a reset partition
+ * operation. In this case we should use that pending node as a source of data for recovery.
+ *
+ * <p>It goes like this:
+ * <ul>
+ * <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+ * <li>We stop nodes 4 and 5, leaving node 1 alone in stable assignments.</li>
+ * <li>New distribution is 0, 1 and 3. Rebalance is started via raft snapshots. It transfers data to node 0, but not node 3.</li>
+ * <li>Node 1 is stopped. Data is only present on node 0.</li>
+ * <li>We execute "resetPartitions" and expect that data from node 0 will be available after that.</li>
+ * </ul>
+ */
+ @Test
+ @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
+ int partId = 0;
+
+ Assignments assignment013 = Assignments.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name())
+ );
+
+ IgniteImpl node0 = cluster.node(0);
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+ assertRealAssignments(node0, partId, 1, 4, 5);
+
+ insertValues(table, partId, 0);
+
+ triggerRaftSnapshot(1, partId);
+ // Second snapshot causes log truncation.
+ triggerRaftSnapshot(1, partId);
+
+ node(1).dropMessages((nodeName, msg) -> node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+
+ stopNodesInParallel(4, 5);
+ waitForScale(node0, 4);
+
+ assertRealAssignments(node0, partId, 0, 1, 3);
+
+ cluster.runningNodes().forEach(node -> node.dropMessages((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment013)));
+
+ CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, emptySet());
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+ var localStatesFut = node0.disasterRecoveryManager().localPartitionStates(emptySet(), Set.of(node(3).name()), emptySet());
+ assertThat(localStatesFut, willCompleteSuccessfully());
+
+ Map<TablePartitionId, LocalPartitionStateByNode> localStates = localStatesFut.join();
+ assertThat(localStates, is(not(anEmptyMap())));
+ assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, localStates.values().iterator().next().values().iterator().next().state);
+
+ stopNode(1);
+ waitForScale(node0, 3);
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+ resetFuture = node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, emptySet());
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+ awaitPrimaryReplica(node0, partId);
+ assertRealAssignments(node0, partId, 0, 2, 3);
+
+ // Set time in the future to protect us from "getAsync" from the past.
+ // Should be replaced with "sleep" when clock skew validation is implemented.
+ node0.clock().update(node0.clock().now().addPhysicalTime(
+ SECONDS.toMillis(DEFAULT_IDLE_SAFE_TIME_PROP_DURATION) + node0.clockService().maxClockSkewMillis())
+ );
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+ // We need to wait quite a bit before data is available. Log shows term mismatches, meaning that right now it only works due to some
+ // miracle. For future improvements we must specify "stable" forced sub-assignments explicitly, instead of calculating them as an
+ // intersection.
+ Thread.sleep(10_000);
Review Comment:
I agree, and I really want to fix it very soon.
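One way to remove the fixed `Thread.sleep(10_000)` once IGNITE-21303 is addressed is to poll for the awaited condition instead of sleeping unconditionally. A minimal sketch, using only the standard library (the helper name and poll interval are hypothetical; Ignite's test framework may already provide an equivalent utility):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class PollingWait {
    /**
     * Polls {@code condition} until it returns true or {@code timeoutMillis} elapses.
     * Returns the final value of the condition, so callers can assert on it.
     */
    static boolean waitFor(BooleanSupplier condition, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }

            Thread.sleep(50); // Poll interval, chosen arbitrarily for this sketch.
        }

        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // Instead of a fixed 10-second sleep, wait only until the condition holds.
        boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 100, 5_000);
        System.out.println(ok); // prints true
    }
}
```

The test finishes as soon as the condition holds rather than always paying the worst-case delay, and a timeout failure points at the unmet condition instead of a flaky sleep.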
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -299,6 +330,158 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
assertThat(errors, is(empty()));
}
+ /**
+ * Tests a scenario where all stable nodes are lost, yet we have data on one of the pending nodes and perform a reset partition
+ * operation. In this case we should use that pending node as a source of data for recovery.
+ *
+ * <p>It goes like this:
+ * <ul>
+ * <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+ * <li>We stop nodes 4 and 5, leaving node 1 alone in stable assignments.</li>
+ * <li>New distribution is 0, 1 and 3. Rebalance is started via raft snapshots. It transfers data to node 0, but not node 3.</li>
+ * <li>Node 1 is stopped. Data is only present on node 0.</li>
+ * <li>We execute "resetPartitions" and expect that data from node 0 will be available after that.</li>
+ * </ul>
+ */
+ @Test
+ @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
+ int partId = 0;
+
+ Assignments assignment013 = Assignments.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name())
+ );
+
+ IgniteImpl node0 = cluster.node(0);
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+ assertRealAssignments(node0, partId, 1, 4, 5);
+
+ insertValues(table, partId, 0);
+
+ triggerRaftSnapshot(1, partId);
+ // Second snapshot causes log truncation.
+ triggerRaftSnapshot(1, partId);
+
+ node(1).dropMessages((nodeName, msg) -> node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+
+ stopNodesInParallel(4, 5);
+ waitForScale(node0, 4);
+
+ assertRealAssignments(node0, partId, 0, 1, 3);
+
+ cluster.runningNodes().forEach(node -> node.dropMessages((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment013)));
+
+ CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, emptySet());
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+ var localStatesFut = node0.disasterRecoveryManager().localPartitionStates(emptySet(), Set.of(node(3).name()), emptySet());
+ assertThat(localStatesFut, willCompleteSuccessfully());
+
+ Map<TablePartitionId, LocalPartitionStateByNode> localStates = localStatesFut.join();
+ assertThat(localStates, is(not(anEmptyMap())));
+ assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, localStates.values().iterator().next().values().iterator().next().state);
+
+ stopNode(1);
+ waitForScale(node0, 3);
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+ resetFuture = node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, emptySet());
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+ awaitPrimaryReplica(node0, partId);
+ assertRealAssignments(node0, partId, 0, 2, 3);
+
+ // Set time in the future to protect us from "getAsync" from the past.
+ // Should be replaced with "sleep" when clock skew validation is implemented.
+ node0.clock().update(node0.clock().now().addPhysicalTime(
+ SECONDS.toMillis(DEFAULT_IDLE_SAFE_TIME_PROP_DURATION) + node0.clockService().maxClockSkewMillis())
+ );
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+ // We need to wait quite a bit before data is available. Log shows term mismatches, meaning that right now it only works due to some
+ // miracle. For future improvements we must specify "stable" forced sub-assignments explicitly, instead of calculating them as an
+ // intersection.
+ Thread.sleep(10_000);
Review Comment:
I think I forgot to add JIRA link, I'll do it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]