tkalkirill commented on code in PR #3945:
URL: https://github.com/apache/ignite-3/pull/3945#discussion_r1647524011


##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -239,10 +270,10 @@ void 
testManualRebalanceIfMajorityIsLostSpecifyPartitions() throws Exception {
 
         // Should fail because majority was lost.
         List<Throwable> fixingPartErrorsBeforeReset = insertValues(table, 
fixingPartId, 0);
-        assertThat(fixingPartErrorsBeforeReset, Matchers.not(empty()));
+        assertThat(fixingPartErrorsBeforeReset, not(empty()));

Review Comment:
   I'm just having fun.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -299,6 +330,158 @@ void testManualRebalanceIfPartitionIsLost() throws 
Exception {
         assertThat(errors, is(empty()));
     }
 
+    /**
+     * Tests a scenario where all stable nodes are lost, yet we have data on 
one of pending nodes and perform reset partition operation. In
+     * this case we should use that pending node as a source of data for 
recovery.
+     *
+     * <p>It goes like this:
+     * <ul>
+     *     <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+     *     <li>We stop nodes 4 and 5, leaving node 1 alone in stable 
assignments.</li>
+     *     <li>New distribution is 0, 1 and 3. Rebalance is started via raft 
snapshots. It transfers data to node 0, but not node 3.</li>
+     *     <li>Node 1 is stopped. Data is only present on node 0.</li>
+     *     <li>We execute "resetPartitions" and expect that data from node 0 
will be available after that.</li>
+     * </ul>
+     */
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+    public void testIncompleteRebalanceAfterResetPartitions() throws Exception 
{
+        int partId = 0;
+
+        Assignments assignment013 = Assignments.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())
+        );
+
+        IgniteImpl node0 = cluster.node(0);
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 1, 4, 5);
+
+        insertValues(table, partId, 0);
+
+        triggerRaftSnapshot(1, partId);
+        // Second snapshot causes log truncation.
+        triggerRaftSnapshot(1, partId);
+
+        node(1).dropMessages((nodeName, msg) -> 
node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+
+        stopNodesInParallel(4, 5);
+        waitForScale(node0, 4);
+
+        assertRealAssignments(node0, partId, 0, 1, 3);
+
+        cluster.runningNodes().forEach(node -> node.dropMessages((nodeName, 
msg) -> stableKeySwitchMessage(msg, partId, assignment013)));
+
+        CompletableFuture<Void> resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        var localStatesFut = 
node0.disasterRecoveryManager().localPartitionStates(emptySet(), 
Set.of(node(3).name()), emptySet());
+        assertThat(localStatesFut, willCompleteSuccessfully());
+
+        Map<TablePartitionId, LocalPartitionStateByNode> localStates = 
localStatesFut.join();
+        assertThat(localStates, is(not(anEmptyMap())));
+        assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, 
localStates.values().iterator().next().values().iterator().next().state);
+
+        stopNode(1);
+        waitForScale(node0, 3);
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 0, 2, 3);
+
+        // Set time in the future to protect us from "getAsync" from the past.
+        // Should be replaced with "sleep" when clock skew validation is 
implemented.
+        node0.clock().update(node0.clock().now().addPhysicalTime(
+                SECONDS.toMillis(DEFAULT_IDLE_SAFE_TIME_PROP_DURATION) + 
node0.clockService().maxClockSkewMillis())
+        );
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+        //  We need wait quite a bit before data is available. Log shows term 
mismatches, meaning that right now in only works due to some
+        //  miracle. For future improvements we must specify "stable" forced 
sub-assignments explicitly, instead of calculating them as an
+        //  intersection.
+        Thread.sleep(10_000);

Review Comment:
   Someday we will fix everything.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java:
##########
@@ -79,31 +123,165 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
         HybridTimestamp msSafeTime = 
disasterRecoveryManager.metaStorageManager.timestampByRevision(msRevision);
 
         int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());
+
+        if (this.catalogVersion != catalogVersion) {
+            return CompletableFuture.failedFuture(
+                    new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR, 
"Cluster is not idle, concurrent DDL update detected.")
+            );
+        }
+
         Catalog catalog = 
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
 
         CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
         CatalogTableDescriptor tableDescriptor = catalog.table(tableId);
 
