sashapolo commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r844064490


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +138,377 @@ public void initCluster(Collection<String> 
metaStorageNodeNames, Collection<Stri
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
-            throw new IgniteInternalException("Interrupted while initializing 
the cluster", e);
+            throw new InitException("Interrupted while initializing the 
cluster", e);
         } catch (ExecutionException e) {
-            throw new IgniteInternalException("Unable to initialize the 
cluster", e.getCause());
+            throw new InitException("Unable to initialize the cluster", 
e.getCause());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) throws NodeStoppingException {
-        List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+    @Override
+    public void start() {
+        var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock, 
msgFactory, clusterService);
+
+        // register the ClusterState handler first, because local state 
recovery might send such messages
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof ClusterStateMessage) {
+                        assert correlationId != null;
 
-        raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, 
CmgRaftGroupListener::new)
-                .whenComplete((service, e) -> {
-                    MessagingService messagingService = 
clusterService.messagingService();
+                        handleClusterState((ClusterStateMessage) message, 
senderAddr, correlationId);
+                    }
+                })
+        );
 
-                    if (e == null) {
-                        ClusterNode leader = getLeader(service);
+        raftService = recoverLocalState();
 
-                        ClusterNode thisNode = 
clusterService.topologyService().localMember();
+        // register the Init handler second in order to handle the command 
differently, depending on the local state
+        clusterService.messagingService().addMessageHandler(
+                CmgMessageGroup.class,
+                messageHandlerFactory.wrapHandler((message, senderAddr, 
correlationId) -> {
+                    if (message instanceof CancelInitMessage) {
+                        handleCancelInit((CancelInitMessage) message);
+                    } else if (message instanceof CmgInitMessage) {
+                        assert correlationId != null;
 
-                        messagingService.respond(addr, 
successResponse(msgFactory), correlationId);
+                        handleInit((CmgInitMessage) message, senderAddr, 
correlationId);
+                    }
+                })
+        );
 
-                        if (leader.equals(thisNode)) {
-                            broadcastClusterState(msg.metaStorageNodes());
+        restComponent.registerHandlers(routes ->
+                routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new 
InitCommandHandler(clusterInitializer))
+        );
+    }
+
+    /**
+     * Extracts the local state (if any) and starts the CMG.
+     *
+     * @return Future that resolves into the CMG Raft service or {@code null} 
the local state is empty.
+     */
+    @Nullable
+    private CompletableFuture<CmgRaftService> recoverLocalState() {
+        Collection<String> cmgNodes;
+
+        try {
+            cmgNodes = localStateStorage.cmgNodeNames().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while retrieving 
local CMG state", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Error while retrieving local 
CMG state", e);
+        }
+
+        if (cmgNodes.isEmpty()) {
+            return null;
+        }
+
+        return startCmgRaftService(cmgNodes)
+                .thenCompose(service -> {
+                    log.info("Local CMG state recovered");
+
+                    return service.isCurrentNodeLeader()
+                            .thenCompose(isLeader -> {
+                                if (isLeader) {
+                                    return service.readClusterState()
+                                            // Raft state might not have been 
initialized in case of leader failure during cluster init
+                                            .thenCompose(state -> state == 
null ? completedFuture(null) : onLeaderElected(service, state));
+                                } else {
+                                    return completedFuture(null);
+                                }
+                            })
+                            .thenApply(v -> service);
+                });
+    }
+
+    /**
+     * Handles the Init command.
+     *
+     * <p>This method needs to take the following possibilities into account, 
depending on the local state and the Raft state:
+     * <ol>
+     *     <li>No local state found - this means that the current node has 
never been initialized before.</li>
+     *     <li>Local state found, but no CMG state present in the Raft storage 
- this means that the node has failed somewhere during
+     *     the init process. In this case we need to check the consistency of 
the local state and the received message and complete
+     *     the init process.</li>
+     *     <li>Local state found and CMG state is present in the Raft storage 
- this means that the node has been initialized successfully
+     *     and a user may be retrying the init in case the successful response 
was lost. To make the init message idempotent
+     *     we simply check that the Raft state and the received message are 
the same.</li>
+     * </ol>
+     */
+    private void handleInit(CmgInitMessage msg, NetworkAddress addr, long 
correlationId) {
+        var newState = new ClusterState(msg.cmgNodes(), 
msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        // This future is needed to add a completion listener at the end of 
the method
+        CompletableFuture<?> resultHook;
+
+        if (raftService == null) {
+            // Raft service has not been started
+            log.info("Init command received, starting the CMG: " + newState);
+
+            raftService = initCmgRaftService(newState);
+
+            this.raftService = raftService;
+
+            resultHook = raftService;
+        } else {
+            // Raft service has been started, which means that this node has 
already received an init command at least once, but
+            // we still need to check that the initialization has completed 
successfully.
+            log.info("Init command received, but the CMG has already been 
started");
+
+            resultHook = raftService.thenCompose(service ->
+                    service.readClusterState()
+                            .thenCompose(state -> {
+                                if (state == null) {
+                                    // Raft state is empty, perform 
re-initialization
+                                    log.info("CMG state is missing, completing 
initialization");
+
+                                    if 
(service.nodeNames().equals(newState.cmgNodes())) {
+                                        return service.isCurrentNodeLeader()
+                                                
.thenCompose(isCurrentNodeLeader ->
+                                                        isCurrentNodeLeader ? 
initCmgState(service, newState) : completedFuture(null));
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                service.nodeNames(), 
newState.cmgNodes()
+                                        ));
+                                    }
+                                } else {
+                                    // Node is fully initialized, just check 
some invariants
+                                    log.info("Node has already been 
initialized");
+
+                                    if (state.equals(newState)) {
+                                        return completedFuture(null);
+                                    } else {
+                                        throw new 
IllegalStateException(String.format(
+                                                "CMG has already been 
initialized with %s, but the new state is different: %s",
+                                                state, newState
+                                        ));
+                                    }
+                                }
+                            }));
+        }
+
+        resultHook.whenComplete((v, e) -> {
+            NetworkMessage response = e == null
+                    ? msgFactory.initCompleteMessage().build()
+                    : 
msgFactory.initErrorMessage().cause(e.getMessage()).build();
+
+            clusterService.messagingService().respond(addr, response, 
correlationId);
+        });
+    }
+
+    /**
+     * Starts the CMG Raft service and writes the given {@code state} to the 
storage.
+     */
+    private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState 
state) {
+        return localStateStorage.putCmgNodeNames(state.cmgNodes())
+                .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+                .thenCompose(service -> service.isCurrentNodeLeader()
+                        .thenCompose(isLeader -> isLeader ? 
initCmgState(service, state) : completedFuture(null))
+                        .thenApply(v -> service));
+    }
+
+    /**
+     * Writes the given state to the CMG's STM and executes some necessary 
on-leader logic.
+     */
+    private CompletableFuture<Void> initCmgState(CmgRaftService service, 
ClusterState state) {
+        return service.writeClusterState(state).thenCompose(v -> 
onLeaderElected(service, state));
+    }
+
+    /**
+     * Executes the following actions when a CMG leader is elected.
+     * <ol>
+     *     <li>Updates the logical topology in case some nodes have gone 
offline during leader election.</li>
+     *     <li>Broadcasts the current CMG state to all nodes in the physical 
topology.</li>
+     * </ol>
+     */
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, 
ClusterState state) {
+        return updateLogicalTopology(service)
+                .thenRun(() -> {
+                    
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                    sendClusterState(state, 
clusterService.topologyService().allMembers());
+                });
+    }
+
+    /**
+     * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
+     * physical topology during the election. New node will be added 
automatically after the new leader broadcasts the current cluster
+     * state.
+     */
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
+        return service.logicalTopology()
+                .thenAccept(logicalTopology -> {
+                    Collection<String> physicalTopologyIds = 
clusterService.topologyService().allMembers()
+                            .stream()
+                            .map(ClusterNode::id)
+                            .collect(toSet());
+
+                    for (ClusterNode node : logicalTopology) {
+                        if (!physicalTopologyIds.contains(node.id())) {
+                            removeNodeFromLogicalTopology(service, node);
                         }
-                    } else {
-                        messagingService.respond(addr, 
errorResponse(msgFactory, e), correlationId);
                     }
                 });
     }
 
