sashapolo commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r845222519


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>

Review Comment:
   This is an artifact from a previous implementation, I will rephrase the 
javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to