Phillippko commented on code in PR #4399:
URL: https://github.com/apache/ignite-3/pull/4399#discussion_r1770206472


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -133,32 +145,38 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     private final MetaStorageMetricSource metaStorageMetricSource;
 
+    private final MetastorageRepairStorage metastorageRepairStorage;
+    private final MetastorageRepair metastorageRepair;
+
     private volatile long appliedRevision = 0;
 
     private volatile MetaStorageConfiguration metaStorageConfiguration;
 
-    private volatile MetaStorageListener followerListener;
-
-    private volatile MetaStorageListener learnerListener;
-
     private final List<ElectionListener> electionListeners = new 
CopyOnWriteArrayList<>();
 
     private final RaftGroupOptionsConfigurer raftGroupOptionsConfigurer;
 
+    private final MetaStorageLearnerManager learnerManager;
+
     /** Gets completed when raft service is started. */
     private final CompletableFuture<Void> raftNodeStarted = new 
CompletableFuture<>();
 
+    private final OrderingFuture<RaftGroupService> raftServiceFuture = new 
OrderingFuture<>();

Review Comment:
   We have a local variable with the same name, let's rename one of them. Also, 
   please add javadoc - it's not clear when this future completes. 
   
   It's returned by the "`start(Follower/Leader)Node`" methods, but we also have 
   `raftNodeStarted`, which completes immediately after calling the start method - 
   without waiting for anything.



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.Test;
+
+class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
+    @Test
+    void repairWhenMgWas1Node() throws Exception {
+        // Node with index 2 will host neither of voting sets.
+        startAndInitCluster(3, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        // This makes the MG majority go away.
+        cluster.stopNode(1);
+
+        IgniteImpl igniteImpl0BeforeRestart = igniteImpl(0);
+
+        assertThatMgHasNoMajority(igniteImpl0BeforeRestart);
+
+        initiateMgRepairVia(igniteImpl0BeforeRestart, 1, 0);
+
+        IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
+        waitTillMgHasMajority(restartedIgniteImpl0);
+
+        IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+        waitTillMgHasMajority(restartedIgniteImpl2);
+
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl0);
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl2);
+    }
+
+    private static void assertThatMgHasNoMajority(IgniteImpl ignite) {
+        assertThat(ignite.metaStorageManager().get(new ByteArray("abc")), 
willTimeoutIn(1, SECONDS));
+    }
+
+    private static void waitTillMgHasMajority(IgniteImpl ignite) {
+        assertThat(ignite.metaStorageManager().get(new ByteArray("abc")), 
willCompleteSuccessfully());
+    }
+
+    private void initiateMgRepairVia(IgniteImpl conductor, int 
mgReplicationFactor, int... newCmgIndexes) {
+        // TODO: IGNITE-22897 - initiate repair via CLI.
+
+        CompletableFuture<Void> initiationFuture = 
conductor.systemDisasterRecoveryManager()
+                
.resetClusterRepairingMetastorage(List.of(nodeNames(newCmgIndexes)), 
mgReplicationFactor);
+        assertThat(initiationFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void afterRepairingWithReplicationFactor1LeaderPerformsSecondaryDuties() 
throws Exception {
+        startAndInitCluster(2, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        // This makes the MG majority go away.
+        cluster.stopNode(1);

Review Comment:
   Should we check that we don't have a majority now?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java:
##########
@@ -179,10 +178,14 @@ private void startSafeTimeScheduler() {
 
     private CompletableFuture<Void> 
updateLearnersIfSecondaryDutiesAreNotPaused(long term) {
         if (leaderSecondaryDutiesPaused.getAsBoolean()) {
+            LOG.info("Skipping learners update as secondary duties are still 
paused");
+
             return nullCompletedFuture();
         }
 
-        return metaStorageSvcFut.thenCompose(service -> 
resetLearners(service.raftGroupService(), term));
+        LOG.info("Going to really update learners with term {}", term);

Review Comment:
   ```suggestion
           LOG.info("Actually updating learners with term {}", term);
   ```



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.jetbrains.annotations.Nullable;
+
+abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {

Review Comment:
   Let's add javadocs



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -889,26 +1027,45 @@ public CompletableFuture<IndexWithTerm> raftNodeIndex() {
 
     @Override
     public CompletableFuture<Void> becomeLonelyLeader(long termBeforeChange, 
Set<String> targetVotingSet) {
-        // TODO: IGNITE-22899 - use both parameters.
         return inBusyLockAsync(busyLock, () -> {
-            synchronized (becomeLonelyLeaderMutex) {
-                leaderSecondaryDutiesPaused = targetVotingSet.size() > 1;
+            synchronized (peersChangeMutex) {
+                if (peersChangeState != null) {
+                    return failedFuture(new IgniteInternalException(
+                            INTERNAL_ERR,
+                            "Peers change is under way [state=" + 
peersChangeState + "]."
+                    ));
+                }
+
+                peersChangeState = targetVotingSet.size() > 1 ? new 
PeersChangeState(termBeforeChange, targetVotingSet) : null;

Review Comment:
   Didn't grasp: why is it null if `targetVotingSet.size() <= 1`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -133,32 +145,38 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
 
     private final MetaStorageMetricSource metaStorageMetricSource;
 
+    private final MetastorageRepairStorage metastorageRepairStorage;
+    private final MetastorageRepair metastorageRepair;

Review Comment:
   ```suggestion
       private final MetastorageRepairStorage metastorageRepairStorage;
       
       private final MetastorageRepair metastorageRepair;
   ```



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.jetbrains.annotations.Nullable;
+
+abstract class ItSystemGroupDisasterRecoveryTest extends 
ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 0;
+    }
+
+    final void startAndInitCluster(int nodeCount, int[] cmgNodeIndexes, int[] 
metastorageNodeIndexes) {
+        // Pre-allocate this to make sure that for each pair of nodes, if they 
start almost at the same time, at least one is able to make
+        // an initial sync to another one.
+        cluster.overrideSeedsCount(10);
+
+        cluster.startAndInit(nodeCount, paramsBuilder -> {
+            paramsBuilder.cmgNodeNames(nodeNames(cmgNodeIndexes));
+            
paramsBuilder.metaStorageNodeNames(nodeNames(metastorageNodeIndexes));
+        });
+    }
+
+    final void waitTillClusterStateIsSavedToVaultOnConductor(int nodeIndex) 
throws InterruptedException {
+        assertTrue(waitForCondition(
+                () -> new 
SystemDisasterRecoveryStorage(igniteImpl(nodeIndex).vault()).readClusterState() 
!= null,
+                SECONDS.toMillis(10)
+        ));
+    }
+
+    final String[] nodeNames(int... nodeIndexes) {
+        return IntStream.of(nodeIndexes)
+                .mapToObj(cluster::nodeName)
+                .toArray(String[]::new);
+    }
+
+    final IgniteImpl waitTillNodeRestartsInternally(int nodeIndex) throws 
InterruptedException {
+        // restartOrShutdownFuture() becomes non-null when restart or shutdown 
is initiated; we know it's restart.
+
+        assertTrue(
+                waitForCondition(() -> restartOrShutdownFuture(nodeIndex) != 
null, SECONDS.toMillis(20)),
+                "Node did not attempt to be restarted (or shut down) in time"
+        );
+        assertThat(restartOrShutdownFuture(nodeIndex), 
willCompleteSuccessfully());
+
+        return unwrapIgniteImpl(cluster.server(nodeIndex).api());
+    }
+
+    @Nullable
+    private CompletableFuture<Void> restartOrShutdownFuture(int nodeIndex) {
+        return ((IgniteServerImpl) 
cluster.server(nodeIndex)).restartOrShutdownFuture();
+    }
+
+    static ClusterState clusterState(IgniteImpl restartedIgniteImpl1)

Review Comment:
   ```suggestion
       static ClusterState clusterState(IgniteImpl ignite)
   ```



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.Test;
+
+class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
+    @Test
+    void repairWhenMgWas1Node() throws Exception {
+        // Node with index 2 will host neither of voting sets.
+        startAndInitCluster(3, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        // This makes the MG majority go away.
+        cluster.stopNode(1);
+
+        IgniteImpl igniteImpl0BeforeRestart = igniteImpl(0);
+
+        assertThatMgHasNoMajority(igniteImpl0BeforeRestart);
+
+        initiateMgRepairVia(igniteImpl0BeforeRestart, 1, 0);
+
+        IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
+        waitTillMgHasMajority(restartedIgniteImpl0);
+
+        IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+        waitTillMgHasMajority(restartedIgniteImpl2);
+
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl0);
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl2);

Review Comment:
   ```suggestion
           assertResetClusterMessageIsNotPresentAt(restartedNode2);
   ```
   just to be more concise



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.Test;
+
+class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {

Review Comment:
   Let's add javadocs



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -380,35 +426,106 @@ private CompletableFuture<? extends RaftGroupService> 
startFollowerNode(
         return raftServiceFuture;
     }
 
-    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
-            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration 
disruptorConfig
-    ) throws NodeStoppingException {
-        String thisNodeName = clusterService.nodeName();
+    private boolean peersChangeStateExists() {
+        synchronized (peersChangeMutex) {
+            return peersChangeState != null;
+        }
+    }
 
-        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private RaftNodeId raftNodeId() {
+        return raftNodeId(new Peer(clusterService.nodeName()));
+    }
 
-        Peer localPeer = configuration.learner(thisNodeName);
+    private static RaftNodeId raftNodeId(Peer localPeer) {
+        return new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer);
+    }
 
-        assert localPeer != null;
+    private void onConfigurationCommitted(CommittedConfiguration 
configuration) {
+        LOG.info("MS configuration committed {}", configuration);
 
-        learnerListener = new MetaStorageListener(storage, clusterTime);
+        // TODO: IGNITE-23210 - use thenAccept() when implemented.
+        raftServiceFuture
+                .handle((raftService, ex) -> {
+                    if (ex != null) {
+                        throw ExceptionUtils.sneakyThrow(ex);
+                    }
 
-        return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                raftNodeId(localPeer),
-                configuration,
-                learnerListener,
-                RaftGroupEventsListener.noopLsnr,
-                disruptorConfig,
-                raftGroupOptionsConfigurer
-        );
+                    updateRaftClientConfigIfEventIsNotStale(configuration, 
raftService);
+
+                    handlePeersChange(configuration, raftService);
+
+                    return null;
+                })
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Error while handling ConfigurationCommitted 
event", ex);
+                    }
+                });
     }
 
-    private RaftNodeId raftNodeId() {
-        return raftNodeId(new Peer(clusterService.nodeName()));
+    private void 
updateRaftClientConfigIfEventIsNotStale(CommittedConfiguration configuration, 
RaftGroupService raftService) {
+        IndexWithTerm newIndexWithTerm = new 
IndexWithTerm(configuration.index(), configuration.term());
+
+        lastHandledIndexWithTerm.updateAndGet(existingIndexWithTerm -> {
+            if (newIndexWithTerm.compareTo(existingIndexWithTerm) > 0) {
+                LOG.info("Updating raftService config to {}", configuration);
+
+                
raftService.updateConfiguration(PeersAndLearners.fromConsistentIds(
+                        Set.copyOf(configuration.peers()),
+                        Set.copyOf(configuration.learners())
+                ));
+
+                return newIndexWithTerm;
+            } else {
+                LOG.info("Skipping update for stale config {}, actual is {}", 
newIndexWithTerm, existingIndexWithTerm);
+
+                return existingIndexWithTerm;
+            }
+        });
     }
 
-    private static RaftNodeId raftNodeId(Peer localPeer) {
-        return new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer);
+    private void handlePeersChange(CommittedConfiguration configuration, 
RaftGroupService raftService) {
+        synchronized (peersChangeMutex) {
+            if (peersChangeState == null || configuration.term() <= 
peersChangeState.termBeforeChange) {
+                return;
+            }
+
+            PeersChangeState currentState = peersChangeState;
+
+            if (thisNodeIsEstablishedAsLonelyLeader(configuration)) {
+                LOG.info("Lonely leader has been established, changing voting 
set to target set: {}", currentState.targetPeers);
+
+                PeersAndLearners newConfig = 
PeersAndLearners.fromConsistentIds(currentState.targetPeers);
+                raftService.changePeersAndLearners(newConfig, 
configuration.term())
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                LOG.error("Error while changing voting set to 
{}", ex, currentState.targetPeers);
+                            } else {
+                                LOG.info("Changed voting set successfully to 
{}", currentState.targetPeers);
+                            }
+                        });
+            } else if (targetVotingSetIsEstablished(configuration, 
currentState)) {
+                LOG.info("Target voting set has been established, unpausing 
secondary duties");
+
+                peersChangeState = null;

Review Comment:
   Should we set it to null if "thisNodeIsEstablishedAsLonelyLeader"? When does 
   it become null in that case?



##########
modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageGroupDisasterRecoveryTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.app.IgniteServerImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.junit.jupiter.api.Test;
+
+class ItMetastorageGroupDisasterRecoveryTest extends 
ItSystemGroupDisasterRecoveryTest {
+    @Test
+    void repairWhenMgWas1Node() throws Exception {
+        // Node with index 2 will host neither of voting sets.
+        startAndInitCluster(3, new int[]{0}, new int[]{1});
+        waitTillClusterStateIsSavedToVaultOnConductor(0);
+
+        // This makes the MG majority go away.
+        cluster.stopNode(1);
+
+        IgniteImpl igniteImpl0BeforeRestart = igniteImpl(0);
+
+        assertThatMgHasNoMajority(igniteImpl0BeforeRestart);
+
+        initiateMgRepairVia(igniteImpl0BeforeRestart, 1, 0);
+
+        IgniteImpl restartedIgniteImpl0 = waitTillNodeRestartsInternally(0);
+        waitTillMgHasMajority(restartedIgniteImpl0);
+
+        IgniteImpl restartedIgniteImpl2 = waitTillNodeRestartsInternally(2);
+        waitTillMgHasMajority(restartedIgniteImpl2);
+
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl0);
+        assertResetClusterMessageIsNotPresentAt(restartedIgniteImpl2);
+    }
+
+    private static void assertThatMgHasNoMajority(IgniteImpl ignite) {
+        assertThat(ignite.metaStorageManager().get(new ByteArray("abc")), 
willTimeoutIn(1, SECONDS));
+    }
+
+    private static void waitTillMgHasMajority(IgniteImpl ignite) {
+        assertThat(ignite.metaStorageManager().get(new ByteArray("abc")), 
willCompleteSuccessfully());
+    }
+
+    private void initiateMgRepairVia(IgniteImpl conductor, int 
mgReplicationFactor, int... newCmgIndexes) {

Review Comment:
   something is missing)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to