sanpwc commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r847519203


##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
      * @throws IllegalArgumentException if null is specified instead of node name.
      */
     public void stop(String name);
+
+    /**
+     * Initializes the cluster that the given node is present in.
+     *
+     * @param name name of the node that the initialization request will be sent to.
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage and the CMG.
+     * @throws NodeStoppingException If node stopping intention was detected.

Review Comment:
   It's not valid to throw `NodeStoppingException` from the public API: it extends `IgniteInternalCheckedException`, which is an internal type.
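A minimal sketch of the usual alternative, assuming the public method should wrap the internal failure in a public unchecked exception instead of declaring it. All class names below (`InternalCheckedException`, `NodeStoppingInternalException`, `PublicApiException`, `InitFacade`) are hypothetical stand-ins, not Ignite's actual types:

```java
import java.util.Objects;

// Hypothetical stand-in for IgniteInternalCheckedException: internal and checked.
class InternalCheckedException extends Exception {
    InternalCheckedException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for NodeStoppingException.
class NodeStoppingInternalException extends InternalCheckedException {
    NodeStoppingInternalException() {
        super("Operation has been cancelled (node is stopping)");
    }
}

// Hypothetical public, unchecked exception that is safe to expose from the API.
class PublicApiException extends RuntimeException {
    PublicApiException(String message, Throwable cause) {
        super(message, cause);
    }
}

class InitFacade {
    private volatile boolean stopping;

    void markStopping() {
        stopping = true;
    }

    // Internal layer: free to declare internal checked exceptions.
    private void initInternal(String nodeName) throws InternalCheckedException {
        Objects.requireNonNull(nodeName, "nodeName");
        if (stopping) {
            throw new NodeStoppingInternalException();
        }
        // ... send the init request here ...
    }

    // Public entry point: no internal exception types in the signature;
    // the internal failure is preserved as the cause instead.
    public void init(String nodeName) {
        try {
            initInternal(nodeName);
        } catch (InternalCheckedException e) {
            throw new PublicApiException("Unable to initialize the cluster", e);
        }
    }
}
```

The public signature stays free of internal types, while the original exception remains available through `getCause()` for diagnostics.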



##########
modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java:
##########
@@ -66,15 +70,21 @@ public class ItConfigCommandTest extends AbstractCliTest {
     private IgniteImpl node;
 
     @BeforeEach
-    void setup(@WorkDirectory Path workDir, TestInfo testInfo) {
-        node = (IgniteImpl) IgnitionManager.start(testNodeName(testInfo, 0), null, workDir);
+    void setup(@WorkDirectory Path workDir, TestInfo testInfo) throws NodeStoppingException {

Review Comment:
   As mentioned above, it's not valid to throw the internal `NodeStoppingException` from the public API.



##########
modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java:
##########
@@ -66,15 +70,21 @@ public class ItConfigCommandTest extends AbstractCliTest {
     private IgniteImpl node;
 
     @BeforeEach
-    void setup(@WorkDirectory Path workDir, TestInfo testInfo) {
-        node = (IgniteImpl) IgnitionManager.start(testNodeName(testInfo, 0), null, workDir);
+    void setup(@WorkDirectory Path workDir, TestInfo testInfo) throws NodeStoppingException {
+        String nodeName = testNodeName(testInfo, 0);
+
+        CompletableFuture<Ignite> future = IgnitionManager.start(nodeName, null, workDir);
+
+        IgnitionManager.init(nodeName, List.of(nodeName));

Review Comment:
   Having `nodeName` as an `init` parameter seems useful for some test scenarios, but in the general case we could use any of the nodes in `org.apache.ignite.internal.app.IgnitionImpl#nodes`. I'd rather add an overloaded version of `init()` without the `nodeName` parameter. WDYT?
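A minimal sketch of the suggested overload, assuming the started nodes are kept in a name-keyed map (a hypothetical simplification of `IgnitionImpl#nodes`); `LocalIgnition` and its methods are illustrative names only:

```java
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry of locally started nodes (not the real IgnitionImpl).
class LocalIgnition {
    private final Map<String, Object> nodes = new ConcurrentHashMap<>();

    // Records which node the last init request was routed to, for illustration only.
    private String lastInitTarget;

    void start(String name) {
        nodes.put(name, new Object());
    }

    // Existing style: the caller chooses the node that receives the init request.
    void init(String name, Collection<String> metaStorageNodeNames) {
        if (!nodes.containsKey(name)) {
            throw new IllegalArgumentException("Node is not started: " + name);
        }
        lastInitTarget = name;
        // ... send the init request to `name` here ...
    }

    // Suggested overload: any locally started node can accept the request.
    void init(Collection<String> metaStorageNodeNames) {
        String anyNode = nodes.keySet().stream()
                .findAny()
                .orElseThrow(() -> new NoSuchElementException("No started nodes"));

        init(anyNode, metaStorageNodeNames);
    }

    String lastInitTarget() {
        return lastInitTarget;
    }
}
```

The node-specific variant can stay for tests that need to target a particular node, while regular callers use the shorter form.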



##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -227,7 +233,13 @@ public class IgniteImpl implements Ignite {
 
         txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager());
 
-        cmgMgr = new ClusterManagementGroupManager(clusterSvc, raftMgr, restComponent);
+        cmgMgr = new ClusterManagementGroupManager(

Review Comment:
   Currently, `IgniteImpl` contains the following code (which, by the way, is located in the wrong place):
   ```
       private void waitForJoinPermission() {
           // TODO https://issues.apache.org/jira/browse/IGNITE-15114
       }
   ```
   meaning that the components started after the CMG manager should only start after receiving `NodeJoinResponse[OK]`.
   Judging by
   ```
               raftService
                       .thenCompose(CmgRaftService::joinCluster)
                       .thenRun(() -> metaStorageNodes.complete(state.metaStorageNodes()));
   ```
   in `org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager#handleClusterState`,
   this is not implemented yet. Could you please fix it?
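A minimal sketch of the ordering being asked for, assuming the CMG manager exposes a future that completes once `NodeJoinResponse[OK]` is received. `StartupSequence`, `joinPermission`, and the component names are hypothetical, and the list is not thread-safe (single-threaded for brevity):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: components that depend on cluster membership start only
// after the CMG has answered the join request with "OK".
class StartupSequence {
    // Completed by the (hypothetical) CMG manager once NodeJoinResponse[OK] arrives.
    final CompletableFuture<Void> joinPermission = new CompletableFuture<>();

    // Records start order for illustration; not thread-safe.
    final List<String> started = new ArrayList<>();

    CompletableFuture<Void> start() {
        // Components up to and including the CMG start unconditionally.
        started.add("network");
        started.add("raft");
        started.add("cmg");

        // Everything after the CMG waits for the join permission.
        return joinPermission.thenRun(() -> {
            started.add("metastorage");
            started.add("tables");
        });
    }
}
```

If the join is rejected and the future completes exceptionally, `thenRun` skips the dependent components and the returned future fails as well.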
   



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String> metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing the cluster", e);
+            throw new InitException("Interrupted while initializing the cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, msgFactory, clusterService);
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = clusterService.messagingService();
+        // register the ClusterState handler first, because local state recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+                        handleClusterState((ClusterStateMessage) message, senderAddr, correlationId);
+                    }
+                })
+        );
+
+        synchronized (raftServiceLock) {
+            raftService = recoverLocalState();
+        }
+
+        // register the Init handler second in order to handle the command differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
+
+                        handleInit((CmgInitMessage) message, senderAddr, correlationId);
+                    }
+                })
+        );
+
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} if the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
 
-                        ClusterNode thisNode = clusterService.topologyService().localMember();
+            throw new IgniteInternalException("Interrupted while retrieving local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
 
-                        messagingService.respond(addr, successResponse(msgFactory), correlationId);
+        log.info("Local CMG state recovered, starting the CMG");
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> {
+                            if (isLeader) {
+                                return service.readClusterState()
+                                        // Raft state might not have been initialized in case of leader failure during cluster init
+                                        // TODO: properly handle this case, see https://issues.apache.org/jira/browse/IGNITE-16819
+                                        .thenCompose(state -> state == null ? completedFuture(null) : onLeaderElected(service, state));
+                            } else {
+                                return completedFuture(null);
+                            }
+                        })
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found (the Raft service future is {@code null}) - this means that the current node has never been initialized
+     *     before.</li>
+     *     <li>Local state found (the Raft service future has been started and therefore is not {@code null}),
+     *     but no CMG state present in the Raft storage - this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage - this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        // This future is needed to add a completion listener at the end of the method
+        CompletableFuture<?> resultHook;
+
+        synchronized (raftServiceLock) {
+            if (raftService == null) {
+                // Raft service has not been started
+                log.info("Init command received, starting the CMG: " + newState);
+
+                raftService = initCmgRaftService(newState);
+
+                resultHook = raftService;
+            } else {
+                // Raft service has been started, which means that this node has already received an init command at least once, but
+                // we still need to check that the initialization has completed successfully.
+                log.info("Init command received, but the CMG has already been started");
+
+                resultHook = raftService.thenCompose(service ->
+                        service.readClusterState()
+                                .thenCompose(state -> {
+                                    if (state == null) {
+                                        // Raft state is empty, perform re-initialization
+                                        log.info("CMG state is missing, completing initialization");
+
+                                        if (service.nodeNames().equals(newState.cmgNodes())) {
+                                            return service.isCurrentNodeLeader()
+                                                    .thenCompose(isCurrentNodeLeader ->
+                                                            isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+                                        } else {
+                                            throw new IllegalInitArgumentException(String.format(
+                                                    "CMG has already been initialized with %s, but the new state is different: %s",
+                                                    service.nodeNames(), newState.cmgNodes()
+                                            ));
+                                        }
+                                    } else {
+                                        // Node is fully initialized, just check some invariants
+                                        log.info("Node has already been initialized");
+
+                                        if (state.equals(newState)) {
+                                            return completedFuture(null);
+                                        } else {
+                                            throw new IllegalInitArgumentException(String.format(
+                                                    "CMG has already been initialized with %s, but the new state is different: %s",
+                                                    state, newState
+                                            ));
+                                        }
+                                    }
+                                }));
+            }
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response;
+
+            if (e == null) {
+                response = msgFactory.initCompleteMessage().build();
+            } else {
+                response = msgFactory.initErrorMessage()
+                        .cause(e.getMessage())
+                        .shouldCancel(!(e instanceof IllegalInitArgumentException))
+                        .build();
+            }
+
+            clusterService.messagingService().respond(addr, response, correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            scheduleRemoveFromLogicalTopology(service, node);

Review Comment:
   Why do we need to schedule these removals? I mean, why not remove the full delta at once?
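A minimal sketch of computing the full delta in one pass, assuming nodes are identified by their IDs as in `updateLogicalTopology`; the resulting set could then be handed to a single (hypothetical) batch removal command instead of scheduling one removal per node. `TopologyDelta` is an illustrative name:

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: all logical-topology members that are no longer present
// in the physical topology, computed at once.
class TopologyDelta {
    static Set<String> departedNodeIds(Collection<String> logicalTopologyIds,
                                       Collection<String> physicalTopologyIds) {
        // Copy into a Set for O(1) membership checks.
        Set<String> physical = Set.copyOf(physicalTopologyIds);

        return logicalTopologyIds.stream()
                .filter(id -> !physical.contains(id))
                .collect(Collectors.toSet());
    }
}
```

Sending the whole set in one command also means a single Raft write instead of one per departed node.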



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {

Review Comment:
   I believe the following step is missing:
   ```
   register clusterSvc.onAppeared(node ->
       clusterSvc.msgSvc.invoke(node.id, clusterState)
   );
   ```
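A minimal sketch of the suggested step, using hypothetical stand-ins (`TopologySketch`, `CmgLeaderSketch`, and a `BiConsumer` in place of the messaging service) rather than Ignite's actual topology and messaging APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, simplified topology service that notifies handlers about new members.
class TopologySketch {
    private final List<Consumer<String>> onAppearedHandlers = new ArrayList<>();

    void addOnAppearedHandler(Consumer<String> handler) {
        onAppearedHandlers.add(handler);
    }

    // Called when a node joins the physical topology.
    void fireAppeared(String nodeId) {
        for (Consumer<String> handler : onAppearedHandlers) {
            handler.accept(nodeId);
        }
    }
}

class CmgLeaderSketch {
    // Called once on the CMG leader after election: every node that appears
    // afterwards also receives the current cluster state. `send` stands in for
    // a messaging call such as invoke(node, message).
    static void registerStateBroadcast(TopologySketch topology, String clusterState,
                                       BiConsumer<String, String> send) {
        topology.addOnAppearedHandler(nodeId -> send.accept(nodeId, clusterState));
    }
}
```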



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java:
##########
@@ -122,7 +119,11 @@ void setUp(TestInfo testInfo) {
      */
     @AfterEach
     void afterEach() throws Exception {
-        IgniteUtils.closeAll(ItUtils.reverse(clusterNodes));

Review Comment:
   Could you please also remove ItUtils? It's not used.
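If the reverse close order is still needed once `ItUtils` is gone, a minimal plain-JDK sketch could look like this; `ReverseClose.closeAllReversed` is a hypothetical helper, not an existing Ignite utility:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ReverseClose {
    // Closes resources in the reverse order of their creation. A failing close()
    // does not prevent the remaining resources from being closed.
    static void closeAllReversed(List<? extends AutoCloseable> resources) throws Exception {
        List<AutoCloseable> reversed = new ArrayList<>(resources);
        Collections.reverse(reversed);

        Exception first = null;

        for (AutoCloseable resource : reversed) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }

        if (first != null) {
            throw first;
        }
    }
}
```

Failures from later `close()` calls are attached as suppressed exceptions to the first one, so no failure is silently lost.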



##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
      * @throws IllegalArgumentException if null is specified instead of node name.
      */
     public void stop(String name);
+
+    /**
+     * Initializes the cluster that the given node is present in.
+     *
+     * @param name name of the node that the initialization request will be sent to.
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage and the CMG.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    public void init(String name, Collection<String> metaStorageNodeNames) throws NodeStoppingException;
+
+    /**
+     * Initializes the cluster that the given node is present in.
+     *
+     * @param name name of the node that the initialization request will be sent to.
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage.
+     * @param cmgNodeNames names of nodes that will host the CMG.

Review Comment:
   Let's also add more details about what the CMG and the Meta Storage are. `Ignition` is a public API, so its javadocs should be self-explanatory.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java:
##########
@@ -114,15 +121,17 @@ private CompletableFuture<Void> invokeMessage(Collection<ClusterNode> nodes, Net
                         .invoke(node, message, 10000)
                         .thenAccept(response -> {
                             if (response instanceof InitErrorMessage) {
-                                throw new InitException(String.format(
-                                        "Got error response from node \"%s\": %s", node.name(), ((InitErrorMessage) response).cause()
-                                ));
-                            }
-
-                            if (!(response instanceof InitCompleteMessage)) {
-                                throw new InitException(String.format(
-                                        "Unexpected response from node \"%s\": %s", node.name(), response.getClass()
-                                ));
+                                var errorResponse = (InitErrorMessage) response;
+
+                                throw new InternalInitException(
+                                        String.format("Got error response from node \"%s\": %s", node.name(), errorResponse.cause()),
+                                        errorResponse.shouldCancel()
+                                );
+                            } else if (!(response instanceof InitCompleteMessage)) {

Review Comment:
   What behavior do you expect if the response is neither `InitErrorMessage` nor `InitCompleteMessage`?
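A minimal sketch of one possible answer, assuming an unexpected response type is treated as a protocol violation that cancels the whole initialization (that choice is an assumption, not something the PR specifies). `InitComplete`, `InitError`, `InitFailure`, and `InitResponseCheck` are hypothetical stand-ins for the real message and exception classes:

```java
// Hypothetical stand-in for InitCompleteMessage.
class InitComplete {
}

// Hypothetical stand-in for InitErrorMessage.
class InitError {
    final String cause;
    final boolean shouldCancel;

    InitError(String cause, boolean shouldCancel) {
        this.cause = cause;
        this.shouldCancel = shouldCancel;
    }
}

// Hypothetical stand-in for InternalInitException.
class InitFailure extends RuntimeException {
    final boolean shouldCancel;

    InitFailure(String message, boolean shouldCancel) {
        super(message);
        this.shouldCancel = shouldCancel;
    }
}

class InitResponseCheck {
    static void check(String nodeName, Object response) {
        if (response instanceof InitError) {
            InitError error = (InitError) response;

            // The remote node decides whether the whole init should be cancelled.
            throw new InitFailure(
                    String.format("Got error response from node \"%s\": %s", nodeName, error.cause),
                    error.shouldCancel);
        } else if (!(response instanceof InitComplete)) {
            // Neither an error nor a success: assumed here to be a protocol violation
            // that must cancel the initialization.
            Object type = response == null ? "null" : response.getClass();

            throw new InitFailure(
                    String.format("Unexpected response from node \"%s\": %s", nodeName, type),
                    true);
        }
        // InitComplete: nothing to do.
    }
}
```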



##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
      * @throws IllegalArgumentException if null is specified instead of node name.
      */
     public void stop(String name);
+
+    /**
+     * Initializes the cluster that the given node is present in.

Review Comment:
   I'd suggest adding more details about what an initialized cluster is.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeJoinCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeLeaveCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.WriteStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftGroupListener} implementation for the CMG.
+ */
+public class CmgRaftGroupListener implements RaftGroupListener {
+    private static final IgniteLogger log = IgniteLogger.forClass(CmgRaftGroupListener.class);
+
+    private final RaftStorageManager storage;
+
+    public CmgRaftGroupListener(RaftStorage storage) {
+        this.storage = new RaftStorageManager(storage);
+    }
+
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<ReadCommand> clo = iterator.next();
+
+            ReadCommand command = clo.command();
+
+            if (command instanceof ReadStateCommand) {
+                clo.result(storage.getClusterState());
+            } else if (command instanceof ReadLogicalTopologyCommand) {
+                clo.result(new LogicalTopologyResponse(storage.getLogicalTopology()));
+            }
+        }
+    }
+
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<WriteCommand> clo = iterator.next();
+
+            WriteCommand command = clo.command();
+
+            if (command instanceof WriteStateCommand) {
+                storage.putClusterState(((WriteStateCommand) command).clusterState());
+            } else if (command instanceof NodeJoinCommand) {
+                // TODO: perform validation https://issues.apache.org/jira/browse/IGNITE-16717
+                //  this method must also remain idempotent, as it might be called multiple times on the same node
+                storage.putLogicalTopologyNode(((NodeJoinCommand) command).node());
+            } else if (command instanceof NodeLeaveCommand) {
+                storage.removeLogicalTopologyNode(((NodeLeaveCommand) command).node());
+            }
+
+            clo.result(null);
+        }
+    }
+
+    @Override
+    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        storage.snapshot(path)
+                .whenComplete((unused, throwable) -> doneClo.accept(throwable));
+    }
+
+    @Override
+    public boolean onSnapshotLoad(Path path) {
+        try {
+            storage.restoreSnapshot(path);
+
+            return true;
+        } catch (IgniteInternalException e) {
+            log.error("Failed to restore snapshot at " + path, e);
+
+            return false;
+        }
+    }
+
+    @Override
+    public void onShutdown() {
+        // no-op

Review Comment:
   What about closing RaftStorage?
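   If the listener is supposed to own the storage's lifecycle, `onShutdown()` could close it. A minimal sketch under that assumption, with hypothetical stand-in types: the PR's `RaftStorage` does extend `AutoCloseable`, while `CmgListenerSketch` and `ShutdownAware` are illustrative names. Wrapping the checked exception in `IllegalStateException` is also an illustrative choice; the real code might log the failure or use `IgniteInternalException` instead.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, trimmed-down stand-in: like the PR's RaftStorage, it extends AutoCloseable.
interface RaftStorage extends AutoCloseable {
}

// Stand-in for the onShutdown() part of RaftGroupListener.
interface ShutdownAware {
    void onShutdown();
}

final class CmgListenerSketch implements ShutdownAware {
    private final RaftStorage storage;

    /** Makes close() run at most once even if onShutdown() is invoked repeatedly. */
    private final AtomicBoolean closed = new AtomicBoolean();

    CmgListenerSketch(RaftStorage storage) {
        this.storage = storage;
    }

    @Override
    public void onShutdown() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }

        try {
            storage.close();
        } catch (Exception e) {
            // onShutdown() cannot throw checked exceptions, so wrap the failure in an unchecked one.
            throw new IllegalStateException("Failed to close the CMG storage", e);
        }
    }
}
```

   The alternative is to keep ownership in `ClusterManagementGroupManager`, which also holds the `raftStorage` field, and close it from the component's stop logic; either way, exactly one owner should close it.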



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeJoinCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeLeaveCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.WriteStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftGroupListener} implementation for the CMG.
+ */
+public class CmgRaftGroupListener implements RaftGroupListener {
+    private static final IgniteLogger log = IgniteLogger.forClass(CmgRaftGroupListener.class);
+
+    private final RaftStorageManager storage;
+
+    public CmgRaftGroupListener(RaftStorage storage) {
+        this.storage = new RaftStorageManager(storage);
+    }
+
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<ReadCommand> clo = iterator.next();
+
+            ReadCommand command = clo.command();
+
+            if (command instanceof ReadStateCommand) {
+                clo.result(storage.getClusterState());
+            } else if (command instanceof ReadLogicalTopologyCommand) {
+                clo.result(new LogicalTopologyResponse(storage.getLogicalTopology()));
+            }
+        }
+    }
+
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+        while (iterator.hasNext()) {
+            CommandClosure<WriteCommand> clo = iterator.next();
+
+            WriteCommand command = clo.command();
+
+            if (command instanceof WriteStateCommand) {
+                storage.putClusterState(((WriteStateCommand) command).clusterState());
+            } else if (command instanceof NodeJoinCommand) {
+                // TODO: perform validation https://issues.apache.org/jira/browse/IGNITE-16717
+                //  this method must also remain idempotent, as it might be called multiple times on the same node
+                storage.putLogicalTopologyNode(((NodeJoinCommand) command).node());

Review Comment:
   As you mentioned earlier, the node should be added to the logical topology after NodeJoinFinishCommand, not after NodeJoinCommand.
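   To illustrate the split, a minimal sketch of the state-machine side, assuming a `NodeJoinFinishCommand` that is applied after `NodeJoinCommand`. The class and method names are hypothetical, plain in-memory sets stand in for the Raft storage, and nodes are identified by string IDs instead of `ClusterNode`. Both phases stay idempotent, as the TODO in the diff requires for the join handler.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of a two-phase join: a node enters the logical topology only when the
 * "join finish" command is applied. Nodes are identified by plain string IDs here.
 */
final class TwoPhaseJoinState {
    /** Nodes that passed the first phase (join) but have not finished joining yet. */
    private final Set<String> pendingNodes = new HashSet<>();

    /** Nodes that completed both phases. */
    private final Set<String> logicalTopology = new HashSet<>();

    /** Phase 1: records the node as pending. Applying it twice has no additional effect. */
    void onJoin(String nodeId) {
        if (!logicalTopology.contains(nodeId)) {
            pendingNodes.add(nodeId);
        }
    }

    /**
     * Phase 2: moves a pending node into the logical topology.
     *
     * @return {@code false} if the node never went through phase 1.
     */
    boolean onJoinFinish(String nodeId) {
        if (logicalTopology.contains(nodeId)) {
            return true; // Repeated command: already applied, stay idempotent.
        }

        if (!pendingNodes.remove(nodeId)) {
            return false;
        }

        logicalTopology.add(nodeId);

        return true;
    }

    /** Removes the node regardless of which phase it is in. */
    void onLeave(String nodeId) {
        pendingNodes.remove(nodeId);
        logicalTopology.remove(nodeId);
    }

    Set<String> logicalTopology() {
        return Collections.unmodifiableSet(logicalTopology);
    }
}
```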



##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapRaftStorage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftStorage} in-memory implementation based on a {@link ConcurrentHashMap}.
+ */
+public class ConcurrentMapRaftStorage implements RaftStorage {

Review Comment:
   Should this be marked @TestOnly?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/ClusterStateReceivedMessage.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.network.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Successful response for receiving a {@link ClusterStateMessage}.
+ */
+@Transferable(CmgMessageGroup.CLUSTER_STATE_RECEIVED)
+public interface ClusterStateReceivedMessage extends NetworkMessage {

Review Comment:
   Do we really need it? Isn't there a general OK response we could reuse?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -18,103 +18,114 @@
 package org.apache.ignite.internal.cluster.management;
 
 import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.internal.cluster.management.messages.CancelInitMessage;
-import org.apache.ignite.internal.cluster.management.messages.ClusterStateMessage;
-import org.apache.ignite.internal.cluster.management.messages.CmgInitMessage;
-import org.apache.ignite.internal.cluster.management.messages.CmgMessageGroup;
-import org.apache.ignite.internal.cluster.management.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.network.CmgMessageHandlerFactory;
+import org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage;
+import org.apache.ignite.internal.cluster.management.network.messages.ClusterStateMessage;
+import org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
+import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
+import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.raft.ClusterState;
+import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
+import org.apache.ignite.internal.cluster.management.raft.CmgRaftService;
+import org.apache.ignite.internal.cluster.management.raft.RaftStorage;
 import org.apache.ignite.internal.cluster.management.rest.InitCommandHandler;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.rest.RestComponent;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ *
+ * <p>Refer to
+ * <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">IEP-77</a>
+ * for the description of the Cluster Management Group and its responsibilities.
  */
 public class ClusterManagementGroupManager implements IgniteComponent {
+    // TODO: timeout should be configurable, see https://issues.apache.org/jira/browse/IGNITE-16785
+    private static final int NETWORK_INVOKE_TIMEOUT = 500;
+
     private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
 
     /** CMG Raft group name. */
     private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
 
     /** Init REST endpoint path. */
-    public static final String REST_ENDPOINT = "/management/v1/cluster/init";
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Prevents double stopping the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    /** Future that resolves into a CMG Raft service. Can be {@code null} if the Raft service has not been started. */
+    @Nullable
+    private CompletableFuture<CmgRaftService> raftService;
+
+    /** Lock for the {@code raftService} field. */
+    private final Object raftServiceLock = new Object();
+
+    /** Future that resolves into a list of node names that host the Meta Storage. */
+    private final CompletableFuture<Collection<String>> metaStorageNodes = new CompletableFuture<>();
+
+    /** Message factory. */
+    private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
+
+    /** Delayed executor. */
+    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
     private final ClusterService clusterService;
 
     private final Loza raftManager;
 
-    private final RestComponent restModule;
+    private final RestComponent restComponent;
 
-    /** Handles cluster initialization flow. */
-    private final ClusterInitializer clusterInitializer;
+    private final RaftStorage raftStorage;

Review Comment:
   Naming: I'd rather use ClusterStateStorage for both the class and the variable.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java:
##########
@@ -95,6 +98,24 @@ public void test(TestInfo testInfo) throws Exception {
         }
     }
 
+    private Ignite startNode(TestInfo testInfo) throws NodeStoppingException {

Review Comment:
   It seems that this kind of startThenInitAndSometimesAddToClusterNodeCollection method is duplicated across many tests. Let's move it to IgniteAbstractTest or something similar, wdyt?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorage.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage for the CMG Raft service.
+ */
+public interface RaftStorage extends AutoCloseable {

Review Comment:
   As mentioned above, this should be either ClusterStateStorage or a general SimpleKeyValueStorage. In the latter case, it should be moved to some common, non-CMG module.
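   To make the second option concrete, a minimal sketch of what a CMG-agnostic storage could look like. The `SimpleKeyValueStorage` name comes from the comment above, but the method set is an assumption and deliberately leaves out snapshots, cursors, and anything else `RaftStorage` may need.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical general-purpose storage interface; it is not the PR's RaftStorage API. */
interface SimpleKeyValueStorage extends AutoCloseable {
    /** Returns the value stored under {@code key}, or {@code null} if there is none. */
    byte[] get(byte[] key);

    void put(byte[] key, byte[] value);

    void remove(byte[] key);

    @Override
    void close();
}

/** In-memory implementation, similar in spirit to the PR's ConcurrentMapRaftStorage test class. */
final class InMemoryKeyValueStorage implements SimpleKeyValueStorage {
    // byte[] uses identity-based equals/hashCode, so keys are wrapped in ByteBuffer, whose
    // equals/hashCode depend on the remaining bytes. Keys are copied so callers cannot mutate them.
    private final Map<ByteBuffer, byte[]> map = new ConcurrentHashMap<>();

    @Override
    public byte[] get(byte[] key) {
        byte[] value = map.get(ByteBuffer.wrap(key));

        return value == null ? null : value.clone();
    }

    @Override
    public void put(byte[] key, byte[] value) {
        map.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    @Override
    public void remove(byte[] key) {
        map.remove(ByteBuffer.wrap(key));
    }

    @Override
    public void close() {
        map.clear();
    }
}
```

   Raw `byte[]` keys cannot be used in a hash map directly because arrays use identity-based `equals`/`hashCode`; the PR's `ConcurrentMapRaftStorage` imports `org.apache.ignite.lang.ByteArray`, which presumably serves the same purpose as the `ByteBuffer` wrapping here.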



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static java.util.stream.Collectors.toList;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A wrapper around a {@link RaftStorage} which provides convenient methods.
+ */
+class RaftStorageManager {

Review Comment:
   Do we really need it? It seems that all the logic implemented here could be inlined into the appropriate components and the storage itself; up to you, though. In any case, naming: ClusterStateStorageManager or similar.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String> metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing the cluster", e);
+            throw new InitException("Interrupted while initializing the cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, msgFactory, clusterService);
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = clusterService.messagingService();
+        // register the ClusterState handler first, because local state recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+                        handleClusterState((ClusterStateMessage) message, senderAddr, correlationId);
+                    }
+                })
+        );
+
+        synchronized (raftServiceLock) {
+            raftService = recoverLocalState();
+        }
+
+        // register the Init handler second in order to handle the command differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
+
+                        handleInit((CmgInitMessage) message, senderAddr, correlationId);
+                    }
+                })
+        );
+
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} if the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
 
-                        ClusterNode thisNode = clusterService.topologyService().localMember();
+            throw new IgniteInternalException("Interrupted while retrieving local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
 
-                        messagingService.respond(addr, successResponse(msgFactory), correlationId);
+        log.info("Local CMG state recovered, starting the CMG");
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> {
+                            if (isLeader) {
+                                return service.readClusterState()
+                                        // Raft state might not have been initialized in case of leader failure during cluster init
+                                        // TODO: properly handle this case, see https://issues.apache.org/jira/browse/IGNITE-16819
+                                        .thenCompose(state -> state == null ? completedFuture(null) : onLeaderElected(service, state));
+                            } else {
+                                return completedFuture(null);
+                            }
+                        })
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found (the Raft service future is {@code null}) - this means that the current node has never been initialized
+     *     before.</li>
+     *     <li>Local state found (the Raft service future has been started and therefore is not {@code null}),
+     *     but no CMG state present in the Raft storage - this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage - this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        // This future is needed to add a completion listener at the end of the method
+        CompletableFuture<?> resultHook;
+
+        synchronized (raftServiceLock) {
+            if (raftService == null) {
+                // Raft service has not been started
+                log.info("Init command received, starting the CMG: " + newState);
+
+                raftService = initCmgRaftService(newState);
+
+                resultHook = raftService;
+            } else {
+                // Raft service has been started, which means that this node has already received an init command at least once, but
+                // we still need to check that the initialization has completed successfully.
+                log.info("Init command received, but the CMG has already been started");
+
+                resultHook = raftService.thenCompose(service ->
+                        service.readClusterState()
+                                .thenCompose(state -> {
+                                    if (state == null) {
+                                        // Raft state is empty, perform re-initialization
+                                        log.info("CMG state is missing, completing initialization");
+
+                                        if (service.nodeNames().equals(newState.cmgNodes())) {
+                                            return service.isCurrentNodeLeader()
+                                                    .thenCompose(isCurrentNodeLeader ->
+                                                            isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+                                        } else {
+                                            throw new IllegalInitArgumentException(String.format(
+                                                    "CMG has already been initialized with %s, but the new state is different: %s",
+                                                    service.nodeNames(), newState.cmgNodes()
+                                            ));
+                                        }
+                                    } else {
+                                        // Node is fully initialized, just check some invariants
+                                        log.info("Node has already been initialized");
+
+                                        if (state.equals(newState)) {
+                                            return completedFuture(null);
+                                        } else {
+                                            throw new IllegalInitArgumentException(String.format(
+                                                    "CMG has already been initialized with %s, but the new state is different: %s",
+                                                    state, newState
+                                            ));
+                                        }
+                                    }
+                                }));
+            }
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response;
+
+            if (e == null) {
+                response = msgFactory.initCompleteMessage().build();
+            } else {
+                response = msgFactory.initErrorMessage()
+                        .cause(e.getMessage())
+                        .shouldCancel(!(e instanceof IllegalInitArgumentException))
+                        .build();
+            }
+
+            clusterService.messagingService().respond(addr, response, correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            scheduleRemoveFromLogicalTopology(service, node);
                         }
-                    } else {
-                        messagingService.respond(addr, errorResponse(msgFactory, e), correlationId);
                     }
                 });
     }
 
-    private void handleCancelInit(CancelInitMessage msg) throws NodeStoppingException {
+    private void handleCancelInit(CancelInitMessage msg) {
         log.info("CMG initialization cancelled, reason: " + msg.reason());
 
-        raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+        destroyCmg();
+    }
+
+    /**
+     * Completely destroys the local CMG Raft service.
+     */
+    private void destroyCmg() {
+        synchronized (raftServiceLock) {
+            try {
+                if (raftService != null) {
+                    raftService.cancel(true);
 
-        // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+                    raftService = null;
+                }
+
+                raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+                if (raftStorage.isStarted()) {
+                    raftStorage.destroy();
+                }
+
+                localStateStorage.clear().get();
+            } catch (Exception e) {
+                throw new IgniteInternalException("Error when cleaning the CMG state", e);
+            }
+        }
     }
 
-    private void handleClusterState(ClusterStateMessage msg) {
-        metastorageNodes.complete(msg.metastorageNodes());
+    /**
+     * Handler for the {@link ClusterStateMessage}.
+     */
+    private void handleClusterState(ClusterStateMessage msg, NetworkAddress addr, long correlationId) {
+        clusterService.messagingService().respond(addr, msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+        var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        synchronized (raftServiceLock) {
+            if (raftService == null) {
+                raftService = initCmgRaftService(state);
+            } else {
+                // Raft service might have been started on wrong CMG nodes, because CMG state can change while a node is offline. In this
+                // case we need to re-create the service.
+                raftService = raftService.thenCompose(service -> {
+                    if (service.nodeNames().equals(state.cmgNodes())) {
+                        return completedFuture(service);
+                    } else {
+                        if (log.isInfoEnabled()) {
+                            log.info("CMG has been started on {}, but the cluster state is different: {}. "
+                                    + "Re-creating the CMG Raft service", service.nodeNames(), state.cmgNodes());
+                        }
+
+                        destroyCmg();
+
+                        return initCmgRaftService(state);
+                    }
+                });
+            }
+
+            raftService
+                    .thenCompose(CmgRaftService::joinCluster)
+                    .thenRun(() -> metaStorageNodes.complete(state.metaStorageNodes()));
+        }
     }
 
-    private static NetworkMessage successResponse(CmgMessagesFactory msgFactory) {
-        log.info("CMG started successfully");
+    /**
+     * Starts the CMG Raft service using the provided node names as its peers.
+     */
+    private CompletableFuture<CmgRaftService> startCmgRaftService(Collection<String> nodeNames) {
+        // TODO: wait for nodes to appear, see https://issues.apache.org/jira/browse/IGNITE-16811
+        List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
 
-        return msgFactory.initCompleteMessage().build();
+        try {
+            return raftManager
+                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+                        raftStorage.start();
+
+                        return new CmgRaftGroupListener(raftStorage);
+                    })
+                    .thenApply(service -> new CmgRaftService(service, clusterService));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
-    private void broadcastClusterState(Collection<String> metaStorageNodes) {
-        NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
-                .metastorageNodes(metaStorageNodes)
-                .build();
+    private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService raftService) {
+        return new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                raftService.readClusterState()
+                        .thenAccept(state -> {
+                            if (state != null) {
+                                sendClusterState(state, List.of(member));
+                            } else if (log.isWarnEnabled()) {
+                                log.warn("Cannot send the cluster state to a newly added node {} because cluster state is empty", member);
+                            }
+                        });
+            }
 
-        clusterService.topologyService()
-                .allMembers()
-                .forEach(node -> clusterService.messagingService().send(node, clusterStateMsg));
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                scheduleRemoveFromLogicalTopology(raftService, member);
+            }
+        };
     }
 
-    private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory, Throwable e) {
-        log.error("Exception when starting the CMG", e);
+    private void scheduleRemoveFromLogicalTopology(CmgRaftService raftService, ClusterNode node) {
+        // TODO: delay should be configurable, see https://issues.apache.org/jira/browse/IGNITE-16785
+        scheduledExecutor.schedule(() -> {
+            ClusterNode physicalTopologyNode = clusterService.topologyService().getByConsistentId(node.name());
 
-        return msgFactory.initErrorMessage()
-                .cause(e.getMessage())
-                .build();
+            if (physicalTopologyNode == null || !physicalTopologyNode.id().equals(node.id())) {
+                raftService.removeFromCluster(node);
+            }
+        }, 0, TimeUnit.MILLISECONDS);
     }
 
-    private ClusterNode getLeader(RaftGroupService raftService) {
-        Peer leader = raftService.leader();
+    private void sendClusterState(ClusterState clusterState, Collection<ClusterNode> nodes) {
+        NetworkMessage msg = msgFactory.clusterStateMessage()

Review Comment:
   The cluster state message should also contain some information for validation,
e.g. the Ignite version and the cluster tag. I believe this was planned to be done
under a separate ticket as part of the join epic. Could you please add a TODO?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
