rpuch commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r841795828


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for {@link ClusterManagementGroupManager}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItClusterManagerTest {
+    private static final int PORT_BASE = 10000;
+
+    private final List<MockNode> cluster = new ArrayList<>();
+
+    @WorkDirectory
+    private Path workDir;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws IOException {
+        var addr1 = new NetworkAddress("localhost", PORT_BASE);
+        var addr2 = new NetworkAddress("localhost", PORT_BASE + 1);
+
+        var nodeFinder = new StaticNodeFinder(List.of(addr1, addr2));
+
+        cluster.add(new MockNode(testInfo, addr1, nodeFinder, 
workDir.resolve("node0")));
+        cluster.add(new MockNode(testInfo, addr2, nodeFinder, 
workDir.resolve("node1")));
+
+        for (MockNode node : cluster) {
+            node.start();
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        for (MockNode node : cluster) {
+            node.beforeNodeStop();
+        }
+
+        for (MockNode node : cluster) {
+            node.stop();
+        }
+    }
+
+    /**
+     * Tests initial cluster setup.
+     */
+    @Test
+    void testInit() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+
+        initCluster(metaStorageNodes, cmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+        assertThat(cluster.get(1).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        ClusterNode[] expectedTopology = currentPhysicalTopology();
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+        assertThat(cluster.get(1).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+    }
+
+    /**
+     * Tests cancelling an init command due to a node failure.
+     */
+    @Test
+    void testInitCancel() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name(), 
cluster.get(1).localMember().name() };
+
+        // stop a CMG node to make the init fail
+
+        MockNode nodeToStop = cluster.remove(0);
+
+        nodeToStop.beforeNodeStop();
+        nodeToStop.stop();
+
+        assertThrows(InitException.class, () -> initCluster(cmgNodes, 
cmgNodes));

Review Comment:
   `initCluster()` accepts two lists of nodes: metastorage nodes and CMG nodes, 
so passing `cmgNodes` as both arguments looks odd. How about renaming the 
`cmgNodes` local variable to `allNodes` (or `aliveAndDeadNodes`) and 
`correctCmgNodes` to `aliveNodes`?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.cluster.management.raft.RocksDbRaftStorage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestComponent;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Fake node for integration tests.
+ */
+class MockNode {
+    private ClusterManagementGroupManager clusterManager;
+
+    private ClusterService clusterService;
+
+    private final TestInfo testInfo;
+
+    private final NodeFinder nodeFinder;
+
+    private final Path workDir;
+
+    private final List<IgniteComponent> components = new ArrayList<>();
+
+    MockNode(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder, 
Path workDir) throws IOException {
+        this.testInfo = testInfo;
+        this.nodeFinder = nodeFinder;
+        this.workDir = workDir;
+
+        init(addr.port());
+    }
+
+    private void init(int port) throws IOException {
+        Path vaultDir = workDir.resolve("vault");
+
+        var vaultManager = new VaultManager(new 
PersistentVaultService(Files.createDirectories(vaultDir)));
+
+        this.clusterService = clusterService(testInfo, port, nodeFinder);
+
+        Loza raftManager = new Loza(clusterService, workDir);
+
+        this.clusterManager = new ClusterManagementGroupManager(
+                vaultManager,
+                clusterService,
+                raftManager,
+                mock(RestComponent.class),
+                new RocksDbRaftStorage(workDir.resolve("cmg"))
+        );
+
+        components.add(vaultManager);
+        components.add(clusterService);
+        components.add(raftManager);
+        components.add(clusterManager);
+    }
+
+    void start() {
+        components.forEach(IgniteComponent::start);
+    }
+
+    void beforeNodeStop() {
+        Collections.reverse(components);

Review Comment:
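   I suggest creating a local copy of the `components` list and reversing the 
copy rather than the original list. Otherwise, any other method that iterates 
over `components` silently depends on whether `beforeNodeStop()` has already 
been called, which creates a non-obvious dependency on the order of method calls.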



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->

Review Comment:
   Should `this.raftService` be updated as well here? (It is updated in the 
first `if` branch)



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for {@link ClusterManagementGroupManager}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItClusterManagerTest {
+    private static final int PORT_BASE = 10000;
+
+    private final List<MockNode> cluster = new ArrayList<>();
+
+    @WorkDirectory
+    private Path workDir;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws IOException {
+        var addr1 = new NetworkAddress("localhost", PORT_BASE);
+        var addr2 = new NetworkAddress("localhost", PORT_BASE + 1);
+
+        var nodeFinder = new StaticNodeFinder(List.of(addr1, addr2));
+
+        cluster.add(new MockNode(testInfo, addr1, nodeFinder, 
workDir.resolve("node0")));
+        cluster.add(new MockNode(testInfo, addr2, nodeFinder, 
workDir.resolve("node1")));
+
+        for (MockNode node : cluster) {
+            node.start();
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        for (MockNode node : cluster) {
+            node.beforeNodeStop();
+        }
+
+        for (MockNode node : cluster) {
+            node.stop();
+        }
+    }
+
+    /**
+     * Tests initial cluster setup.
+     */
+    @Test
+    void testInit() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+
+        initCluster(metaStorageNodes, cmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+        assertThat(cluster.get(1).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        ClusterNode[] expectedTopology = currentPhysicalTopology();
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+        assertThat(cluster.get(1).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+    }
+
+    /**
+     * Tests cancelling an init command due to a node failure.
+     */
+    @Test
+    void testInitCancel() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name(), 
cluster.get(1).localMember().name() };
+
+        // stop a CMG node to make the init fail
+
+        MockNode nodeToStop = cluster.remove(0);
+
+        nodeToStop.beforeNodeStop();
+        nodeToStop.stop();
+
+        assertThrows(InitException.class, () -> initCluster(cmgNodes, 
cmgNodes));
+
+        // complete initialization with one node to check that it finishes 
correctly
+
+        String[] correctCmgNodes = { cluster.get(0).localMember().name() };
+
+        initCluster(correctCmgNodes, correctCmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(correctCmgNodes)));
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(currentPhysicalTopology())));
+    }
+
+    /**
+     * Tests a scenario when a node is restarted.
+     */
+    @Test
+    void testNodeRestart() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+
+        initCluster(metaStorageNodes, cmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+        assertThat(cluster.get(1).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        cluster.get(0).restart();
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        ClusterNode[] expectedTopology = currentPhysicalTopology();
+
+        waitForLogicalTopology();
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+        assertThat(cluster.get(1).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+    }
+
+    /**
+     * Tests a scenario when a new node joins a cluster.
+     */
+    @Test
+    void testNodeJoin(TestInfo testInfo) throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        initCluster(cmgNodes, cmgNodes);
+
+        // create and start a new node
+        var addr = new NetworkAddress("localhost", PORT_BASE + cluster.size());
+
+        var nodeFinder = new StaticNodeFinder(cluster.stream().map(node -> 
node.localMember().address()).collect(toList()));
+
+        var node = new MockNode(testInfo, addr, nodeFinder, 
workDir.resolve("node" + cluster.size()));
+
+        cluster.add(node);
+
+        node.start();
+
+        assertThat(node.clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(cmgNodes)));
+
+        assertThat(Arrays.asList(currentPhysicalTopology()), 
hasSize(cluster.size()));

Review Comment:
   `arrayWithSize()` can be used here instead of `hasSize()`; this would make 
wrapping the array into a list unnecessary, e.g. 
`assertThat(currentPhysicalTopology(), arrayWithSize(cluster.size()))`.



##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractRaftStorageTest.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base class for testing {@link RaftStorage} implementations.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class AbstractRaftStorageTest {
+    @WorkDirectory
+    protected Path workDir;
+
+    private RaftStorage storage;
+
+    abstract RaftStorage createStorage();
+
+    @BeforeEach
+    void setUp() {
+        storage = createStorage();
+
+        storage.start();
+
+        assertTrue(storage.isStarted());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    /**
+     * Tests the {@link RaftStorage#get} and {@link RaftStorage#put} methods.
+     */
+    @Test
+    void testGet() {
+        byte[] key1 = "key1".getBytes(UTF_8);
+        byte[] key2 = "key2".getBytes(UTF_8);
+
+        assertThat(storage.get(key1), is(nullValue()));
+        assertThat(storage.get(key2), is(nullValue()));
+
+        byte[] value1 = "value1".getBytes(UTF_8);
+        byte[] value2 = "value2".getBytes(UTF_8);
+
+        storage.put(key1, value1);
+        storage.put(key2, value2);
+
+        assertThat(storage.get(key1), is(equalTo(value1)));
+        assertThat(storage.get(key2), is(equalTo(value2)));
+    }
+
+    /**
+     * Tests that {@link RaftStorage#put} replaces previous values.
+     */
+    @Test
+    void testPutReplace() {
+        byte[] key = "key".getBytes(UTF_8);
+
+        byte[] value1 = "value1".getBytes(UTF_8);
+
+        storage.put(key, value1);
+
+        assertThat(storage.get(key), is(value1));
+
+        byte[] value2 = "value2".getBytes(UTF_8);
+
+        storage.put(key, value2);
+
+        assertThat(storage.get(key), is(equalTo(value2)));
+    }
+
+    /**
+     * Tests the {@link RaftStorage#remove} method.
+     */
+    @Test
+    void testRemove() {
+        byte[] key1 = "key1".getBytes(UTF_8);
+        byte[] key2 = "key2".getBytes(UTF_8);
+
+        byte[] value1 = "value1".getBytes(UTF_8);
+        byte[] value2 = "value2".getBytes(UTF_8);
+
+        storage.put(key1, value1);
+        storage.put(key2, value2);
+
+        storage.remove(key1);
+
+        assertThat(storage.get(key1), is(nullValue()));
+        assertThat(storage.get(key2), is(equalTo(value2)));
+    }
+
+    /**
+     * Tests that {@link RaftStorage#remove} works correctly if a value is 
missing.
+     */
+    @Test
+    void testRemoveNonExistent() {
+        byte[] key = "key".getBytes(UTF_8);
+
+        storage.remove(key);
+
+        assertThat(storage.get(key), is(nullValue()));
+    }
+
+    /**
+     * Tests the {@link RaftStorage#getWithPrefix} method.
+     */
+    @Test
+    void testGetWithPrefix() throws Exception {
+        storage.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
+        storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
+        storage.put("foo".getBytes(UTF_8), "value3".getBytes(UTF_8));
+
+        Cursor<String> cursor = storage.getWithPrefix("ke".getBytes(UTF_8), 
(k, v) -> new String(v, UTF_8));
+
+        try (cursor) {
+            assertThat(cursor.stream().collect(toList()), 
containsInAnyOrder("value1", "value2"));
+        }
+    }
+
+    /**
+     * Tests the {@link RaftStorage#getWithPrefix} method (corner case, when 
keys are close together lexicographically).
+     */
+    @Test
+    void testGetWithPrefixBorder() throws Exception {
+        byte[] key1 = "key1".getBytes(UTF_8);
+        byte[] key2 = key1.clone();
+
+        key2[key2.length - 1] += 1;
+
+        storage.put(key1, "value1".getBytes(UTF_8));
+        storage.put(key2, "value2".getBytes(UTF_8));
+
+        Cursor<String> cursor = storage.getWithPrefix(key1, (k, v) -> new 
String(v, UTF_8));
+
+        try (cursor) {
+            assertThat(cursor.stream().collect(toList()), 
containsInAnyOrder("value1"));
+        }
+    }
+
+    /**
+     * Tests that {@link RaftStorage#getWithPrefix} method works correctly 
over empty ranges.
+     */
+    @Test
+    void testGetWithPrefixEmpty() throws Exception {
+        storage.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
+        storage.put("key2".getBytes(UTF_8), "value2".getBytes(UTF_8));
+
+        Cursor<String> cursor = storage.getWithPrefix("foo".getBytes(UTF_8), 
(k, v) -> new String(v, UTF_8));
+
+        try (cursor) {
+            assertThat(cursor.stream().collect(toList()), is(empty()));
+        }
+    }
+
+    /**
+     * Tests the {@link RaftStorage#destroy()} method.
+     */
+    @Test
+    void testClear() {

Review Comment:
   Let's rename the method to `testDestroy()`, because the method under test is 
`destroy()`, not `clear()`.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                service.nodeNames(), 
newState.cmgNodes()
+                                        ));
+                                    }
+                                } else {
+                                    // Node is fully initialized, just check 
some invariants
+                                    log.info("Node has already been 
initialized");
+
+                                    if (state.equals(newState)) {
+                                        return completedFuture(null);
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                state, newState
+                                        ));
+                                    }
+                                }
+                            }));
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response = e == null
+                    ? msgFactory.initCompleteMessage().build()
+                    : 
msgFactory.initErrorMessage().cause(e.getMessage()).build();
+
+            clusterService.messagingService().respond(addr, response, 
correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the 
storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? 
initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary 
on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, 
ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> 
onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone 
offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical 
topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, 
ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, 
clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added 
automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            removeNodeFromLogicalTopology(service, node);
                         }
-                    } else {
-                        messagingService.respond(addr, 
errorResponse(msgFactory, e), correlationId);
                     }
                 });
     }
 
-    private void handleCancelInit(CancelInitMessage msg) throws 
NodeStoppingException {
+    private void handleCancelInit(CancelInitMessage msg) {
         log.info("CMG initialization cancelled, reason: " + msg.reason());
 
-        raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+        destroyCmg();
+    }
+
+    /**
+     * Completely destroys the local CMG Raft service.
+     */
+    private void destroyCmg() {
+        try {
+            CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+            if (raftService != null) {
+                raftService.cancel(true);
+            }
+
+            raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
 
-        // TODO: drop the Raft storage as well, 
https://issues.apache.org/jira/browse/IGNITE-16471
+            if (raftStorage.isStarted()) {
+                raftStorage.destroy();
+            }
+
+            localStateStorage.clear().get();
+
+            this.raftService = null;
+        } catch (Exception e) {
+            throw new IgniteInternalException("Error when cleaning the CMG 
state", e);
+        }
     }
 
-    private void handleClusterState(ClusterStateMessage msg) {
-        metastorageNodes.complete(msg.metastorageNodes());
+    /**
+     * Handler for the {@link ClusterStateMessage}.
+     */
+    private void handleClusterState(ClusterStateMessage msg, NetworkAddress 
addr, long correlationId) {
+        clusterService.messagingService().respond(addr, 
msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+        var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        if (raftService == null) {
+            raftService = initCmgRaftService(state);
+        } else {
+            // Raft service might have been started on wrong CMG nodes, 
because CMG state can change while a node is offline. In this
+            // case we need to re-create the service.
+            raftService = raftService.thenCompose(service -> {
+                if (service.nodeNames().equals(state.cmgNodes())) {
+                    return completedFuture(service);
+                } else {
+                    if (log.isInfoEnabled()) {
+                        log.info("CMG has been started on {}, but the cluster 
state is different: {}. "
+                                + "Re-creating the CMG Raft service", 
service.nodeNames(), state.cmgNodes());
+                    }
+
+                    destroyCmg();
+
+                    return initCmgRaftService(state);
+                }
+            });
+        }
+
+        raftService
+                .thenCompose(CmgRaftService::joinCluster)
+                .thenRun(() -> 
metaStorageNodes.complete(state.metaStorageNodes()));
+
+        this.raftService = raftService;
     }
 
-    private static NetworkMessage successResponse(CmgMessagesFactory 
msgFactory) {
-        log.info("CMG started successfully");
+    /**
+     * Starts the CMG Raft service using the provided node names as its peers.
+     */
+    private CompletableFuture<CmgRaftService> 
startCmgRaftService(Collection<String> nodeNames) {
+        List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
 
-        return msgFactory.initCompleteMessage().build();
+        try {
+            return raftManager
+                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+                        raftStorage.start();
+
+                        return new CmgRaftGroupListener(raftStorage);
+                    })
+                    .thenApply(service -> new CmgRaftService(service, 
clusterService));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
-    private void broadcastClusterState(Collection<String> metaStorageNodes) {
-        NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
-                .metastorageNodes(metaStorageNodes)
-                .build();
+    private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService 
raftService) {
+        return new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                raftService.readClusterState()
+                        .thenAccept(state -> {
+                            if (state != null) {
+                                sendClusterState(state, List.of(member));
+                            } else if (log.isWarnEnabled()) {
+                                log.warn("Cannot send the cluster state to a 
newly added node {} because cluster state is empty", member);
+                            }
+                        });
+            }
 
-        clusterService.topologyService()
-                .allMembers()
-                .forEach(node -> clusterService.messagingService().send(node, 
clusterStateMsg));
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                removeNodeFromLogicalTopology(raftService, member);
+            }
+        };
     }
 
-    private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory, 
Throwable e) {
-        log.error("Exception when starting the CMG", e);
+    private void removeNodeFromLogicalTopology(CmgRaftService raftService, 
ClusterNode node) {
+        // TODO: delay should be configurable, see 
https://issues.apache.org/jira/browse/IGNITE-16785
+        scheduledExecutor.schedule(() -> {
+            ClusterNode physicalTopologyNode = 
clusterService.topologyService().getByConsistentId(node.name());
 
-        return msgFactory.initErrorMessage()
-                .cause(e.getMessage())
-                .build();
+            if (physicalTopologyNode == null || 
!physicalTopologyNode.id().equals(node.id())) {
+                raftService.removeFromCluster(node);
+            }
+        }, 0, TimeUnit.MILLISECONDS);
     }
 
-    private ClusterNode getLeader(RaftGroupService raftService) {
-        Peer leader = raftService.leader();
+    private void sendClusterState(ClusterState clusterState, 
Collection<ClusterNode> nodes) {
+        NetworkMessage msg = msgFactory.clusterStateMessage()
+                .cmgNodes(clusterState.cmgNodes())
+                .metaStorageNodes(clusterState.metaStorageNodes())
+                .build();
 
-        assert leader != null;
+        for (ClusterNode node : nodes) {
+            sendWithRetry(node, msg);
+        }
+    }
 
-        ClusterNode leaderNode = 
clusterService.topologyService().getByAddress(leader.address());
+    private CompletableFuture<NetworkMessage> sendWithRetry(ClusterNode node, 
NetworkMessage msg) {
+        var result = new CompletableFuture<NetworkMessage>();
 
-        assert leaderNode != null;
+        sendWithRetry(node, msg, result, 5);

Review Comment:
   Should we make the retry count (currently hard-coded to `5`) configurable?
At the very least, it could be extracted into a named constant.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbRaftStorage.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * {@link RaftStorage} implementation based on RocksDB.
+ */
+public class RocksDbRaftStorage implements RaftStorage {
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = 
Executors.newFixedThreadPool(2);
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** RockDB options. */
+    private volatile Options options;
+
+    /** RocksDb instance. */
+    private volatile RocksDB db;
+
+    private volatile RocksSnapshotManager snapshotManager;
+
+    private final Object snapshotRestoreLock = new Object();
+
+    public RocksDbRaftStorage(Path dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    @Override
+    public void start() {
+        options = new Options().setCreateIfMissing(true);
+
+        try {
+            db = RocksDB.open(options, dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Failed to start the storage", 
e);
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return db != null;
+    }
+
+    @Override
+    public byte @Nullable [] get(byte[] key) {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to get data from Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) {
+        try {
+            db.put(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to put data into Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        try {
+            db.delete(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data from 
Rocks DB", e);
+        }
+    }
+
+    @Override
+    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], 
byte[], T> entryTransformer) {
+        byte[] upperBound = prefix.clone();
+
+        upperBound[upperBound.length - 1] += 1;
+
+        Slice upperBoundSlice = new Slice(upperBound);
+
+        ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);
+
+        RocksIterator it = db.newIterator(options);
+
+        it.seek(prefix);
+
+        return new RocksIteratorAdapter<>(it) {
+            @Override
+            protected T decodeEntry(byte[] key, byte[] value) {
+                return entryTransformer.apply(key, value);
+            }
+
+            @Override
+            public void close() throws Exception {
+                super.close();
+
+                IgniteUtils.closeAll(options, upperBoundSlice);

Review Comment:
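   If an exception is thrown after `options` and `upperBoundSlice` have been
created but before the cursor is returned (e.g. from `db.newIterator()` or
`it.seek()`), nothing will close them (or the iterator). Won't these native
resources leak? Consider closing them in a `catch` block before rethrowing.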



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                service.nodeNames(), 
newState.cmgNodes()
+                                        ));
+                                    }
+                                } else {
+                                    // Node is fully initialized, just check 
some invariants
+                                    log.info("Node has already been 
initialized");
+
+                                    if (state.equals(newState)) {
+                                        return completedFuture(null);
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                state, newState
+                                        ));
+                                    }
+                                }
+                            }));
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response = e == null
+                    ? msgFactory.initCompleteMessage().build()
+                    : 
msgFactory.initErrorMessage().cause(e.getMessage()).build();
+
+            clusterService.messagingService().respond(addr, response, 
correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the 
storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? 
initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary 
on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, 
ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> 
onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone 
offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical 
topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, 
ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, 
clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added 
automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            removeNodeFromLogicalTopology(service, node);
                         }
-                    } else {
-                        messagingService.respond(addr, 
errorResponse(msgFactory, e), correlationId);
                     }
                 });
     }
 
-    private void handleCancelInit(CancelInitMessage msg) throws 
NodeStoppingException {
+    private void handleCancelInit(CancelInitMessage msg) {
         log.info("CMG initialization cancelled, reason: " + msg.reason());
 
-        raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+        destroyCmg();
+    }
+
+    /**
+     * Completely destroys the local CMG Raft service.
+     */
+    private void destroyCmg() {
+        try {
+            CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+            if (raftService != null) {
+                raftService.cancel(true);
+            }
+
+            raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
 
-        // TODO: drop the Raft storage as well, 
https://issues.apache.org/jira/browse/IGNITE-16471
+            if (raftStorage.isStarted()) {
+                raftStorage.destroy();
+            }
+
+            localStateStorage.clear().get();
+
+            this.raftService = null;
+        } catch (Exception e) {
+            throw new IgniteInternalException("Error when cleaning the CMG 
state", e);
+        }
     }
 
-    private void handleClusterState(ClusterStateMessage msg) {
-        metastorageNodes.complete(msg.metastorageNodes());
+    /**
+     * Handler for the {@link ClusterStateMessage}.
+     */
+    private void handleClusterState(ClusterStateMessage msg, NetworkAddress 
addr, long correlationId) {
+        clusterService.messagingService().respond(addr, 
msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+        var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        if (raftService == null) {
+            raftService = initCmgRaftService(state);
+        } else {
+            // Raft service might have been started on wrong CMG nodes, 
because CMG state can change while a node is offline. In this
+            // case we need to re-create the service.
+            raftService = raftService.thenCompose(service -> {
+                if (service.nodeNames().equals(state.cmgNodes())) {
+                    return completedFuture(service);
+                } else {
+                    if (log.isInfoEnabled()) {
+                        log.info("CMG has been started on {}, but the cluster 
state is different: {}. "
+                                + "Re-creating the CMG Raft service", 
service.nodeNames(), state.cmgNodes());
+                    }
+
+                    destroyCmg();
+
+                    return initCmgRaftService(state);
+                }
+            });
+        }
+
+        raftService
+                .thenCompose(CmgRaftService::joinCluster)
+                .thenRun(() -> 
metaStorageNodes.complete(state.metaStorageNodes()));
+
+        this.raftService = raftService;
     }
 
-    private static NetworkMessage successResponse(CmgMessagesFactory 
msgFactory) {
-        log.info("CMG started successfully");
+    /**
+     * Starts the CMG Raft service using the provided node names as its peers.
+     */
+    private CompletableFuture<CmgRaftService> 
startCmgRaftService(Collection<String> nodeNames) {
+        List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
 
-        return msgFactory.initCompleteMessage().build();
+        try {
+            return raftManager
+                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+                        raftStorage.start();
+
+                        return new CmgRaftGroupListener(raftStorage);
+                    })
+                    .thenApply(service -> new CmgRaftService(service, 
clusterService));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
-    private void broadcastClusterState(Collection<String> metaStorageNodes) {
-        NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
-                .metastorageNodes(metaStorageNodes)
-                .build();
+    private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService 
raftService) {
+        return new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                raftService.readClusterState()
+                        .thenAccept(state -> {
+                            if (state != null) {
+                                sendClusterState(state, List.of(member));
+                            } else if (log.isWarnEnabled()) {
+                                log.warn("Cannot send the cluster state to a 
newly added node {} because cluster state is empty", member);
+                            }
+                        });
+            }
 
-        clusterService.topologyService()
-                .allMembers()
-                .forEach(node -> clusterService.messagingService().send(node, 
clusterStateMsg));
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                removeNodeFromLogicalTopology(raftService, member);
+            }
+        };
     }
 
-    private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory, 
Throwable e) {
-        log.error("Exception when starting the CMG", e);
+    private void removeNodeFromLogicalTopology(CmgRaftService raftService, 
ClusterNode node) {
+        // TODO: delay should be configurable, see 
https://issues.apache.org/jira/browse/IGNITE-16785
+        scheduledExecutor.schedule(() -> {
+            ClusterNode physicalTopologyNode = 
clusterService.topologyService().getByConsistentId(node.name());
 
-        return msgFactory.initErrorMessage()
-                .cause(e.getMessage())
-                .build();
+            if (physicalTopologyNode == null || 
!physicalTopologyNode.id().equals(node.id())) {
+                raftService.removeFromCluster(node);
+            }
+        }, 0, TimeUnit.MILLISECONDS);
     }
 
-    private ClusterNode getLeader(RaftGroupService raftService) {
-        Peer leader = raftService.leader();
+    private void sendClusterState(ClusterState clusterState, 
Collection<ClusterNode> nodes) {
+        NetworkMessage msg = msgFactory.clusterStateMessage()
+                .cmgNodes(clusterState.cmgNodes())
+                .metaStorageNodes(clusterState.metaStorageNodes())
+                .build();
 
-        assert leader != null;
+        for (ClusterNode node : nodes) {
+            sendWithRetry(node, msg);
+        }
+    }
 
-        ClusterNode leaderNode = 
clusterService.topologyService().getByAddress(leader.address());
+    private CompletableFuture<NetworkMessage> sendWithRetry(ClusterNode node, 
NetworkMessage msg) {
+        var result = new CompletableFuture<NetworkMessage>();
 
-        assert leaderNode != null;
+        sendWithRetry(node, msg, result, 5);
 
-        return leaderNode;
+        return result;
     }
 
-    @Override
-    public void start() {
-        restModule.registerHandlers(routes ->
-                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
-        );
+    private void sendWithRetry(ClusterNode node, NetworkMessage msg, 
CompletableFuture<NetworkMessage> result, int attempts) {
+        clusterService.messagingService().invoke(node, msg, 1000)

Review Comment:
   This time it's 1000. Why this value? :) Let's make it configurable, or at least extract 
it into a named constant.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));

Review Comment:
   If I am the leader but there is no state, what does this mean? Is this 
even a legal situation? If so, how will the cluster proceed?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                service.nodeNames(), 
newState.cmgNodes()
+                                        ));
+                                    }
+                                } else {
+                                    // Node is fully initialized, just check 
some invariants
+                                    log.info("Node has already been 
initialized");
+
+                                    if (state.equals(newState)) {
+                                        return completedFuture(null);
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                state, newState
+                                        ));
+                                    }
+                                }
+                            }));
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response = e == null
+                    ? msgFactory.initCompleteMessage().build()
+                    : 
msgFactory.initErrorMessage().cause(e.getMessage()).build();
+
+            clusterService.messagingService().respond(addr, response, 
correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the 
storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? 
initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary 
on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, 
ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> 
onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone 
offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical 
topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, 
ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, 
clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added 
automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            removeNodeFromLogicalTopology(service, node);

Review Comment:
   I suggest renaming the method to `scheduleNodeRemovalFromLogicalTopology()`, 
because we only initiate the removal and then forget about it.
   
   Also, why should it be done asynchronously, using a scheduled task?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>

Review Comment:
   The method does not seem to do what the javadoc describes. For instance, the 
method does not seem to read the local state, although it does write it.
   
   It looks like the javadoc describes the init process as a whole, but the 
method only implements a part of it. Should the comment be moved elsewhere?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbRaftStorage.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * {@link RaftStorage} implementation based on RocksDB.
+ */
+public class RocksDbRaftStorage implements RaftStorage {
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = 
Executors.newFixedThreadPool(2);
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** RockDB options. */
+    private volatile Options options;
+
+    /** RocksDb instance. */
+    private volatile RocksDB db;
+
+    private volatile RocksSnapshotManager snapshotManager;
+
+    private final Object snapshotRestoreLock = new Object();
+
+    public RocksDbRaftStorage(Path dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    @Override
+    public void start() {
+        options = new Options().setCreateIfMissing(true);
+
+        try {
+            db = RocksDB.open(options, dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Failed to start the storage", 
e);
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return db != null;
+    }
+
+    @Override
+    public byte @Nullable [] get(byte[] key) {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to get data from Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) {
+        try {
+            db.put(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to put data into Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        try {
+            db.delete(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data from 
Rocks DB", e);
+        }
+    }
+
+    @Override
+    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], 
byte[], T> entryTransformer) {
+        byte[] upperBound = prefix.clone();
+
+        upperBound[upperBound.length - 1] += 1;
+
+        Slice upperBoundSlice = new Slice(upperBound);
+
+        ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);

Review Comment:
   I suggest renaming the local variable to avoid shadowing the field called 
`options`.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(

Review Comment:
   If I understand this correctly, this means that someone tried to init the 
cluster, the attempt failed halfway, and now we are trying to init it again, 
but with a different CMG set. We just fail again here. Why don't we destroy the CMG 
group and retry the init (as is done when handling a cluster state message)?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for {@link ClusterManagementGroupManager}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItClusterManagerTest {
+    private static final int PORT_BASE = 10000;
+
+    private final List<MockNode> cluster = new ArrayList<>();
+
+    @WorkDirectory
+    private Path workDir;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws IOException {
+        var addr1 = new NetworkAddress("localhost", PORT_BASE);
+        var addr2 = new NetworkAddress("localhost", PORT_BASE + 1);
+
+        var nodeFinder = new StaticNodeFinder(List.of(addr1, addr2));
+
+        cluster.add(new MockNode(testInfo, addr1, nodeFinder, 
workDir.resolve("node0")));
+        cluster.add(new MockNode(testInfo, addr2, nodeFinder, 
workDir.resolve("node1")));
+
+        for (MockNode node : cluster) {
+            node.start();
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        for (MockNode node : cluster) {
+            node.beforeNodeStop();
+        }
+
+        for (MockNode node : cluster) {
+            node.stop();
+        }
+    }
+
+    /**
+     * Tests initial cluster setup.
+     */
+    @Test
+    void testInit() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+
+        initCluster(metaStorageNodes, cmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+        assertThat(cluster.get(1).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        ClusterNode[] expectedTopology = currentPhysicalTopology();
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+        assertThat(cluster.get(1).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+    }
+
+    /**
+     * Tests cancelling an init command due to a node failure.
+     */
+    @Test
+    void testInitCancel() throws Exception {

Review Comment:
   This method seems to actually test two scenarios:
   
   1. That init on a cluster containing a stopped node fails
   2. That init on a cluster containing only alive nodes passes ok.
   
   The second scenario is already tested by `testInit()` (the only difference 
seems to be that here the physical topology contains 1 node, and in 
`testInit()` it contains 2), so the scenario seems to be unnecessary here.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.NodeJoinCommand;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.NodeLeaveCommand;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.WriteStateCommand;
+import 
org.apache.ignite.internal.cluster.management.raft.responses.JoinDeniedResponse;
+import 
org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * A wrapper around a {@link RaftGroupService} providing helpful methods for 
working with the CMG.
+ */
+public class CmgRaftService {
+    private final RaftGroupService raftService;
+
+    private final ClusterService clusterService;
+
+    public CmgRaftService(RaftGroupService raftService, ClusterService 
clusterService) {
+        this.raftService = raftService;
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Returns {@code true} if the current node is the CMG leader.
+     *
+     * @return {@code true} if the current node is the CMG leader.
+     */
+    // TODO: replace with onLeaderElected callback after 
https://issues.apache.org/jira/browse/IGNITE-16379 is implemented
+    public CompletableFuture<Boolean> isCurrentNodeLeader() {
+        ClusterNode thisNode = clusterService.topologyService().localMember();
+
+        return leader().thenApply(thisNode::equals);
+    }
+
+    private CompletableFuture<ClusterNode> leader() {
+        return raftService.refreshLeader().thenApply(v -> 
resolvePeer(raftService.leader()));
+    }
+
+    /**
+     * Retrieves the cluster state.
+     *
+     * @return Future that resolves into the current cluster state or {@code 
null} if it does not exist.
+     */
+    public CompletableFuture<ClusterState> readClusterState() {
+        return raftService.run(new ReadStateCommand())
+                .thenApply(ClusterState.class::cast);
+    }
+
+    /**
+     * Saves the given {@link ClusterState}.
+     *
+     * @param clusterState Cluster state.
+     * @return Future that represents the state of the operation.
+     */
+    public CompletableFuture<Void> writeClusterState(ClusterState 
clusterState) {
+        return raftService.run(new WriteStateCommand(clusterState));
+    }
+
+    /**
+     * Sends a {@link NodeJoinCommand} thus adding the current node to the 
local topology.
+     *
+     * @return Future that represents the state of the operation.
+     */
+    public CompletableFuture<Void> joinCluster() {
+        ClusterNode localMember = 
clusterService.topologyService().localMember();
+
+        return raftService.run(new NodeJoinCommand(localMember))
+                .thenAccept(response -> {
+                    if (response instanceof JoinDeniedResponse) {

Review Comment:
   I did not find `JoinDeniedResponse` being instantiated anywhere. Is that OK?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.

Review Comment:
   It looks like 'if' is missing before 'the local state is empty'.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbRaftStorage.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * {@link RaftStorage} implementation based on RocksDB.
+ */
+public class RocksDbRaftStorage implements RaftStorage {
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = 
Executors.newFixedThreadPool(2);
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** RockDB options. */
+    private volatile Options options;
+
+    /** RocksDb instance. */
+    private volatile RocksDB db;
+
+    private volatile RocksSnapshotManager snapshotManager;
+
+    private final Object snapshotRestoreLock = new Object();
+
+    public RocksDbRaftStorage(Path dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    @Override
+    public void start() {
+        options = new Options().setCreateIfMissing(true);
+
+        try {
+            db = RocksDB.open(options, dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Failed to start the storage", 
e);
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return db != null;
+    }
+
+    @Override
+    public byte @Nullable [] get(byte[] key) {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to get data from Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) {
+        try {
+            db.put(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to put data into Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        try {
+            db.delete(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data from 
Rocks DB", e);
+        }
+    }
+
+    @Override
+    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], 
byte[], T> entryTransformer) {
+        byte[] upperBound = prefix.clone();
+
+        upperBound[upperBound.length - 1] += 1;

Review Comment:
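   What if an overflow occurs? For example, if the last byte of the prefix is 0xFF, incrementing it wraps around to 0x00, so the computed upper bound sorts before the prefix itself and the range becomes empty.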



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                service.nodeNames(), 
newState.cmgNodes()
+                                        ));
+                                    }
+                                } else {
+                                    // Node is fully initialized, just check 
some invariants
+                                    log.info("Node has already been 
initialized");
+
+                                    if (state.equals(newState)) {
+                                        return completedFuture(null);
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                state, newState
+                                        ));
+                                    }
+                                }
+                            }));
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response = e == null
+                    ? msgFactory.initCompleteMessage().build()
+                    : 
msgFactory.initErrorMessage().cause(e.getMessage()).build();
+
+            clusterService.messagingService().respond(addr, response, 
correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the 
storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? 
initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary 
on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, 
ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> 
onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone 
offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical 
topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, 
ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, 
clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added 
automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            removeNodeFromLogicalTopology(service, node);
                         }
-                    } else {
-                        messagingService.respond(addr, 
errorResponse(msgFactory, e), correlationId);
                     }
                 });
     }
 
-    private void handleCancelInit(CancelInitMessage msg) throws 
NodeStoppingException {
+    private void handleCancelInit(CancelInitMessage msg) {
         log.info("CMG initialization cancelled, reason: " + msg.reason());
 
-        raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+        destroyCmg();
+    }
+
+    /**
+     * Completely destroys the local CMG Raft service.
+     */
+    private void destroyCmg() {
+        try {
+            CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+            if (raftService != null) {
+                raftService.cancel(true);
+            }
+
+            raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
 
-        // TODO: drop the Raft storage as well, 
https://issues.apache.org/jira/browse/IGNITE-16471
+            if (raftStorage.isStarted()) {
+                raftStorage.destroy();
+            }
+
+            localStateStorage.clear().get();
+
+            this.raftService = null;
+        } catch (Exception e) {
+            throw new IgniteInternalException("Error when cleaning the CMG 
state", e);
+        }
     }
 
-    private void handleClusterState(ClusterStateMessage msg) {
-        metastorageNodes.complete(msg.metastorageNodes());
+    /**
+     * Handler for the {@link ClusterStateMessage}.
+     */
+    private void handleClusterState(ClusterStateMessage msg, NetworkAddress 
addr, long correlationId) {
+        clusterService.messagingService().respond(addr, 
msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+        var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        if (raftService == null) {
+            raftService = initCmgRaftService(state);
+        } else {
+            // Raft service might have been started on wrong CMG nodes, 
because CMG state can change while a node is offline. In this
+            // case we need to re-create the service.
+            raftService = raftService.thenCompose(service -> {
+                if (service.nodeNames().equals(state.cmgNodes())) {
+                    return completedFuture(service);
+                } else {
+                    if (log.isInfoEnabled()) {
+                        log.info("CMG has been started on {}, but the cluster 
state is different: {}. "
+                                + "Re-creating the CMG Raft service", 
service.nodeNames(), state.cmgNodes());
+                    }
+
+                    destroyCmg();
+
+                    return initCmgRaftService(state);
+                }
+            });
+        }
+
+        raftService
+                .thenCompose(CmgRaftService::joinCluster)
+                .thenRun(() -> 
metaStorageNodes.complete(state.metaStorageNodes()));
+
+        this.raftService = raftService;
     }
 
-    private static NetworkMessage successResponse(CmgMessagesFactory 
msgFactory) {
-        log.info("CMG started successfully");
+    /**
+     * Starts the CMG Raft service using the provided node names as its peers.
+     */
+    private CompletableFuture<CmgRaftService> 
startCmgRaftService(Collection<String> nodeNames) {
+        List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
 
-        return msgFactory.initCompleteMessage().build();
+        try {
+            return raftManager
+                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+                        raftStorage.start();
+
+                        return new CmgRaftGroupListener(raftStorage);
+                    })
+                    .thenApply(service -> new CmgRaftService(service, 
clusterService));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
-    private void broadcastClusterState(Collection<String> metaStorageNodes) {
-        NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
-                .metastorageNodes(metaStorageNodes)
-                .build();
+    private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService 
raftService) {
+        return new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                raftService.readClusterState()
+                        .thenAccept(state -> {
+                            if (state != null) {
+                                sendClusterState(state, List.of(member));
+                            } else if (log.isWarnEnabled()) {
+                                log.warn("Cannot send the cluster state to a 
newly added node {} because cluster state is empty", member);

Review Comment:
   Isn't this normal for a non-initialized cluster? Should we really log it as a warning?



##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractRaftStorageTest.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base class for testing {@link RaftStorage} implementations.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class AbstractRaftStorageTest {
+    @WorkDirectory
+    protected Path workDir;
+
+    private RaftStorage storage;
+
+    abstract RaftStorage createStorage();
+
+    @BeforeEach
+    void setUp() {
+        storage = createStorage();
+
+        storage.start();
+
+        assertTrue(storage.isStarted());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    /**
+     * Tests the {@link RaftStorage#get} and {@link RaftStorage#put} methods.
+     */
+    @Test
+    void testGet() {

Review Comment:
   The javadoc says it's about get and put, but the method name says it's only about get. I suggest renaming the method (e.g. to `testGetAndPut`) so that it matches the javadoc (and the method body).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to