+        CompletableFuture<Map<TablePartitionId, 
LocalPartitionStateMessageByNode>> localStates
+                = 
disasterRecoveryManager.localPartitionStatesInternal(Set.of(zoneDescriptor.name()),
 emptySet(), partitionIds, catalog);
+
         CompletableFuture<Set<String>> dataNodesFuture = 
disasterRecoveryManager.dzManager.dataNodes(msRevision, catalogVersion, zoneId);
 
-        return dataNodesFuture.thenCompose(dataNodes -> {
+        return dataNodesFuture.thenCombine(localStates, (dataNodes, 
localStatesMap) -> {
             Set<String> nodeConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology()
                     .stream()
                     .map(NodeWithAttributes::nodeName)
                     .collect(toSet());
 
-            CompletableFuture<?>[] futures = 
RebalanceUtil.forceAssignmentsUpdate(
+            CompletableFuture<?>[] futures = forceAssignmentsUpdate(
                     tableDescriptor,
                     zoneDescriptor,
-                    partitionIds,
                     dataNodes,
                     nodeConsistentIds,
                     msRevision,
-                    disasterRecoveryManager.metaStorageManager
+                    disasterRecoveryManager.metaStorageManager,
+                    localStatesMap
             );
 
             return allOf(futures);
-        });
+        }).thenCompose(Function.identity());
+    }
+
+    /**
+     * Sets force assignments for the zone/table if it's required. The 
condition for force reassignment is the absence of stable
+     * assignments' majority within the set of currently alive nodes. In this 
case we calculate new assignments that include all alive
+     * stable nodes, and try to save ot with a {@link Assignments#force()} 
flag enabled.
+     *
+     * @param tableDescriptor Table descriptor.
+     * @param zoneDescriptor Zone descriptor.
+     * @param dataNodes Current DZ data nodes.
+     * @param aliveNodesConsistentIds Set of alive nodes according to logical 
topology.
+     * @param revision Meta-storage revision to be associated with 
reassignment.
+     * @param metaStorageManager Meta-storage manager.
+     * @param localStatesMap Local partition states retrieved by {@link 
DisasterRecoveryManager#localPartitionStates(Set, Set, Set)}.
+     * @return A future that will be completed when reassignments data is 
written into a meta-storage, if that's required.
+     */
+    private CompletableFuture<?>[] forceAssignmentsUpdate(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            Set<String> dataNodes,
+            Set<String> aliveNodesConsistentIds,
+            long revision,
+            MetaStorageManager metaStorageManager,
+            Map<TablePartitionId, LocalPartitionStateMessageByNode> 
localStatesMap
+    ) {
+        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = 
tableAssignments(
+                metaStorageManager,
+                tableDescriptor.id(),
+                partitionIds,
+                zoneDescriptor.partitions()
+        );
+
+        Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
+
+        int[] partitionIdsArray = partitionIds.isEmpty()
+                ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
+                : partitionIds.stream().mapToInt(Integer::intValue).toArray();
+
+        CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIdsArray.length];
+
+        for (int partitionId = 0; partitionId < partitionIdsArray.length; 
partitionId++) {
+            TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
+
+            futures[partitionId] = 
tableAssignmentsFut.thenCompose(tableAssignments ->
+                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
manualPartitionUpdate(
+                            replicaGrpId,
+                            aliveDataNodes,
+                            aliveNodesConsistentIds,
+                            zoneDescriptor.replicas(),
+                            revision,
+                            metaStorageManager,
+                            
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
+                            localStatesMap.get(replicaGrpId)
+                    )).thenAccept(res -> {
+                        DisasterRecoveryManager.LOG.info(
+                                "Partition {} returned {} status on reset 
attempt", replicaGrpId, UpdateStatus.valueOf(res)
+                        );
+                    }
+            );
+        }
+
+        return futures;
+    }
+
+    private static CompletableFuture<Integer> manualPartitionUpdate(
+            TablePartitionId partId,
+            Collection<String> aliveDataNodes,
+            Set<String> aliveNodesConsistentIds,
+            int replicas,
+            long revision,
+            MetaStorageManager metaStorageMgr,
+            Set<Assignment> currentAssignments,
+            LocalPartitionStateMessageByNode localPartitionStateMessageByNode
+    ) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+        //  This is a naive approach that doesn't exclude nodes in error 
state, if they exist.
+        Set<Assignment> partAssignments = new HashSet<>();
+        if (localPartitionStateMessageByNode != null) {
+            for (Entry<String, LocalPartitionStateMessage> entry : 
localPartitionStateMessageByNode.entrySet()) {
+                if (aliveNodesConsistentIds.contains(entry.getKey()) && 
(entry.getValue().state() == LocalPartitionStateEnum.HEALTHY
+                        || entry.getValue().state() == 
LocalPartitionStateEnum.CATCHING_UP)) {
+                    partAssignments.add(Assignment.forPeer(entry.getKey()));
+                }
+            }
+        }
+
+        Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentAssignments, partAssignments);
+
+        if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
+            return 
CompletableFuture.completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());