-    private void handleCancelInit(CancelInitMessage msg) throws 
NodeStoppingException {
+    private void handleCancelInit(CancelInitMessage msg) {
         log.info("CMG initialization cancelled, reason: " + msg.reason());
 
-        raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+        destroyCmg();
+    }
+
+    /**
+     * Completely destroys the local CMG Raft service.
+     */
+    private void destroyCmg() {
+        try {
+            CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+            if (raftService != null) {
+                raftService.cancel(true);
+            }
+
+            raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
 
-        // TODO: drop the Raft storage as well, 
https://issues.apache.org/jira/browse/IGNITE-16471
+            if (raftStorage.isStarted()) {
+                raftStorage.destroy();
+            }
+
+            localStateStorage.clear().get();
+
+            this.raftService = null;
+        } catch (Exception e) {
+            throw new IgniteInternalException("Error when cleaning the CMG 
state", e);
+        }
     }
 
-    private void handleClusterState(ClusterStateMessage msg) {
-        metastorageNodes.complete(msg.metastorageNodes());
+    /**
+     * Handler for the {@link ClusterStateMessage}.
+     */
+    private void handleClusterState(ClusterStateMessage msg, NetworkAddress 
addr, long correlationId) {
+        clusterService.messagingService().respond(addr, 
msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+        var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+        CompletableFuture<CmgRaftService> raftService = this.raftService;
+
+        if (raftService == null) {
+            raftService = initCmgRaftService(state);
+        } else {
+            // Raft service might have been started on wrong CMG nodes, 
because CMG state can change while a node is offline. In this
+            // case we need to re-create the service.
+            raftService = raftService.thenCompose(service -> {
+                if (service.nodeNames().equals(state.cmgNodes())) {
+                    return completedFuture(service);
+                } else {
+                    if (log.isInfoEnabled()) {
+                        log.info("CMG has been started on {}, but the cluster 
state is different: {}. "
+                                + "Re-creating the CMG Raft service", 
service.nodeNames(), state.cmgNodes());
+                    }
+
+                    destroyCmg();
+
+                    return initCmgRaftService(state);
+                }
+            });
+        }
+
+        raftService
+                .thenCompose(CmgRaftService::joinCluster)
+                .thenRun(() -> 
metaStorageNodes.complete(state.metaStorageNodes()));
+
+        this.raftService = raftService;
     }
 
-    private static NetworkMessage successResponse(CmgMessagesFactory 
msgFactory) {
-        log.info("CMG started successfully");
+    /**
+     * Starts the CMG Raft service using the provided node names as its peers.
+     */
+    private CompletableFuture<CmgRaftService> 
startCmgRaftService(Collection<String> nodeNames) {
+        List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
 
-        return msgFactory.initCompleteMessage().build();
+        try {
+            return raftManager
+                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+                        raftStorage.start();
+
+                        return new CmgRaftGroupListener(raftStorage);
+                    })
+                    .thenApply(service -> new CmgRaftService(service, 
clusterService));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
-    private void broadcastClusterState(Collection<String> metaStorageNodes) {
-        NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
-                .metastorageNodes(metaStorageNodes)
-                .build();
+    private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService 
raftService) {
+        return new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                raftService.readClusterState()
+                        .thenAccept(state -> {
+                            if (state != null) {
+                                sendClusterState(state, List.of(member));
+                            } else if (log.isWarnEnabled()) {
+                                log.warn("Cannot send the cluster state to a 
newly added node {} because cluster state is empty", member);
+                            }
+                        });
+            }
 
-        clusterService.topologyService()
-                .allMembers()
-                .forEach(node -> clusterService.messagingService().send(node, 
clusterStateMsg));
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                removeNodeFromLogicalTopology(raftService, member);
+            }
+        };
     }
 
-    private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory, 
Throwable e) {
-        log.error("Exception when starting the CMG", e);
+    private void removeNodeFromLogicalTopology(CmgRaftService raftService, 
ClusterNode node) {
+        // TODO: delay should be configurable, see 
https://issues.apache.org/jira/browse/IGNITE-16785
+        scheduledExecutor.schedule(() -> {
+            ClusterNode physicalTopologyNode = 
clusterService.topologyService().getByConsistentId(node.name());
 
-        return msgFactory.initErrorMessage()
-                .cause(e.getMessage())
-                .build();
+            if (physicalTopologyNode == null || 
!physicalTopologyNode.id().equals(node.id())) {
+                raftService.removeFromCluster(node);
+            }
+        }, 0, TimeUnit.MILLISECONDS);
     }
 
-    private ClusterNode getLeader(RaftGroupService raftService) {
-        Peer leader = raftService.leader();
+    private void sendClusterState(ClusterState clusterState, 
Collection<ClusterNode> nodes) {
+        NetworkMessage msg = msgFactory.clusterStateMessage()
+                .cmgNodes(clusterState.cmgNodes())
+                .metaStorageNodes(clusterState.metaStorageNodes())
+                .build();
 
-        assert leader != null;
+        for (ClusterNode node : nodes) {
+            sendWithRetry(node, msg);
+        }
+    }
 
-        ClusterNode leaderNode = 
clusterService.topologyService().getByAddress(leader.address());
+    private CompletableFuture<NetworkMessage> sendWithRetry(ClusterNode node, 
NetworkMessage msg) {
+        var result = new CompletableFuture<NetworkMessage>();
 
-        assert leaderNode != null;
+        sendWithRetry(node, msg, result, 5);

Review Comment:
   We can, but again, I have no idea when this configuration will be needed. I 
think we can add it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to