sanpwc commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r850274054
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String>
metaStorageNodeNames, Collection<Stri
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException("Interrupted while initializing
the cluster", e);
+ throw new InitException("Interrupted while initializing the
cluster", e);
} catch (ExecutionException e) {
- throw new IgniteInternalException("Unable to initialize the
cluster", e.getCause());
+ throw new InitException("Unable to initialize the cluster",
e.getCause());
} finally {
busyLock.leaveBusy();
}
}
- private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) throws NodeStoppingException {
- List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+ @Override
+ public void start() {
+ var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock,
msgFactory, clusterService);
- raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes,
CmgRaftGroupListener::new)
- .whenComplete((service, e) -> {
- MessagingService messagingService =
clusterService.messagingService();
+ // register the ClusterState handler first, because local state
recovery might send such messages
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof ClusterStateMessage) {
+ assert correlationId != null;
- if (e == null) {
- ClusterNode leader = getLeader(service);
+ handleClusterState((ClusterStateMessage) message,
senderAddr, correlationId);
+ }
+ })
+ );
+
+ synchronized (raftServiceLock) {
+ raftService = recoverLocalState();
+ }
+
+ // register the Init handler second in order to handle the command
differently, depending on the local state
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof CancelInitMessage) {
+ handleCancelInit((CancelInitMessage) message);
+ } else if (message instanceof CmgInitMessage) {
+ assert correlationId != null;
+
+ handleInit((CmgInitMessage) message, senderAddr,
correlationId);
+ }
+ })
+ );
+
+ restComponent.registerHandlers(routes ->
+ routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new
InitCommandHandler(clusterInitializer))
+ );
+ }
+
+ /**
+ * Extracts the local state (if any) and starts the CMG.
+ *
+ * @return Future that resolves into the CMG Raft service or {@code null}
if the local state is empty.
+ */
+ @Nullable
+ private CompletableFuture<CmgRaftService> recoverLocalState() {
+ Collection<String> cmgNodes;
+
+ try {
+ cmgNodes = localStateStorage.cmgNodeNames().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- ClusterNode thisNode =
clusterService.topologyService().localMember();
+ throw new IgniteInternalException("Interrupted while retrieving
local CMG state", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Error while retrieving local
CMG state", e);
+ }
+
+ if (cmgNodes.isEmpty()) {
+ return null;
+ }
- messagingService.respond(addr,
successResponse(msgFactory), correlationId);
+ log.info("Local CMG state recovered, starting the CMG");
+
+ return startCmgRaftService(cmgNodes)
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> {
+ if (isLeader) {
+ return service.readClusterState()
+ // Raft state might not have been
initialized in case of leader failure during cluster init
+ // TODO: properly handle this case,
see https://issues.apache.org/jira/browse/IGNITE-16819
+ .thenCompose(state -> state == null ?
completedFuture(null) : onLeaderElected(service, state));
+ } else {
+ return completedFuture(null);
+ }
+ })
+ .thenApply(v -> service));
+ }
+
+ /**
+ * Handles the Init command.
+ *
+ * <p>This method needs to take the following possibilities into account,
depending on the local state and the Raft state:
+ * <ol>
+ * <li>No local state found (the Raft service future is {@code null})
- this means that the current node has never been initialized
+ * before.</li>
+ * <li>Local state found (the Raft service future has been started and
therefore is not {@code null}),
+ * but no CMG state present in the Raft storage - this means that the
node has failed somewhere during
+ * the init process. In this case we need to check the consistency of
the local state and the received message and complete
+ * the init process.</li>
+ * <li>Local state found and CMG state is present in the Raft storage
- this means that the node has been initialized successfully
+ * and a user may be retrying the init in case the successful response
was lost. To make the init message idempotent
+ * we simply check that the Raft state and the received message are
the same.</li>
+ * </ol>
+ */
+ private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) {
+ var newState = new ClusterState(msg.cmgNodes(),
msg.metaStorageNodes());
+
+ // This future is needed to add a completion listener at the end of
the method
+ CompletableFuture<?> resultHook;
+
+ synchronized (raftServiceLock) {
+ if (raftService == null) {
+ // Raft service has not been started
+ log.info("Init command received, starting the CMG: " +
newState);
+
+ raftService = initCmgRaftService(newState);
+
+ resultHook = raftService;
+ } else {
+ // Raft service has been started, which means that this node
has already received an init command at least once, but
+ // we still need to check that the initialization has
completed successfully.
+ log.info("Init command received, but the CMG has already been
started");
+
+ resultHook = raftService.thenCompose(service ->
+ service.readClusterState()
+ .thenCompose(state -> {
+ if (state == null) {
+ // Raft state is empty, perform
re-initialization
+ log.info("CMG state is missing,
completing initialization");
+
+ if
(service.nodeNames().equals(newState.cmgNodes())) {
+ return
service.isCurrentNodeLeader()
+
.thenCompose(isCurrentNodeLeader ->
+
isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ service.nodeNames(),
newState.cmgNodes()
+ ));
+ }
+ } else {
+ // Node is fully initialized, just
check some invariants
+ log.info("Node has already been
initialized");
+
+ if (state.equals(newState)) {
+ return completedFuture(null);
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ state, newState
+ ));
+ }
+ }
+ }));
+ }
+ }
+
+ resultHook.whenComplete((v, e) -> {
+ NetworkMessage response;
+
+ if (e == null) {
+ response = msgFactory.initCompleteMessage().build();
+ } else {
+ response = msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .shouldCancel(!(e instanceof
IllegalInitArgumentException))
+ .build();
+ }
+
+ clusterService.messagingService().respond(addr, response,
correlationId);
+ });
+ }
+
+ /**
+ * Starts the CMG Raft service and writes the given {@code state} to the
storage.
+ */
+ private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
+ return localStateStorage.putCmgNodeNames(state.cmgNodes())
+ .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> isLeader ?
initCmgState(service, state) : completedFuture(null))
+ .thenApply(v -> service));
+ }
- if (leader.equals(thisNode)) {
- broadcastClusterState(msg.metaStorageNodes());
+ /**
+ * Writes the given state to the CMG's STM and executes some necessary
on-leader logic.
+ */
+ private CompletableFuture<Void> initCmgState(CmgRaftService service,
ClusterState state) {
+ return service.writeClusterState(state).thenCompose(v ->
onLeaderElected(service, state));
+ }
+
+ /**
+ * Executes the following actions when a CMG leader is elected.
+ * <ol>
+ * <li>Updates the logical topology in case some nodes have gone
offline during leader election.</li>
+ * <li>Broadcasts the current CMG state to all nodes in the physical
topology.</li>
+ * </ol>
+ */
+ private CompletableFuture<Void> onLeaderElected(CmgRaftService service,
ClusterState state) {
+ return updateLogicalTopology(service)
+ .thenRun(() -> {
+
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ sendClusterState(state,
clusterService.topologyService().allMembers());
+ });
+ }
+
+ /**
+ * This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
+ * physical topology during the election. New node will be added
automatically after the new leader broadcasts the current cluster
+ * state.
+ */
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
+ return service.logicalTopology()
+ .thenAccept(logicalTopology -> {
+ Collection<String> physicalTopologyIds =
clusterService.topologyService().allMembers()
+ .stream()
+ .map(ClusterNode::id)
+ .collect(toSet());
+
+ for (ClusterNode node : logicalTopology) {
+ if (!physicalTopologyIds.contains(node.id())) {
+ scheduleRemoveFromLogicalTopology(service, node);
Review Comment:
Let's update `service.removeFromCluster(node);` a bit, and change the corresponding
NodeLeaveCommand to Node**s**LeaveCommand with
`private final Set<ClusterNode> node;`
instead of
`private final ClusterNode node;`
in order to remove all nodes at once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]