Review Comment:
   Maybe use a static import?



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -299,6 +331,167 @@ void testManualRebalanceIfPartitionIsLost() throws 
Exception {
         assertThat(errors, is(empty()));
     }
 
+    /**
+     * Tests a scenario where all stable nodes are lost, yet we have data on 
one of pending nodes and perform reset partition operation. In
+     * this case we should use that pending node as a source of data for 
recovery.
+     *
+     * <p>It goes like this:
+     * <ul>
+     *     <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+     *     <li>We stop nodes 4 and 5, leaving node 1 alone in stable 
assignments.</li>
+     *     <li>New distribution is 0, 1 and 3. Rebalance is started via raft 
snapshots. It transfers data to node 0, but not node 3.</li>
+     *     <li>Node 1 is stopped. Data is only present on node 0.</li>
+     *     <li>We execute "resetPartitions" and expect that data from node 0 
will be available after that.</li>
+     * </ul>
+     */
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+    public void testIncompleteRebalanceAfterResetPartitions() throws Exception 
{
+        int partId = 0;
+
+        Assignments assignment013 = Assignments.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())
+        );
+
+        IgniteImpl node0 = cluster.node(0);
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 1, 4, 5);
+
+        insertValues(table, partId, 0);
+
+        triggerRaftSnapshot(1, partId);
+        // Second snapshot causes log truncation.
+        triggerRaftSnapshot(1, partId);
+
+        node(1).dropMessages((nodeName, msg) -> 
node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+
+        stopNodesInParallel(4, 5);
+        waitForScale(node0, 4);
+
+        assertRealAssignments(node0, partId, 0, 1, 3);
+
+        cluster.runningNodes().forEach(node -> {
+            BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg) 
-> stableKeySwitchMessage(msg, partId, assignment013);
+            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                node.dropMessages(newPredicate);
+            } else {
+                node.dropMessages(oldPredicate.or(newPredicate));
+            }
+        });
+
+        CompletableFuture<Void> resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        var localStatesFut = 
node0.disasterRecoveryManager().localPartitionStates(emptySet(), 
Set.of(node(3).name()), emptySet());
+        assertThat(localStatesFut, willCompleteSuccessfully());
+
+        Map<TablePartitionId, LocalPartitionStateByNode> localStates = 
localStatesFut.join();
+        assertThat(localStates, is(not(anEmptyMap())));
+        assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, 
localStates.values().iterator().next().values().iterator().next().state);
+
+        stopNode(1);
+        waitForScale(node0, 3);
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 0, 2, 3);
+
+        // Set time in the future to protect us from "getAsync" from the past.
+        // Should be replaced with "sleep" when clock skew validation is 
implemented.
+        node0.clock().update(node0.clock().now().addPhysicalTime(
+                SECONDS.toMillis(DEFAULT_IDLE_SAFE_TIME_PROP_DURATION) + 
node0.clockService().maxClockSkewMillis())
+        );
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+        //  We need wait quite a bit before data is available. Log shows term 
mismatches, meaning that right now it only works due to some
+        //  miracle. For future improvements we must specify "stable" forced 
sub-assignments explicitly, instead of calculating them as an
+        //  intersection.
+        Thread.sleep(10_000);
+
+        // "forEach" makes "i" effectively final, which is convenient for 
internal lambda.
+        IntStream.range(0, ENTRIES).forEach(i -> {
+            CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null, 
Tuple.create(of("id", i)));
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertNotNull(fut.join());
+        });
+    }
+
+    private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, 
Assignments blockedAssignments) {
+        if (msg instanceof WriteActionRequest) {
+            var writeActionRequest = (WriteActionRequest) msg;
+            WriteCommand command = writeActionRequest.deserializedCommand();
+
+            if (command instanceof MultiInvokeCommand) {
+                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) 
command;
+
+                Statement andThen = multiInvokeCommand.iif().andThen();
+
+                if (andThen instanceof UpdateStatement) {
+                    UpdateStatement updateStatement = (UpdateStatement) 
andThen;
+                    Collection<Operation> operations = 
updateStatement.update().operations();
+
+                    ByteArray raftConfigurationAppliedKey = 
raftConfigurationAppliedKey(new TablePartitionId(tableId, partId));
+
+                    for (Operation operation : operations) {
+                        ByteArray opKey = new ByteArray(operation.key());
+
+                        if (operation.type() == OperationType.PUT && 
opKey.equals(raftConfigurationAppliedKey)) {
+                            return 
blockedAssignments.equals(ByteUtils.fromBytes(operation.value()));
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private void waitForPartitionState(IgniteImpl node0, 
GlobalPartitionStateEnum expectedState) throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            Map<TablePartitionId, GlobalPartitionState> map = 
node0.disasterRecoveryManager()
+                    .globalPartitionStates(Set.of(zoneName), 
emptySet()).join();
+
+            System.out.println(map);
+            
System.out.println(node0.disasterRecoveryManager().localPartitionStates(emptySet(),
 emptySet(), emptySet()).join());

Review Comment:
   Maybe this should be logged or deleted?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java:
##########
@@ -79,31 +123,165 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
         HybridTimestamp msSafeTime = 
disasterRecoveryManager.metaStorageManager.timestampByRevision(msRevision);
 
         int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());
+
+        if (this.catalogVersion != catalogVersion) {
+            return CompletableFuture.failedFuture(

Review Comment:
   Maybe use a static import?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java:
##########
@@ -79,31 +123,165 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
         HybridTimestamp msSafeTime = 
disasterRecoveryManager.metaStorageManager.timestampByRevision(msRevision);
 
         int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());
+
+        if (this.catalogVersion != catalogVersion) {
+            return CompletableFuture.failedFuture(
+                    new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR, 
"Cluster is not idle, concurrent DDL update detected.")
+            );
+        }
+
         Catalog catalog = 
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
 
         CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
         CatalogTableDescriptor tableDescriptor = catalog.table(tableId);
 
+        CompletableFuture<Map<TablePartitionId, 
LocalPartitionStateMessageByNode>> localStates
+                = 
disasterRecoveryManager.localPartitionStatesInternal(Set.of(zoneDescriptor.name()),
 emptySet(), partitionIds, catalog);
+
         CompletableFuture<Set<String>> dataNodesFuture = 
disasterRecoveryManager.dzManager.dataNodes(msRevision, catalogVersion, zoneId);
 
-        return dataNodesFuture.thenCompose(dataNodes -> {
+        return dataNodesFuture.thenCombine(localStates, (dataNodes, 
localStatesMap) -> {
             Set<String> nodeConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology()
                     .stream()
                     .map(NodeWithAttributes::nodeName)
                     .collect(toSet());
 
-            CompletableFuture<?>[] futures = 
RebalanceUtil.forceAssignmentsUpdate(
+            CompletableFuture<?>[] futures = forceAssignmentsUpdate(
                     tableDescriptor,
                     zoneDescriptor,
-                    partitionIds,
                     dataNodes,
                     nodeConsistentIds,
                     msRevision,
-                    disasterRecoveryManager.metaStorageManager
+                    disasterRecoveryManager.metaStorageManager,
+                    localStatesMap
             );
 
             return allOf(futures);
-        });
+        }).thenCompose(Function.identity());
+    }
+
+    /**
+     * Sets force assignments for the zone/table if it's required. The 
condition for force reassignment is the absence of stable
+     * assignments' majority within the set of currently alive nodes. In this 
case we calculate new assignments that include all alive
+     * stable nodes, and try to save ot with a {@link Assignments#force()} 
flag enabled.
+     *
+     * @param tableDescriptor Table descriptor.
+     * @param zoneDescriptor Zone descriptor.
+     * @param dataNodes Current DZ data nodes.
+     * @param aliveNodesConsistentIds Set of alive nodes according to logical 
topology.
+     * @param revision Meta-storage revision to be associated with 
reassignment.
+     * @param metaStorageManager Meta-storage manager.
+     * @param localStatesMap Local partition states retrieved by {@link 
DisasterRecoveryManager#localPartitionStates(Set, Set, Set)}.
+     * @return A future that will be completed when reassignments data is 
written into a meta-storage, if that's required.
+     */
+    private CompletableFuture<?>[] forceAssignmentsUpdate(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            Set<String> dataNodes,
+            Set<String> aliveNodesConsistentIds,
+            long revision,
+            MetaStorageManager metaStorageManager,
+            Map<TablePartitionId, LocalPartitionStateMessageByNode> 
localStatesMap
+    ) {
+        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = 
tableAssignments(
+                metaStorageManager,
+                tableDescriptor.id(),
+                partitionIds,
+                zoneDescriptor.partitions()
+        );
+
+        Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
+
+        int[] partitionIdsArray = partitionIds.isEmpty()
+                ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
+                : partitionIds.stream().mapToInt(Integer::intValue).toArray();
+
+        CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIdsArray.length];
+
+        for (int partitionId = 0; partitionId < partitionIdsArray.length; 
partitionId++) {
+            TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
+
+            futures[partitionId] = 
tableAssignmentsFut.thenCompose(tableAssignments ->
+                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
manualPartitionUpdate(
+                            replicaGrpId,
+                            aliveDataNodes,
+                            aliveNodesConsistentIds,
+                            zoneDescriptor.replicas(),
+                            revision,
+                            metaStorageManager,
+                            
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
+                            localStatesMap.get(replicaGrpId)
+                    )).thenAccept(res -> {
+                        DisasterRecoveryManager.LOG.info(
+                                "Partition {} returned {} status on reset 
attempt", replicaGrpId, UpdateStatus.valueOf(res)
+                        );
+                    }
+            );
+        }
+
+        return futures;
+    }
+
+    private static CompletableFuture<Integer> manualPartitionUpdate(

Review Comment:
   This method is hard to read because it contains many lines of code. I would 
split it into several methods, but that is at your discretion.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -299,6 +331,167 @@ void testManualRebalanceIfPartitionIsLost() throws 
Exception {
         assertThat(errors, is(empty()));
     }
 
+    /**
+     * Tests a scenario where all stable nodes are lost, yet we have data on 
one of pending nodes and perform reset partition operation. In
+     * this case we should use that pending node as a source of data for 
recovery.
+     *
+     * <p>It goes like this:
+     * <ul>
+     *     <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+     *     <li>We stop nodes 4 and 5, leaving node 1 alone in stable 
assignments.</li>
+     *     <li>New distribution is 0, 1 and 3. Rebalance is started via raft 
snapshots. It transfers data to node 0, but not node 3.</li>
+     *     <li>Node 1 is stopped. Data is only present on node 0.</li>
+     *     <li>We execute "resetPartitions" and expect that data from node 0 
will be available after that.</li>
+     * </ul>
+     */
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+    public void testIncompleteRebalanceAfterResetPartitions() throws Exception 
{
+        int partId = 0;
+
+        Assignments assignment013 = Assignments.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())
+        );
+
+        IgniteImpl node0 = cluster.node(0);
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 1, 4, 5);
+
+        insertValues(table, partId, 0);
+
+        triggerRaftSnapshot(1, partId);
+        // Second snapshot causes log truncation.
+        triggerRaftSnapshot(1, partId);
+
+        node(1).dropMessages((nodeName, msg) -> 
node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+
+        stopNodesInParallel(4, 5);
+        waitForScale(node0, 4);
+
+        assertRealAssignments(node0, partId, 0, 1, 3);
+
+        cluster.runningNodes().forEach(node -> {
+            BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg) 
-> stableKeySwitchMessage(msg, partId, assignment013);
+            BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
+
+            if (oldPredicate == null) {
+                node.dropMessages(newPredicate);
+            } else {
+                node.dropMessages(oldPredicate.or(newPredicate));
+            }
+        });
+
+        CompletableFuture<Void> resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        var localStatesFut = 
node0.disasterRecoveryManager().localPartitionStates(emptySet(), 
Set.of(node(3).name()), emptySet());
+        assertThat(localStatesFut, willCompleteSuccessfully());
+
+        Map<TablePartitionId, LocalPartitionStateByNode> localStates = 
localStatesFut.join();
+        assertThat(localStates, is(not(anEmptyMap())));
+        assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, 
localStates.values().iterator().next().values().iterator().next().state);
+
+        stopNode(1);
+        waitForScale(node0, 3);
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+
+        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet());
+        assertThat(resetFuture, willCompleteSuccessfully());
+
+        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+        awaitPrimaryReplica(node0, partId);
+        assertRealAssignments(node0, partId, 0, 2, 3);
+
+        // Set time in the future to protect us from "getAsync" from the past.
+        // Should be replaced with "sleep" when clock skew validation is 
implemented.
+        node0.clock().update(node0.clock().now().addPhysicalTime(
+                SECONDS.toMillis(DEFAULT_IDLE_SAFE_TIME_PROP_DURATION) + 
node0.clockService().maxClockSkewMillis())
+        );
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21303
+        //  We need wait quite a bit before data is available. Log shows term 
mismatches, meaning that right now it only works due to some
+        //  miracle. For future improvements we must specify "stable" forced 
sub-assignments explicitly, instead of calculating them as an
+        //  intersection.
+        Thread.sleep(10_000);
+
+        // "forEach" makes "i" effectively final, which is convenient for 
internal lambda.
+        IntStream.range(0, ENTRIES).forEach(i -> {
+            CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null, 
Tuple.create(of("id", i)));
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertNotNull(fut.join());
+        });
+    }
+
+    private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, 
Assignments blockedAssignments) {
+        if (msg instanceof WriteActionRequest) {
+            var writeActionRequest = (WriteActionRequest) msg;
+            WriteCommand command = writeActionRequest.deserializedCommand();
+
+            if (command instanceof MultiInvokeCommand) {
+                MultiInvokeCommand multiInvokeCommand = (MultiInvokeCommand) 
command;
+
+                Statement andThen = multiInvokeCommand.iif().andThen();
+
+                if (andThen instanceof UpdateStatement) {
+                    UpdateStatement updateStatement = (UpdateStatement) 
andThen;
+                    Collection<Operation> operations = 
updateStatement.update().operations();
+
+                    ByteArray raftConfigurationAppliedKey = 
raftConfigurationAppliedKey(new TablePartitionId(tableId, partId));
+
+                    for (Operation operation : operations) {
+                        ByteArray opKey = new ByteArray(operation.key());
+
+                        if (operation.type() == OperationType.PUT && 
opKey.equals(raftConfigurationAppliedKey)) {
+                            return 
blockedAssignments.equals(ByteUtils.fromBytes(operation.value()));
+                        }
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private void waitForPartitionState(IgniteImpl node0, 
GlobalPartitionStateEnum expectedState) throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            Map<TablePartitionId, GlobalPartitionState> map = 
node0.disasterRecoveryManager()
+                    .globalPartitionStates(Set.of(zoneName), 
emptySet()).join();
+
+            System.out.println(map);

Review Comment:
   Maybe this should be logged or deleted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to