sanpwc commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r847519203
##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
* @throws IllegalArgumentException if null is specified instead of node name.
*/
public void stop(String name);
+
+ /**
+ * Initializes the cluster that the given node is present in.
+ *
+ * @param name name of the node that the initialization request will be sent to.
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage and the CMG.
+ * @throws NodeStoppingException If node stopping intention was detected.
Review Comment:
It's not valid to throw `NodeStoppingException` (which extends
`IgniteInternalCheckedException`) from a public API.
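A rough sketch of one way to keep the internal type out of the public signature: translate it at the API boundary into an unchecked public exception. All three classes below (`InternalNodeStoppingException`, `PublicIgniteException`, `IgnitionFacade`) are hypothetical stand-ins for illustration, not the actual Ignite classes.

```java
/** Hypothetical stand-in for an internal checked exception such as NodeStoppingException. */
class InternalNodeStoppingException extends Exception {
    InternalNodeStoppingException(String msg) {
        super(msg);
    }
}

/** Hypothetical public unchecked exception that is safe to expose from the API. */
class PublicIgniteException extends RuntimeException {
    PublicIgniteException(String msg, Throwable cause) {
        super(msg, cause);
    }
}

/** Hypothetical facade: the public method declares no internal exception types. */
class IgnitionFacade {
    /** Internal implementation that may fail with the internal checked exception. */
    static void initInternal(boolean stopping) throws InternalNodeStoppingException {
        if (stopping) {
            throw new InternalNodeStoppingException("Node is stopping");
        }
    }

    /** Public entry point: translates the internal exception at the API boundary. */
    static void init(boolean stopping) {
        try {
            initInternal(stopping);
        } catch (InternalNodeStoppingException e) {
            throw new PublicIgniteException("Unable to initialize the cluster", e);
        }
    }
}
```

With this shape, callers such as the tests would not need a `throws` clause at all.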
##########
modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java:
##########
@@ -66,15 +70,21 @@ public class ItConfigCommandTest extends AbstractCliTest {
private IgniteImpl node;
@BeforeEach
- void setup(@WorkDirectory Path workDir, TestInfo testInfo) {
- node = (IgniteImpl) IgnitionManager.start(testNodeName(testInfo, 0), null, workDir);
+ void setup(@WorkDirectory Path workDir, TestInfo testInfo) throws NodeStoppingException {
Review Comment:
As mentioned above, it's not valid to throw the internal
NodeStoppingException from a public API.
##########
modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java:
##########
@@ -66,15 +70,21 @@ public class ItConfigCommandTest extends AbstractCliTest {
private IgniteImpl node;
@BeforeEach
- void setup(@WorkDirectory Path workDir, TestInfo testInfo) {
- node = (IgniteImpl) IgnitionManager.start(testNodeName(testInfo, 0), null, workDir);
+ void setup(@WorkDirectory Path workDir, TestInfo testInfo) throws NodeStoppingException {
+ String nodeName = testNodeName(testInfo, 0);
+
+ CompletableFuture<Ignite> future = IgnitionManager.start(nodeName, null, workDir);
+
+ IgnitionManager.init(nodeName, List.of(nodeName));
Review Comment:
Having nodeName as an init parameter seems useful for some test scenarios,
but in the general case we might use any of
org.apache.ignite.internal.app.IgnitionImpl#nodes. I'd rather add an overloaded
version of init() without the nodeName param, WDYT?
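Roughly what I have in mind, as a sketch only. `LocalNodes` is a hypothetical registry loosely modelled on `IgnitionImpl#nodes`, and `lastInitTarget` exists purely to make the example observable:

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of locally started nodes. */
class LocalNodes {
    private final Set<String> nodes = ConcurrentHashMap.newKeySet();

    /** Name of the node that received the last init request (illustration only). */
    String lastInitTarget;

    void register(String nodeName) {
        nodes.add(nodeName);
    }

    /** Existing variant: the caller chooses the node that receives the request. */
    void init(String nodeName, Collection<String> metaStorageNodeNames) {
        if (!nodes.contains(nodeName)) {
            throw new IllegalArgumentException("Node \"" + nodeName + "\" has not been started");
        }

        lastInitTarget = nodeName;
    }

    /** Proposed overload: any locally started node is used as the target. */
    void init(Collection<String> metaStorageNodeNames) {
        String anyNode = nodes.stream()
                .findAny()
                .orElseThrow(() -> new IllegalStateException("No local nodes have been started"));

        init(anyNode, metaStorageNodeNames);
    }
}
```

The explicit-name variant stays available for the test scenarios that need it.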
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -227,7 +233,13 @@ public class IgniteImpl implements Ignite {
txManager = new TableTxManagerImpl(clusterSvc, new HeapLockManager());
- cmgMgr = new ClusterManagementGroupManager(clusterSvc, raftMgr, restComponent);
+ cmgMgr = new ClusterManagementGroupManager(
Review Comment:
Currently there's the following code in IgniteImpl (by the way, it's located
in the wrong place):
```
private void waitForJoinPermission() {
// TODO https://issues.apache.org/jira/browse/IGNITE-15114
}
```
meaning that the components after CMGManager should start only after receiving
NodeJoinResponse[OK].
According to
```
raftService
.thenCompose(CmgRaftService::joinCluster)
.thenRun(() ->
metaStorageNodes.complete(state.metaStorageNodes()));
```
in
`org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager#handleClusterState`
this is not implemented yet. Could you please fix it?
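To illustrate the ordering I mean (a sketch under assumptions: `JoinGate` and the `requestJoin` supplier are hypothetical placeholders, not the real CMG API): the Meta Storage node names are published only after the join request has completed successfully, so components that wait on that future cannot start earlier.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical gate that releases dependent components only after a successful join. */
class JoinGate {
    /** Completed only after the join request has been acknowledged. */
    final CompletableFuture<Set<String>> metaStorageNodes = new CompletableFuture<>();

    /**
     * Handles the received cluster state: sends the join request and publishes the
     * Meta Storage nodes only when that request succeeds.
     *
     * @param msNodes Meta Storage node names taken from the cluster state.
     * @param requestJoin hypothetical async call that completes once the join is accepted.
     */
    CompletableFuture<Void> handleClusterState(Set<String> msNodes, Supplier<CompletableFuture<Void>> requestJoin) {
        return requestJoin.get().whenComplete((v, e) -> {
            if (e == null) {
                metaStorageNodes.complete(msNodes);
            } else {
                // Propagate the failure so that waiting components do not hang forever.
                metaStorageNodes.completeExceptionally(e);
            }
        });
    }
}
```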
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String>
metaStorageNodeNames, Collection<Stri
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException("Interrupted while initializing
the cluster", e);
+ throw new InitException("Interrupted while initializing the
cluster", e);
} catch (ExecutionException e) {
- throw new IgniteInternalException("Unable to initialize the
cluster", e.getCause());
+ throw new InitException("Unable to initialize the cluster",
e.getCause());
} finally {
busyLock.leaveBusy();
}
}
- private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) throws NodeStoppingException {
- List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+ @Override
+ public void start() {
+ var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock,
msgFactory, clusterService);
- raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes,
CmgRaftGroupListener::new)
- .whenComplete((service, e) -> {
- MessagingService messagingService =
clusterService.messagingService();
+ // register the ClusterState handler first, because local state
recovery might send such messages
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof ClusterStateMessage) {
+ assert correlationId != null;
- if (e == null) {
- ClusterNode leader = getLeader(service);
+ handleClusterState((ClusterStateMessage) message,
senderAddr, correlationId);
+ }
+ })
+ );
+
+ synchronized (raftServiceLock) {
+ raftService = recoverLocalState();
+ }
+
+ // register the Init handler second in order to handle the command
differently, depending on the local state
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof CancelInitMessage) {
+ handleCancelInit((CancelInitMessage) message);
+ } else if (message instanceof CmgInitMessage) {
+ assert correlationId != null;
+
+ handleInit((CmgInitMessage) message, senderAddr,
correlationId);
+ }
+ })
+ );
+
+ restComponent.registerHandlers(routes ->
+ routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new
InitCommandHandler(clusterInitializer))
+ );
+ }
+
+ /**
+ * Extracts the local state (if any) and starts the CMG.
+ *
+ * @return Future that resolves into the CMG Raft service or {@code null}
if the local state is empty.
+ */
+ @Nullable
+ private CompletableFuture<CmgRaftService> recoverLocalState() {
+ Collection<String> cmgNodes;
+
+ try {
+ cmgNodes = localStateStorage.cmgNodeNames().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- ClusterNode thisNode =
clusterService.topologyService().localMember();
+ throw new IgniteInternalException("Interrupted while retrieving
local CMG state", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Error while retrieving local
CMG state", e);
+ }
+
+ if (cmgNodes.isEmpty()) {
+ return null;
+ }
- messagingService.respond(addr,
successResponse(msgFactory), correlationId);
+ log.info("Local CMG state recovered, starting the CMG");
+
+ return startCmgRaftService(cmgNodes)
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> {
+ if (isLeader) {
+ return service.readClusterState()
+ // Raft state might not have been
initialized in case of leader failure during cluster init
+ // TODO: properly handle this case,
see https://issues.apache.org/jira/browse/IGNITE-16819
+ .thenCompose(state -> state == null ?
completedFuture(null) : onLeaderElected(service, state));
+ } else {
+ return completedFuture(null);
+ }
+ })
+ .thenApply(v -> service));
+ }
+
+ /**
+ * Handles the Init command.
+ *
+ * <p>This method needs to take the following possibilities into account,
depending on the local state and the Raft state:
+ * <ol>
+ * <li>No local state found (the Raft service future is {@code null})
- this means that the current node has never been initialized
+ * before.</li>
+ * <li>Local state found (the Raft service future has been started and
therefore is not {@code null}),
+ * but no CMG state present in the Raft storage - this means that the
node has failed somewhere during
+ * the init process. In this case we need to check the consistency of
the local state and the received message and complete
+ * the init process.</li>
+ * <li>Local state found and CMG state is present in the Raft storage
- this means that the node has been initialized successfully
+ * and a user may be retrying the init in case the successful response
was lost. To make the init message idempotent
+ * we simply check that the Raft state and the received message are
the same.</li>
+ * </ol>
+ */
+ private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) {
+ var newState = new ClusterState(msg.cmgNodes(),
msg.metaStorageNodes());
+
+ // This future is needed to add a completion listener at the end of
the method
+ CompletableFuture<?> resultHook;
+
+ synchronized (raftServiceLock) {
+ if (raftService == null) {
+ // Raft service has not been started
+ log.info("Init command received, starting the CMG: " +
newState);
+
+ raftService = initCmgRaftService(newState);
+
+ resultHook = raftService;
+ } else {
+ // Raft service has been started, which means that this node
has already received an init command at least once, but
+ // we still need to check that the initialization has
completed successfully.
+ log.info("Init command received, but the CMG has already been
started");
+
+ resultHook = raftService.thenCompose(service ->
+ service.readClusterState()
+ .thenCompose(state -> {
+ if (state == null) {
+ // Raft state is empty, perform
re-initialization
+ log.info("CMG state is missing,
completing initialization");
+
+ if
(service.nodeNames().equals(newState.cmgNodes())) {
+ return
service.isCurrentNodeLeader()
+
.thenCompose(isCurrentNodeLeader ->
+
isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ service.nodeNames(),
newState.cmgNodes()
+ ));
+ }
+ } else {
+ // Node is fully initialized, just
check some invariants
+ log.info("Node has already been
initialized");
+
+ if (state.equals(newState)) {
+ return completedFuture(null);
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ state, newState
+ ));
+ }
+ }
+ }));
+ }
+ }
+
+ resultHook.whenComplete((v, e) -> {
+ NetworkMessage response;
+
+ if (e == null) {
+ response = msgFactory.initCompleteMessage().build();
+ } else {
+ response = msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .shouldCancel(!(e instanceof
IllegalInitArgumentException))
+ .build();
+ }
+
+ clusterService.messagingService().respond(addr, response,
correlationId);
+ });
+ }
+
+ /**
+ * Starts the CMG Raft service and writes the given {@code state} to the
storage.
+ */
+ private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
+ return localStateStorage.putCmgNodeNames(state.cmgNodes())
+ .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> isLeader ?
initCmgState(service, state) : completedFuture(null))
+ .thenApply(v -> service));
+ }
- if (leader.equals(thisNode)) {
- broadcastClusterState(msg.metaStorageNodes());
+ /**
+ * Writes the given state to the CMG's STM and executes some necessary
on-leader logic.
+ */
+ private CompletableFuture<Void> initCmgState(CmgRaftService service,
ClusterState state) {
+ return service.writeClusterState(state).thenCompose(v ->
onLeaderElected(service, state));
+ }
+
+ /**
+ * Executes the following actions when a CMG leader is elected.
+ * <ol>
+ * <li>Updates the logical topology in case some nodes have gone
offline during leader election.</li>
+ * <li>Broadcasts the current CMG state to all nodes in the physical
topology.</li>
+ * </ol>
+ */
+ private CompletableFuture<Void> onLeaderElected(CmgRaftService service,
ClusterState state) {
+ return updateLogicalTopology(service)
+ .thenRun(() -> {
+
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ sendClusterState(state,
clusterService.topologyService().allMembers());
+ });
+ }
+
+ /**
+ * This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
+ * physical topology during the election. New node will be added
automatically after the new leader broadcasts the current cluster
+ * state.
+ */
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
+ return service.logicalTopology()
+ .thenAccept(logicalTopology -> {
+ Collection<String> physicalTopologyIds =
clusterService.topologyService().allMembers()
+ .stream()
+ .map(ClusterNode::id)
+ .collect(toSet());
+
+ for (ClusterNode node : logicalTopology) {
+ if (!physicalTopologyIds.contains(node.id())) {
+ scheduleRemoveFromLogicalTopology(service, node);
Review Comment:
Why do we need to schedule such removals one by one? I mean, why not remove the full delta at once?
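For example, the whole delta could be computed once and then passed to a single removal command. A minimal sketch, assuming node IDs are plain strings; `TopologyDelta` is a hypothetical helper, and the batch removal command itself is not shown because it does not exist in this PR:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper that computes which nodes must leave the logical topology. */
class TopologyDelta {
    /** Returns the IDs that are in the logical topology but no longer in the physical one. */
    static Set<String> staleNodes(Collection<String> logicalTopology, Collection<String> physicalTopology) {
        Set<String> stale = new HashSet<>(logicalTopology);

        stale.removeAll(physicalTopology);

        return stale;
    }
}
```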
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String>
metaStorageNodeNames, Collection<Stri
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException("Interrupted while initializing
the cluster", e);
+ throw new InitException("Interrupted while initializing the
cluster", e);
} catch (ExecutionException e) {
- throw new IgniteInternalException("Unable to initialize the
cluster", e.getCause());
+ throw new InitException("Unable to initialize the cluster",
e.getCause());
} finally {
busyLock.leaveBusy();
}
}
- private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) throws NodeStoppingException {
- List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+ @Override
+ public void start() {
+ var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock,
msgFactory, clusterService);
- raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes,
CmgRaftGroupListener::new)
- .whenComplete((service, e) -> {
- MessagingService messagingService =
clusterService.messagingService();
+ // register the ClusterState handler first, because local state
recovery might send such messages
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof ClusterStateMessage) {
+ assert correlationId != null;
- if (e == null) {
- ClusterNode leader = getLeader(service);
+ handleClusterState((ClusterStateMessage) message,
senderAddr, correlationId);
+ }
+ })
+ );
+
+ synchronized (raftServiceLock) {
+ raftService = recoverLocalState();
+ }
+
+ // register the Init handler second in order to handle the command
differently, depending on the local state
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof CancelInitMessage) {
+ handleCancelInit((CancelInitMessage) message);
+ } else if (message instanceof CmgInitMessage) {
+ assert correlationId != null;
+
+ handleInit((CmgInitMessage) message, senderAddr,
correlationId);
+ }
+ })
+ );
+
+ restComponent.registerHandlers(routes ->
+ routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new
InitCommandHandler(clusterInitializer))
+ );
+ }
+
+ /**
+ * Extracts the local state (if any) and starts the CMG.
+ *
+ * @return Future that resolves into the CMG Raft service or {@code null}
if the local state is empty.
+ */
+ @Nullable
+ private CompletableFuture<CmgRaftService> recoverLocalState() {
+ Collection<String> cmgNodes;
+
+ try {
+ cmgNodes = localStateStorage.cmgNodeNames().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- ClusterNode thisNode =
clusterService.topologyService().localMember();
+ throw new IgniteInternalException("Interrupted while retrieving
local CMG state", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Error while retrieving local
CMG state", e);
+ }
+
+ if (cmgNodes.isEmpty()) {
+ return null;
+ }
- messagingService.respond(addr,
successResponse(msgFactory), correlationId);
+ log.info("Local CMG state recovered, starting the CMG");
+
+ return startCmgRaftService(cmgNodes)
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> {
+ if (isLeader) {
+ return service.readClusterState()
+ // Raft state might not have been
initialized in case of leader failure during cluster init
+ // TODO: properly handle this case,
see https://issues.apache.org/jira/browse/IGNITE-16819
+ .thenCompose(state -> state == null ?
completedFuture(null) : onLeaderElected(service, state));
+ } else {
+ return completedFuture(null);
+ }
+ })
+ .thenApply(v -> service));
+ }
+
+ /**
+ * Handles the Init command.
+ *
+ * <p>This method needs to take the following possibilities into account,
depending on the local state and the Raft state:
+ * <ol>
+ * <li>No local state found (the Raft service future is {@code null})
- this means that the current node has never been initialized
+ * before.</li>
+ * <li>Local state found (the Raft service future has been started and
therefore is not {@code null}),
+ * but no CMG state present in the Raft storage - this means that the
node has failed somewhere during
+ * the init process. In this case we need to check the consistency of
the local state and the received message and complete
+ * the init process.</li>
+ * <li>Local state found and CMG state is present in the Raft storage
- this means that the node has been initialized successfully
+ * and a user may be retrying the init in case the successful response
was lost. To make the init message idempotent
+ * we simply check that the Raft state and the received message are
the same.</li>
+ * </ol>
+ */
+ private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) {
+ var newState = new ClusterState(msg.cmgNodes(),
msg.metaStorageNodes());
+
+ // This future is needed to add a completion listener at the end of
the method
+ CompletableFuture<?> resultHook;
+
+ synchronized (raftServiceLock) {
+ if (raftService == null) {
+ // Raft service has not been started
+ log.info("Init command received, starting the CMG: " +
newState);
+
+ raftService = initCmgRaftService(newState);
+
+ resultHook = raftService;
+ } else {
+ // Raft service has been started, which means that this node
has already received an init command at least once, but
+ // we still need to check that the initialization has
completed successfully.
+ log.info("Init command received, but the CMG has already been
started");
+
+ resultHook = raftService.thenCompose(service ->
+ service.readClusterState()
+ .thenCompose(state -> {
+ if (state == null) {
+ // Raft state is empty, perform
re-initialization
+ log.info("CMG state is missing,
completing initialization");
+
+ if
(service.nodeNames().equals(newState.cmgNodes())) {
+ return
service.isCurrentNodeLeader()
+
.thenCompose(isCurrentNodeLeader ->
+
isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ service.nodeNames(),
newState.cmgNodes()
+ ));
+ }
+ } else {
+ // Node is fully initialized, just
check some invariants
+ log.info("Node has already been
initialized");
+
+ if (state.equals(newState)) {
+ return completedFuture(null);
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ state, newState
+ ));
+ }
+ }
+ }));
+ }
+ }
+
+ resultHook.whenComplete((v, e) -> {
+ NetworkMessage response;
+
+ if (e == null) {
+ response = msgFactory.initCompleteMessage().build();
+ } else {
+ response = msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .shouldCancel(!(e instanceof
IllegalInitArgumentException))
+ .build();
+ }
+
+ clusterService.messagingService().respond(addr, response,
correlationId);
+ });
+ }
+
+ /**
+ * Starts the CMG Raft service and writes the given {@code state} to the
storage.
+ */
+ private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
+ return localStateStorage.putCmgNodeNames(state.cmgNodes())
+ .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> isLeader ?
initCmgState(service, state) : completedFuture(null))
+ .thenApply(v -> service));
+ }
- if (leader.equals(thisNode)) {
- broadcastClusterState(msg.metaStorageNodes());
+ /**
+ * Writes the given state to the CMG's STM and executes some necessary
on-leader logic.
+ */
+ private CompletableFuture<Void> initCmgState(CmgRaftService service,
ClusterState state) {
+ return service.writeClusterState(state).thenCompose(v ->
onLeaderElected(service, state));
+ }
+
+ /**
+ * Executes the following actions when a CMG leader is elected.
+ * <ol>
+ * <li>Updates the logical topology in case some nodes have gone
offline during leader election.</li>
+ * <li>Broadcasts the current CMG state to all nodes in the physical
topology.</li>
+ * </ol>
+ */
+ private CompletableFuture<Void> onLeaderElected(CmgRaftService service,
ClusterState state) {
Review Comment:
I believe that
```
register clusterSvc.onAppeared(node ->
clusterSvc.msgSvc.invoke(
node.id, clusterState
)
);
```
step is missing.
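A slightly more concrete version of the pseudocode above, as a sketch only. `SimpleTopologyService`, `LeaderActions` and the `send` callback are hypothetical stand-ins for the topology and messaging services:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical, minimal stand-in for a topology service that reports appearing nodes. */
class SimpleTopologyService {
    private final List<Consumer<String>> appearedHandlers = new ArrayList<>();

    void onAppeared(Consumer<String> handler) {
        appearedHandlers.add(handler);
    }

    /** Simulates a node joining the physical topology. */
    void fireAppeared(String nodeId) {
        appearedHandlers.forEach(h -> h.accept(nodeId));
    }
}

/** Hypothetical on-leader logic. */
class LeaderActions {
    /**
     * Registers a handler that sends the cluster state to every node that appears
     * after the leader has been elected.
     *
     * @param send hypothetical messaging call taking (nodeId, clusterState).
     */
    static void onLeaderElected(SimpleTopologyService topology, String clusterState, BiConsumer<String, String> send) {
        topology.onAppeared(nodeId -> send.accept(nodeId, clusterState));
    }
}
```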
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java:
##########
@@ -122,7 +119,11 @@ void setUp(TestInfo testInfo) {
*/
@AfterEach
void afterEach() throws Exception {
- IgniteUtils.closeAll(ItUtils.reverse(clusterNodes));
Review Comment:
Could you please also remove ItUtils? It's not used.
##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
* @throws IllegalArgumentException if null is specified instead of node name.
*/
public void stop(String name);
+
+ /**
+ * Initializes the cluster that the given node is present in.
+ *
+ * @param name name of the node that the initialization request will be sent to.
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage and the CMG.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ public void init(String name, Collection<String> metaStorageNodeNames) throws NodeStoppingException;
+
+ /**
+ * Initializes the cluster that the given node is present in.
+ *
+ * @param name name of the node that the initialization request will be sent to.
+ * @param metaStorageNodeNames names of nodes that will host the Meta Storage.
+ * @param cmgNodeNames names of nodes that will host the CMG.
Review Comment:
Let's also add more details about what the CMG and the Meta Storage are.
Ignition is a public API, so the corresponding javadocs should be good enough.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java:
##########
@@ -114,15 +121,17 @@ private CompletableFuture<Void> invokeMessage(Collection<ClusterNode> nodes, Net
.invoke(node, message, 10000)
.thenAccept(response -> {
if (response instanceof InitErrorMessage) {
- throw new InitException(String.format(
- "Got error response from node \"%s\": %s", node.name(), ((InitErrorMessage) response).cause()
- ));
- }
-
- if (!(response instanceof InitCompleteMessage)) {
- throw new InitException(String.format(
- "Unexpected response from node \"%s\": %s", node.name(), response.getClass()
- ));
+ var errorResponse = (InitErrorMessage) response;
+
+ throw new InternalInitException(
+ String.format("Got error response from node \"%s\": %s", node.name(), errorResponse.cause()),
+ errorResponse.shouldCancel()
+ );
+ } else if (!(response instanceof InitCompleteMessage)) {
Review Comment:
What behavior do you expect if it's neither InitCompleteMessage nor
!(response instanceof InitCompleteMessage)?
##########
modules/api/src/main/java/org/apache/ignite/Ignition.java:
##########
@@ -102,4 +111,23 @@ public interface Ignition {
* @throws IllegalArgumentException if null is specified instead of node name.
*/
public void stop(String name);
+
+ /**
+ * Initializes the cluster that the given node is present in.
Review Comment:
I'd rather add more details about what an initialized cluster is.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeJoinCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeLeaveCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.WriteStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftGroupListener} implementation for the CMG.
+ */
+public class CmgRaftGroupListener implements RaftGroupListener {
+ private static final IgniteLogger log = IgniteLogger.forClass(CmgRaftGroupListener.class);
+
+ private final RaftStorageManager storage;
+
+ public CmgRaftGroupListener(RaftStorage storage) {
+ this.storage = new RaftStorageManager(storage);
+ }
+
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<ReadCommand> clo = iterator.next();
+
+ ReadCommand command = clo.command();
+
+ if (command instanceof ReadStateCommand) {
+ clo.result(storage.getClusterState());
+ } else if (command instanceof ReadLogicalTopologyCommand) {
+ clo.result(new
LogicalTopologyResponse(storage.getLogicalTopology()));
+ }
+ }
+ }
+
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<WriteCommand> clo = iterator.next();
+
+ WriteCommand command = clo.command();
+
+ if (command instanceof WriteStateCommand) {
+ storage.putClusterState(((WriteStateCommand)
command).clusterState());
+ } else if (command instanceof NodeJoinCommand) {
+ // TODO: perform validation
https://issues.apache.org/jira/browse/IGNITE-16717
+ // this method must also remain idempotent, as it might be
called multiple times on the same node
+ storage.putLogicalTopologyNode(((NodeJoinCommand)
command).node());
+ } else if (command instanceof NodeLeaveCommand) {
+ storage.removeLogicalTopologyNode(((NodeLeaveCommand)
command).node());
+ }
+
+ clo.result(null);
+ }
+ }
+
+ @Override
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+ storage.snapshot(path)
+ .whenComplete((unused, throwable) ->
doneClo.accept(throwable));
+ }
+
+ @Override
+ public boolean onSnapshotLoad(Path path) {
+ try {
+ storage.restoreSnapshot(path);
+
+ return true;
+ } catch (IgniteInternalException e) {
+ log.error("Failed to restore snapshot at " + path, e);
+
+ return false;
+ }
+ }
+
+ @Override
+ public void onShutdown() {
+ // no-op
Review Comment:
What about closing RaftStorage?
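Something along these lines, as a sketch. It assumes the storage can be made `AutoCloseable`, which I have not verified for `RaftStorage`; `CloseableStorage` and `StorageOwningListener` are hypothetical names:

```java
/** Hypothetical storage that owns resources and therefore must be closed. */
class CloseableStorage implements AutoCloseable {
    /** Set once the resources have been released (illustration only). */
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical listener that releases its storage on shutdown instead of doing nothing. */
class StorageOwningListener {
    private final CloseableStorage storage;

    StorageOwningListener(CloseableStorage storage) {
        this.storage = storage;
    }

    void onShutdown() {
        storage.close();
    }
}
```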
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeJoinCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.NodeLeaveCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.WriteStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftGroupListener} implementation for the CMG.
+ */
+public class CmgRaftGroupListener implements RaftGroupListener {
+ private static final IgniteLogger log = IgniteLogger.forClass(CmgRaftGroupListener.class);
+
+ private final RaftStorageManager storage;
+
+ public CmgRaftGroupListener(RaftStorage storage) {
+ this.storage = new RaftStorageManager(storage);
+ }
+
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<ReadCommand> clo = iterator.next();
+
+ ReadCommand command = clo.command();
+
+ if (command instanceof ReadStateCommand) {
+ clo.result(storage.getClusterState());
+ } else if (command instanceof ReadLogicalTopologyCommand) {
+ clo.result(new
LogicalTopologyResponse(storage.getLogicalTopology()));
+ }
+ }
+ }
+
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<WriteCommand> clo = iterator.next();
+
+ WriteCommand command = clo.command();
+
+ if (command instanceof WriteStateCommand) {
+ storage.putClusterState(((WriteStateCommand)
command).clusterState());
+ } else if (command instanceof NodeJoinCommand) {
+ // TODO: perform validation
https://issues.apache.org/jira/browse/IGNITE-16717
+ // this method must also remain idempotent, as it might be
called multiple times on the same node
+ storage.putLogicalTopologyNode(((NodeJoinCommand)
command).node());
Review Comment:
As you've mentioned earlier, the node should be added to the logical topology
not after NodeJoinCommand but after NodeJoinFinishCommand.
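A minimal sketch of that two-phase flow, assuming a NodeJoinFinishCommand
exists (it is not shown in this hunk). The class and method names below are
hypothetical and only illustrate the ordering: the first command validates and
remembers the node, and only the second one publishes it to the logical
topology. Both steps stay idempotent.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a node becomes visible only after its join is finished. */
class TwoPhaseJoinSketch {
    /** Nodes that passed validation but have not finished joining yet. */
    private final Set<String> validatedNodes = ConcurrentHashMap.newKeySet();

    /** Nodes that are part of the logical topology. */
    private final Set<String> logicalTopology = ConcurrentHashMap.newKeySet();

    /** Stands in for NodeJoinCommand handling: remember the node, do not publish it. */
    void onJoinRequest(String nodeId) {
        // Set.add is idempotent, so a retried command changes nothing.
        validatedNodes.add(nodeId);
    }

    /**
     * Stands in for the assumed NodeJoinFinishCommand handling.
     *
     * @return true if the node is in the logical topology after this call.
     */
    boolean onJoinFinish(String nodeId) {
        if (validatedNodes.contains(nodeId)) {
            logicalTopology.add(nodeId);
            validatedNodes.remove(nodeId);
        }

        // A repeated finish for an already joined node still reports success.
        return logicalTopology.contains(nodeId);
    }

    /** Returns an immutable snapshot of the logical topology. */
    Set<String> logicalTopology() {
        return Set.copyOf(logicalTopology);
    }
}
```

With this shape, a node that sent only the first command never shows up in
logicalTopology(), which is the property the comment asks for.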
##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapRaftStorage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link RaftStorage} in-memory implementation based on a {@link
ConcurrentHashMap}.
+ */
+public class ConcurrentMapRaftStorage implements RaftStorage {
Review Comment:
@TestOnly ?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/network/messages/ClusterStateReceivedMessage.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.network.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Successful response for receiving a {@link ClusterStateMessage}.
+ */
+@Transferable(CmgMessageGroup.CLUSTER_STATE_RECEIVED)
+public interface ClusterStateReceivedMessage extends NetworkMessage {
Review Comment:
Do we really need it? Isn't there a general OK response?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -18,103 +18,114 @@
package org.apache.ignite.internal.cluster.management;
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import
org.apache.ignite.internal.cluster.management.messages.CancelInitMessage;
-import
org.apache.ignite.internal.cluster.management.messages.ClusterStateMessage;
-import org.apache.ignite.internal.cluster.management.messages.CmgInitMessage;
-import org.apache.ignite.internal.cluster.management.messages.CmgMessageGroup;
-import
org.apache.ignite.internal.cluster.management.messages.CmgMessagesFactory;
+import
org.apache.ignite.internal.cluster.management.network.CmgMessageHandlerFactory;
+import
org.apache.ignite.internal.cluster.management.network.messages.CancelInitMessage;
+import
org.apache.ignite.internal.cluster.management.network.messages.ClusterStateMessage;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgInitMessage;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.raft.ClusterState;
+import org.apache.ignite.internal.cluster.management.raft.CmgRaftGroupListener;
+import org.apache.ignite.internal.cluster.management.raft.CmgRaftService;
+import org.apache.ignite.internal.cluster.management.raft.RaftStorage;
import org.apache.ignite.internal.cluster.management.rest.InitCommandHandler;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite component responsible for cluster initialization and managing the
Cluster Management Raft Group.
+ *
+ * <p>Refer to
+ * <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">IEP-77</a>
+ * for the description of the Cluster Management Group and its
responsibilities.
*/
public class ClusterManagementGroupManager implements IgniteComponent {
+ // TODO: timeout should be configurable, see
https://issues.apache.org/jira/browse/IGNITE-16785
+ private static final int NETWORK_INVOKE_TIMEOUT = 500;
+
private static final IgniteLogger log =
IgniteLogger.forClass(ClusterManagementGroupManager.class);
/** CMG Raft group name. */
private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
/** Init REST endpoint path. */
- public static final String REST_ENDPOINT = "/management/v1/cluster/init";
+ private static final String REST_ENDPOINT = "/management/v1/cluster/init";
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Future that resolves into a CMG Raft service. Can be {@code null} if
the Raft service has not been started. */
+ @Nullable
+ private CompletableFuture<CmgRaftService> raftService;
+
+ /** Lock for the {@code raftService} field. */
+ private final Object raftServiceLock = new Object();
+
+ /** Future that resolves into a list of node names that host the Meta
Storage. */
+ private final CompletableFuture<Collection<String>> metaStorageNodes = new
CompletableFuture<>();
+
+ /** Message factory. */
+ private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
+
+ /** Delayed executor. */
+ private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+
private final ClusterService clusterService;
private final Loza raftManager;
- private final RestComponent restModule;
+ private final RestComponent restComponent;
- /** Handles cluster initialization flow. */
- private final ClusterInitializer clusterInitializer;
+ private final RaftStorage raftStorage;
Review Comment:
Naming: I'd rather use ClusterStateStorage for both the class and the variable.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItNoThreadsLeftTest.java:
##########
@@ -95,6 +98,24 @@ public void test(TestInfo testInfo) throws Exception {
}
}
+ private Ignite startNode(TestInfo testInfo) throws NodeStoppingException {
Review Comment:
It seems that this kind of startThenInitAndSometimesAddToClusterNodeCollection
method is duplicated across many tests. Let's add it to IgniteAbstractTest or
similar, wdyt?
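A minimal sketch of such a shared helper. The start and init calls are passed
in as functions so that the example stays self-contained; in the tests they
would presumably be IgnitionManager.start and IgnitionManager.init as used
elsewhere in this PR. The class name, method name and timeout parameter are
hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical test helper: start a node, init a single-node cluster, wait for the node. */
class NodeStartHelperSketch {
    /**
     * Starts a node, initializes the cluster with that node as the only
     * Meta Storage node, and waits until the start future completes.
     *
     * @param nodeName Node name.
     * @param starter Stand-in for the asynchronous node start call.
     * @param initializer Stand-in for the init call (node name, Meta Storage node names).
     * @param timeoutSeconds How long to wait for the node to start.
     */
    static <N> N startAndInit(
            String nodeName,
            Function<String, CompletableFuture<N>> starter,
            BiConsumer<String, List<String>> initializer,
            long timeoutSeconds
    ) {
        // The start future completes only after the cluster has been initialized,
        // so init must be called before waiting on it.
        CompletableFuture<N> future = starter.apply(nodeName);

        initializer.accept(nodeName, List.of(nodeName));

        // join() throws an unchecked CompletionException on failure or timeout.
        return future.orTimeout(timeoutSeconds, TimeUnit.SECONDS).join();
    }
}
```

Tests that also track started nodes could add the returned node to their own
collection at the call site.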
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorage.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage for the CMG Raft service.
+ */
+public interface RaftStorage extends AutoCloseable {
Review Comment:
As mentioned above, it should be either ClusterStateStorage or a general
SimpleKeyValueStorage. In the latter case it should be moved to some common
non-CMG module.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static java.util.stream.Collectors.toList;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A wrapper around a {@link RaftStorage} which provides convenient methods.
+ */
+class RaftStorageManager {
Review Comment:
Do we really need it? It seems that all the logic implemented here could be
inlined into the appropriate components and the storage itself, but that's up
to you. In any case, naming: ClusterStateStorageManager or similar.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -133,90 +144,384 @@ public void initCluster(Collection<String>
metaStorageNodeNames, Collection<Stri
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IgniteInternalException("Interrupted while initializing
the cluster", e);
+ throw new InitException("Interrupted while initializing the
cluster", e);
} catch (ExecutionException e) {
- throw new IgniteInternalException("Unable to initialize the
cluster", e.getCause());
+ throw new InitException("Unable to initialize the cluster",
e.getCause());
} finally {
busyLock.leaveBusy();
}
}
- private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) throws NodeStoppingException {
- List<ClusterNode> nodes = resolveNodes(clusterService, msg.cmgNodes());
+ @Override
+ public void start() {
+ var messageHandlerFactory = new CmgMessageHandlerFactory(busyLock,
msgFactory, clusterService);
- raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes,
CmgRaftGroupListener::new)
- .whenComplete((service, e) -> {
- MessagingService messagingService =
clusterService.messagingService();
+ // register the ClusterState handler first, because local state
recovery might send such messages
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof ClusterStateMessage) {
+ assert correlationId != null;
- if (e == null) {
- ClusterNode leader = getLeader(service);
+ handleClusterState((ClusterStateMessage) message,
senderAddr, correlationId);
+ }
+ })
+ );
+
+ synchronized (raftServiceLock) {
+ raftService = recoverLocalState();
+ }
+
+ // register the Init handler second in order to handle the command
differently, depending on the local state
+ clusterService.messagingService().addMessageHandler(
+ CmgMessageGroup.class,
+ messageHandlerFactory.wrapHandler((message, senderAddr,
correlationId) -> {
+ if (message instanceof CancelInitMessage) {
+ handleCancelInit((CancelInitMessage) message);
+ } else if (message instanceof CmgInitMessage) {
+ assert correlationId != null;
+
+ handleInit((CmgInitMessage) message, senderAddr,
correlationId);
+ }
+ })
+ );
+
+ restComponent.registerHandlers(routes ->
+ routes.post(REST_ENDPOINT, APPLICATION_JSON.toString(), new
InitCommandHandler(clusterInitializer))
+ );
+ }
+
+ /**
+ * Extracts the local state (if any) and starts the CMG.
+ *
+ * @return Future that resolves into the CMG Raft service or {@code null}
if the local state is empty.
+ */
+ @Nullable
+ private CompletableFuture<CmgRaftService> recoverLocalState() {
+ Collection<String> cmgNodes;
+
+ try {
+ cmgNodes = localStateStorage.cmgNodeNames().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- ClusterNode thisNode =
clusterService.topologyService().localMember();
+ throw new IgniteInternalException("Interrupted while retrieving
local CMG state", e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Error while retrieving local
CMG state", e);
+ }
+
+ if (cmgNodes.isEmpty()) {
+ return null;
+ }
- messagingService.respond(addr,
successResponse(msgFactory), correlationId);
+ log.info("Local CMG state recovered, starting the CMG");
+
+ return startCmgRaftService(cmgNodes)
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> {
+ if (isLeader) {
+ return service.readClusterState()
+ // Raft state might not have been
initialized in case of leader failure during cluster init
+ // TODO: properly handle this case,
see https://issues.apache.org/jira/browse/IGNITE-16819
+ .thenCompose(state -> state == null ?
completedFuture(null) : onLeaderElected(service, state));
+ } else {
+ return completedFuture(null);
+ }
+ })
+ .thenApply(v -> service));
+ }
+
+ /**
+ * Handles the Init command.
+ *
+ * <p>This method needs to take the following possibilities into account,
depending on the local state and the Raft state:
+ * <ol>
+ * <li>No local state found (the Raft service future is {@code null})
- this means that the current node has never been initialized
+ * before.</li>
+ * <li>Local state found (the Raft service future has been started and
therefore is not {@code null}),
+ * but no CMG state present in the Raft storage - this means that the
node has failed somewhere during
+ * the init process. In this case we need to check the consistency of
the local state and the received message and complete
+ * the init process.</li>
+ * <li>Local state found and CMG state is present in the Raft storage
- this means that the node has been initialized successfully
+ * and a user may be retrying the init in case the successful response
was lost. To make the init message idempotent
+ * we simply check that the Raft state and the received message are
the same.</li>
+ * </ol>
+ */
+ private void handleInit(CmgInitMessage msg, NetworkAddress addr, long
correlationId) {
+ var newState = new ClusterState(msg.cmgNodes(),
msg.metaStorageNodes());
+
+ // This future is needed to add a completion listener at the end of
the method
+ CompletableFuture<?> resultHook;
+
+ synchronized (raftServiceLock) {
+ if (raftService == null) {
+ // Raft service has not been started
+ log.info("Init command received, starting the CMG: " +
newState);
+
+ raftService = initCmgRaftService(newState);
+
+ resultHook = raftService;
+ } else {
+ // Raft service has been started, which means that this node
has already received an init command at least once, but
+ // we still need to check that the initialization has
completed successfully.
+ log.info("Init command received, but the CMG has already been
started");
+
+ resultHook = raftService.thenCompose(service ->
+ service.readClusterState()
+ .thenCompose(state -> {
+ if (state == null) {
+ // Raft state is empty, perform
re-initialization
+ log.info("CMG state is missing,
completing initialization");
+
+ if
(service.nodeNames().equals(newState.cmgNodes())) {
+ return
service.isCurrentNodeLeader()
+
.thenCompose(isCurrentNodeLeader ->
+
isCurrentNodeLeader ? initCmgState(service, newState) : completedFuture(null));
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ service.nodeNames(),
newState.cmgNodes()
+ ));
+ }
+ } else {
+ // Node is fully initialized, just
check some invariants
+ log.info("Node has already been
initialized");
+
+ if (state.equals(newState)) {
+ return completedFuture(null);
+ } else {
+ throw new
IllegalInitArgumentException(String.format(
+ "CMG has already been
initialized with %s, but the new state is different: %s",
+ state, newState
+ ));
+ }
+ }
+ }));
+ }
+ }
+
+ resultHook.whenComplete((v, e) -> {
+ NetworkMessage response;
+
+ if (e == null) {
+ response = msgFactory.initCompleteMessage().build();
+ } else {
+ response = msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .shouldCancel(!(e instanceof
IllegalInitArgumentException))
+ .build();
+ }
+
+ clusterService.messagingService().respond(addr, response,
correlationId);
+ });
+ }
+
+ /**
+ * Starts the CMG Raft service and writes the given {@code state} to the
storage.
+ */
+ private CompletableFuture<CmgRaftService> initCmgRaftService(ClusterState
state) {
+ return localStateStorage.putCmgNodeNames(state.cmgNodes())
+ .thenCompose(v -> startCmgRaftService(state.cmgNodes()))
+ .thenCompose(service -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> isLeader ?
initCmgState(service, state) : completedFuture(null))
+ .thenApply(v -> service));
+ }
- if (leader.equals(thisNode)) {
- broadcastClusterState(msg.metaStorageNodes());
+ /**
+ * Writes the given state to the CMG's STM and executes some necessary
on-leader logic.
+ */
+ private CompletableFuture<Void> initCmgState(CmgRaftService service,
ClusterState state) {
+ return service.writeClusterState(state).thenCompose(v ->
onLeaderElected(service, state));
+ }
+
+ /**
+ * Executes the following actions when a CMG leader is elected.
+ * <ol>
+ * <li>Updates the logical topology in case some nodes have gone
offline during leader election.</li>
+ * <li>Broadcasts the current CMG state to all nodes in the physical
topology.</li>
+ * </ol>
+ */
+ private CompletableFuture<Void> onLeaderElected(CmgRaftService service,
ClusterState state) {
+ return updateLogicalTopology(service)
+ .thenRun(() -> {
+
clusterService.topologyService().addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ sendClusterState(state,
clusterService.topologyService().allMembers());
+ });
+ }
+
+ /**
+ * This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
+ * physical topology during the election. New node will be added
automatically after the new leader broadcasts the current cluster
+ * state.
+ */
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
+ return service.logicalTopology()
+ .thenAccept(logicalTopology -> {
+ Collection<String> physicalTopologyIds =
clusterService.topologyService().allMembers()
+ .stream()
+ .map(ClusterNode::id)
+ .collect(toSet());
+
+ for (ClusterNode node : logicalTopology) {
+ if (!physicalTopologyIds.contains(node.id())) {
+ scheduleRemoveFromLogicalTopology(service, node);
}
- } else {
- messagingService.respond(addr,
errorResponse(msgFactory, e), correlationId);
}
});
}
- private void handleCancelInit(CancelInitMessage msg) throws
NodeStoppingException {
+ private void handleCancelInit(CancelInitMessage msg) {
log.info("CMG initialization cancelled, reason: " + msg.reason());
- raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+ destroyCmg();
+ }
+
+ /**
+ * Completely destroys the local CMG Raft service.
+ */
+ private void destroyCmg() {
+ synchronized (raftServiceLock) {
+ try {
+ if (raftService != null) {
+ raftService.cancel(true);
- // TODO: drop the Raft storage as well,
https://issues.apache.org/jira/browse/IGNITE-16471
+ raftService = null;
+ }
+
+ raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+ if (raftStorage.isStarted()) {
+ raftStorage.destroy();
+ }
+
+ localStateStorage.clear().get();
+ } catch (Exception e) {
+ throw new IgniteInternalException("Error when cleaning the CMG
state", e);
+ }
+ }
}
- private void handleClusterState(ClusterStateMessage msg) {
- metastorageNodes.complete(msg.metastorageNodes());
+ /**
+ * Handler for the {@link ClusterStateMessage}.
+ */
+ private void handleClusterState(ClusterStateMessage msg, NetworkAddress
addr, long correlationId) {
+ clusterService.messagingService().respond(addr,
msgFactory.clusterStateReceivedMessage().build(), correlationId);
+
+ var state = new ClusterState(msg.cmgNodes(), msg.metaStorageNodes());
+
+ synchronized (raftServiceLock) {
+ if (raftService == null) {
+ raftService = initCmgRaftService(state);
+ } else {
+ // Raft service might have been started on wrong CMG nodes,
because CMG state can change while a node is offline. In this
+ // case we need to re-create the service.
+ raftService = raftService.thenCompose(service -> {
+ if (service.nodeNames().equals(state.cmgNodes())) {
+ return completedFuture(service);
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info("CMG has been started on {}, but the
cluster state is different: {}. "
+ + "Re-creating the CMG Raft service",
service.nodeNames(), state.cmgNodes());
+ }
+
+ destroyCmg();
+
+ return initCmgRaftService(state);
+ }
+ });
+ }
+
+ raftService
+ .thenCompose(CmgRaftService::joinCluster)
+ .thenRun(() ->
metaStorageNodes.complete(state.metaStorageNodes()));
+ }
}
- private static NetworkMessage successResponse(CmgMessagesFactory
msgFactory) {
- log.info("CMG started successfully");
+ /**
+ * Starts the CMG Raft service using the provided node names as its peers.
+ */
+ private CompletableFuture<CmgRaftService>
startCmgRaftService(Collection<String> nodeNames) {
+ // TODO: wait for nodes to appear, see
https://issues.apache.org/jira/browse/IGNITE-16811
+ List<ClusterNode> nodes = resolveNodes(clusterService, nodeNames);
- return msgFactory.initCompleteMessage().build();
+ try {
+ return raftManager
+ .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
+ raftStorage.start();
+
+ return new CmgRaftGroupListener(raftStorage);
+ })
+ .thenApply(service -> new CmgRaftService(service,
clusterService));
+ } catch (NodeStoppingException e) {
+ return CompletableFuture.failedFuture(e);
+ }
}
- private void broadcastClusterState(Collection<String> metaStorageNodes) {
- NetworkMessage clusterStateMsg = msgFactory.clusterStateMessage()
- .metastorageNodes(metaStorageNodes)
- .build();
+ private TopologyEventHandler cmgLeaderTopologyEventHandler(CmgRaftService
raftService) {
+ return new TopologyEventHandler() {
+ @Override
+ public void onAppeared(ClusterNode member) {
+ raftService.readClusterState()
+ .thenAccept(state -> {
+ if (state != null) {
+ sendClusterState(state, List.of(member));
+ } else if (log.isWarnEnabled()) {
+ log.warn("Cannot send the cluster state to a
newly added node {} because cluster state is empty", member);
+ }
+ });
+ }
- clusterService.topologyService()
- .allMembers()
- .forEach(node -> clusterService.messagingService().send(node,
clusterStateMsg));
+ @Override
+ public void onDisappeared(ClusterNode member) {
+ scheduleRemoveFromLogicalTopology(raftService, member);
+ }
+ };
}
- private static NetworkMessage errorResponse(CmgMessagesFactory msgFactory,
Throwable e) {
- log.error("Exception when starting the CMG", e);
+ private void scheduleRemoveFromLogicalTopology(CmgRaftService raftService,
ClusterNode node) {
+ // TODO: delay should be configurable, see
https://issues.apache.org/jira/browse/IGNITE-16785
+ scheduledExecutor.schedule(() -> {
+ ClusterNode physicalTopologyNode =
clusterService.topologyService().getByConsistentId(node.name());
- return msgFactory.initErrorMessage()
- .cause(e.getMessage())
- .build();
+ if (physicalTopologyNode == null ||
!physicalTopologyNode.id().equals(node.id())) {
+ raftService.removeFromCluster(node);
+ }
+ }, 0, TimeUnit.MILLISECONDS);
}
- private ClusterNode getLeader(RaftGroupService raftService) {
- Peer leader = raftService.leader();
+ private void sendClusterState(ClusterState clusterState,
Collection<ClusterNode> nodes) {
+ NetworkMessage msg = msgFactory.clusterStateMessage()
Review Comment:
The cluster state message should also contain some information for validation,
e.g. the Ignite version and the cluster tag. I believe this was planned to be
done under a separate ticket as part of the join epic. Could you please add a
TODO?
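A minimal sketch of what such validation data could look like, assuming the
message carries an Ignite version and a cluster tag as plain strings. All
names here are hypothetical, and the exact-match policy for versions is only
an assumption for illustration; the real compatibility rules belong to the
follow-up ticket.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of join-time validation; none of these names come from the PR. */
class ClusterStateValidationSketch {
    /** Validation data that a cluster state message could carry (assumed fields). */
    static final class ValidationInfo {
        final String igniteVersion;

        final String clusterTag;

        ValidationInfo(String igniteVersion, String clusterTag) {
            this.igniteVersion = igniteVersion;
            this.clusterTag = clusterTag;
        }
    }

    /**
     * Compares the local node's data with the data received from the cluster.
     *
     * @return List of mismatch descriptions; an empty list means the node is compatible.
     */
    static List<String> validate(ValidationInfo local, ValidationInfo received) {
        List<String> errors = new ArrayList<>();

        // Assumption: versions must match exactly.
        if (!local.igniteVersion.equals(received.igniteVersion)) {
            errors.add("Ignite version mismatch: local=" + local.igniteVersion
                    + ", cluster=" + received.igniteVersion);
        }

        // A different tag means the node belongs to another cluster.
        if (!local.clusterTag.equals(received.clusterTag)) {
            errors.add("Cluster tag mismatch: local=" + local.clusterTag
                    + ", cluster=" + received.clusterTag);
        }

        return errors;
    }
}
```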