sanpwc commented on code in PR #795:
URL: https://github.com/apache/ignite-3/pull/795#discussion_r876091083
##########
modules/api/src/main/java/org/apache/ignite/IgnitionManager.java:
##########
@@ -32,7 +33,11 @@
* Service loader based implementation of an entry point for handling grid
lifecycle.
*/
public class IgnitionManager {
- /** Loaded Ignition instance. */
+ /**
+ * Loaded Ignition instance.
+ *
+ * <p>Concurrent access is guarded by {@code this}.
Review Comment:
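`this` -> this class (the synchronized methods here are static, so the lock is the class object, not an instance).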
##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -57,23 +61,30 @@ public class ItClusterManagerTest {
@WorkDirectory
private Path workDir;
- @BeforeEach
Review Comment:
What about
```
@BeforeEach
void setUp(TestInfo testInfo) throws IOException {
startCluster(2, testInfo);
}
```
?
It seems that `startCluster(2, testInfo);` is called in every test.
##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java:
##########
@@ -240,18 +270,128 @@ void testClusterState() {
assertThat(node1.raftService.readClusterState(),
willCompleteSuccessfully());
assertThat(node2.raftService.readClusterState(),
willCompleteSuccessfully());
- ClusterState state = new ClusterState(List.of("foo"), List.of("bar"));
+ ClusterState state = new ClusterState(
+ List.of("foo"),
+ List.of("bar"),
+ IgniteProductVersion.CURRENT_VERSION,
+ new ClusterTag("cluster")
+ );
assertThat(node1.raftService.writeClusterState(state),
willCompleteSuccessfully());
assertThat(node1.raftService.readClusterState(), willBe(state));
assertThat(node2.raftService.readClusterState(), willBe(state));
- state = new ClusterState(List.of("baz"), List.of("quux"));
+ state = new ClusterState(
+ List.of("baz"),
+ List.of("quux"),
+ IgniteProductVersion.fromString("3.3.3"),
+ new ClusterTag("new cluster")
+ );
assertThat(node2.raftService.writeClusterState(state),
willCompleteSuccessfully());
assertThat(node1.raftService.readClusterState(), willBe(state));
assertThat(node2.raftService.readClusterState(), willBe(state));
}
+
+ /**
+ * Test validation of the Cluster Tag.
+ */
+ @Test
+ void testClusterTagValidation() {
+ Node node1 = cluster.get(0);
+ Node node2 = cluster.get(1);
+
+ ClusterState state = new ClusterState(
+ List.of("foo"),
+ List.of("bar"),
+ IgniteProductVersion.CURRENT_VERSION,
+ new ClusterTag("cluster")
+ );
+
+ assertThat(node1.raftService.writeClusterState(state),
willCompleteSuccessfully());
+
+ // empty tag
+ assertThat(node2.raftService.startJoinCluster(null),
willCompleteSuccessfully());
+
+ // correct tag
+ assertThat(node2.raftService.startJoinCluster(state.clusterTag()),
willCompleteSuccessfully());
+
+ // incorrect tag
+ assertThrowsWithCause(
+ () -> node2.raftService.startJoinCluster(new
ClusterTag("invalid")).get(10, TimeUnit.SECONDS),
+ IgniteInternalException.class,
+ "Join request denied, reason: Cluster tags do not match"
Review Comment:
I believe it would be useful to print the mismatched cluster tags, as is
already done for versions:
`Version: %s, version stored in CMG: 1.2.3`
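For illustration, a minimal sketch of such a message. `rejectReason` and its parameters are hypothetical names, not the PR's API, and the tags are assumed to have a readable string form:

```java
public class ClusterTagMismatch {
    /** Builds a reject reason that names both tags (hypothetical helper, not part of the PR). */
    static String rejectReason(String nodeTag, String cmgTag) {
        return String.format(
                "Join request denied, reason: Cluster tags do not match. "
                        + "Cluster tag: %s, cluster tag stored in CMG: %s",
                nodeTag,
                cmgTag
        );
    }

    public static void main(String[] args) {
        // Example values taken from the test above: the joining node sends "invalid", the CMG stores "cluster".
        System.out.println(rejectReason("invalid", "cluster"));
    }
}
```

The test assertion could then keep matching on the fixed prefix of the message.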
##########
modules/api/src/main/java/org/apache/ignite/IgnitionManager.java:
##########
@@ -101,64 +106,78 @@ public static CompletableFuture<Ignite> start(String
nodeName, @Nullable Path cf
}
/**
- * Stops the node with given {@code name}. It's possible to stop both
already started node or node that is currently starting. Has no
- * effect if node with specified name doesn't exist.
+ * Stops the node with given {@code nodeName}. It's possible to stop both
already started node or node that is currently starting.
+ * Has no effect if node with specified name doesn't exist.
*
- * @param name Node name to stop.
- * @throws IllegalArgumentException if null is specified instead of node
name.
+ * @param nodeName Node name to stop.
*/
- public static void stop(String name) {
+ public static void stop(String nodeName) {
Ignition ignition =
loadIgnitionService(Thread.currentThread().getContextClassLoader());
- ignition.stop(name);
+ ignition.stop(nodeName);
}
/**
- * Stops the node with given {@code name}. It's possible to stop both
already started node or node that is currently starting. Has no
- * effect if node with specified name doesn't exist.
+ * Stops the node with given {@code nodeName}. It's possible to stop both
already started node or node that is currently starting.
+ * Has no effect if node with specified name doesn't exist.
*
- * @param name Node name to stop.
+ * @param nodeName Node name to stop.
* @param clsLdr The class loader to be used to load
provider-configuration files and provider classes, or {@code null} if the system
* class loader (or, failing that, the bootstrap class
loader) is to be used
- * @throws IllegalArgumentException if null is specified instead of node
name.
*/
- public static void stop(String name, @Nullable ClassLoader clsLdr) {
+ public static void stop(String nodeName, @Nullable ClassLoader clsLdr) {
Ignition ignition = loadIgnitionService(clsLdr);
- ignition.stop(name);
+ ignition.stop(nodeName);
}
/**
* Initializes the cluster that this node is present in.
*
- * @param name name of the node that the initialization request will be
sent to.
+ * @param nodeName Name of the node that the initialization request will
be sent to.
* @param metaStorageNodeNames names of nodes that will host the Meta
Storage and the CMG.
+ * @param clusterName Human-readable name of the cluster.
* @throws IgniteException If the given node has not been started or has
been stopped.
- * @see Ignition#init(String, Collection)
+ * @see Ignition#init(String, Collection, String)
*/
- public static synchronized void init(String name, Collection<String>
metaStorageNodeNames) {
+ public static synchronized void init(String nodeName, Collection<String>
metaStorageNodeNames, String clusterName) {
+ Objects.requireNonNull(nodeName);
+ Objects.requireNonNull(metaStorageNodeNames);
Review Comment:
It's a bit confusing to split the validation into two steps: one here and
one within ClusterInitializer.java.
I'd rather move the whole validation of <>NodeNames and clusterName to a
single entry point, ClusterInitializer, and enrich the @throws clause with the
corresponding IllegalArgumentExceptions and NullPointerExceptions. WDYT?
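Roughly what I have in mind, as a sketch only. The `validate` helper is hypothetical, and the blank-name check is my assumption about what should be rejected:

```java
import java.util.Collection;
import java.util.Objects;

public class InitParamsValidation {
    /**
     * Validates all init parameters in one place (hypothetical helper).
     *
     * @throws NullPointerException if any argument is null.
     * @throws IllegalArgumentException if metaStorageNodeNames is empty or clusterName is blank.
     */
    static void validate(
            Collection<String> metaStorageNodeNames,
            Collection<String> cmgNodeNames,
            String clusterName
    ) {
        Objects.requireNonNull(metaStorageNodeNames, "metaStorageNodeNames");
        Objects.requireNonNull(cmgNodeNames, "cmgNodeNames");
        Objects.requireNonNull(clusterName, "clusterName");

        if (metaStorageNodeNames.isEmpty()) {
            throw new IllegalArgumentException("Meta Storage node names list must not be empty");
        }

        // Assumption: a blank cluster name is treated as invalid.
        if (clusterName.isBlank()) {
            throw new IllegalArgumentException("Cluster name must not be empty");
        }
    }
}
```

With this, IgnitionManager would only delegate, and callers would see one documented set of exceptions.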
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java:
##########
@@ -55,15 +58,36 @@ public class ClusterInitializer {
* @param metaStorageNodeNames Names of nodes that will host the Meta
Storage. Cannot be empty.
* @param cmgNodeNames Names of nodes that will host the Cluster
Management Group. Can be empty, in which case {@code
* metaStorageNodeNames} will be used instead.
+ * @param clusterName Human-readable name of the cluster.
* @return Future that represents the state of the operation.
*/
- public CompletableFuture<Void> initCluster(Collection<String>
metaStorageNodeNames, Collection<String> cmgNodeNames) {
+ public CompletableFuture<Void> initCluster(
+ Collection<String> metaStorageNodeNames,
+ Collection<String> cmgNodeNames,
+ String clusterName
+ ) {
+ if (metaStorageNodeNames.isEmpty()) {
Review Comment:
Please check my comment about params validation in IgnitionManager.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationTokenManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import
org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import
org.apache.ignite.internal.cluster.management.raft.responses.JoinDeniedResponse;
+import
org.apache.ignite.internal.cluster.management.raft.responses.NodeValidatedResponse;
+import org.apache.ignite.internal.cluster.management.validation.NodeValidator;
+import
org.apache.ignite.internal.cluster.management.validation.ValidationError;
+import org.apache.ignite.internal.cluster.management.validation.ValidationInfo;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class responsible for managing validation tokens.
+ *
+ * <p>If a node passes the validation successfully, a unique validation token
is issued which exists for a specific period of time.
+ * After the node finishes local recovery procedures, it sends a {@link
JoinReadyCommand} containing the validation
+ * token. If the local token and the received token match, the node will be
added to the logical topology and the token will be invalidated.
+ */
+class ValidationTokenManager implements AutoCloseable {
+ private final ScheduledExecutorService executor =
+ Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("node-validator"));
+
+ private final RaftStorageManager storage;
+
+ /**
+ * Map for storing tasks, submitted to the {@link #executor}, so that it
is possible to cancel them.
+ */
+ private final Map<String, Future<?>> cleanupFutures = new
ConcurrentHashMap<>();
+
+ ValidationTokenManager(RaftStorageManager storage) {
+ this.storage = storage;
+
+ // schedule removal of possibly stale tokens in case the leader has
changed or the node has been restarted
+
storage.getValidatedNodeIds().forEach(this::scheduleValidationTokenRemoval);
+ }
+
+ /**
+ * Validates a given node and issues a validation token.
+ *
+ * @param command Command sent by the node that intends to join the
cluster.
+ * @return {@link NodeValidatedResponse} in case of successful validation
or a {@link JoinDeniedResponse} otherwise.
+ */
+ Serializable validateNode(JoinRequestCommand command) {
+ ClusterState state = storage.getClusterState();
+
+ if (state == null) {
+ return new JoinDeniedResponse("Cluster has not been initialized
yet");
+ }
+
+ var validationInfo = new ValidationInfo(command.igniteVersion(),
command.clusterTag());
+
+ ValidationError validationError = NodeValidator.validateNode(state,
validationInfo);
+
+ if (validationError != null) {
+ return new JoinDeniedResponse(validationError.rejectReason());
+ }
+
+ UUID validationToken = UUID.randomUUID();
+
+ ClusterNode validatedNode = command.node();
+
+ storage.putValidationToken(validatedNode, validationToken);
+
+ scheduleValidationTokenRemoval(validatedNode.id());
+
+ return new NodeValidatedResponse(validationToken);
+ }
+
+ /**
+ * Checks and invalidates the given node's validation token thus
completing the validation procedure.
+ *
+ * @param command Command sent by the node that wishes to join the logical
topology.
+ * @return {@code null} if the tokens match or {@link JoinDeniedResponse}
otherwise.
+ */
+ @Nullable
+ Serializable completeValidation(JoinReadyCommand command) {
+ UUID validationToken = storage.getValidationToken(command.node());
+
+ String nodeId = command.node().id();
+
+ if (validationToken == null) {
+ return new JoinDeniedResponse(String.format("Node \"%s\" has not
yet passed the validation step", nodeId));
+ } else if (!validationToken.equals(command.validationToken())) {
+ return new JoinDeniedResponse("Incorrect validation token");
+ }
+
+ Future<?> cleanupFuture = cleanupFutures.remove(nodeId);
+
+ if (cleanupFuture != null) {
+ cleanupFuture.cancel(false);
+ }
+
+ storage.removeValidationToken(nodeId);
+
+ return null;
+ }
+
+ private void scheduleValidationTokenRemoval(String nodeId) {
+ // TODO: delay should be configurable, see
https://issues.apache.org/jira/browse/IGNITE-16785
+ Future<?> future = executor.schedule(() -> {
+ cleanupFutures.remove(nodeId);
+
+ storage.removeValidationToken(nodeId);
Review Comment:
Let's also log such removal.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java:
##########
@@ -73,15 +81,42 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>>
iterator) {
if (command instanceof WriteStateCommand) {
storage.putClusterState(((WriteStateCommand)
command).clusterState());
+
+ clo.result(null);
} else if (command instanceof JoinRequestCommand) {
- // TODO: perform validation
https://issues.apache.org/jira/browse/IGNITE-16717
+ Serializable response =
validationTokenManager.validateNode((JoinRequestCommand) command);
+
+ clo.result(response);
} else if (command instanceof JoinReadyCommand) {
- storage.putLogicalTopologyNode(((JoinReadyCommand)
command).node());
+ Serializable response =
validationTokenManager.completeValidation((JoinReadyCommand) command);
+
+ // Non-null response means that the node has not passed the
validation step.
+ if (response == null) {
+ addNodeToLogicalTopology((JoinReadyCommand) command);
+ }
+
+ clo.result(response);
} else if (command instanceof NodesLeaveCommand) {
- storage.removeLogicalTopologyNodes(((NodesLeaveCommand)
command).nodes());
+ removeNodesFromLogicalTopology((NodesLeaveCommand) command);
+
+ clo.result(null);
}
+ }
+ }
+
+ private void addNodeToLogicalTopology(JoinReadyCommand command) {
+ storage.putLogicalTopologyNode(command.node());
+
+ log.info("Node {} has been added to the logical topology",
command.node().name());
Review Comment:
Here and in other places (CmgRaftService, ClusterInitializer, etc.), log -> LOG.
Let's also consistently use `if (LOG.isInfoEnabled())`.
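A sketch of the pattern. It uses the JDK's `System.Logger` as a stand-in, so `isLoggable(Level.INFO)` plays the role of `isInfoEnabled()` here; the class and method names are made up for the example:

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

public class LogGuardExample {
    /** Static final logger, named in upper case like any other constant. */
    private static final Logger LOG = System.getLogger(LogGuardExample.class.getName());

    /** Builds the log message (kept separate so it can be checked without a logger). */
    static String describe(String nodeName) {
        return "Node " + nodeName + " has been added to the logical topology";
    }

    static void onNodeAdded(String nodeName) {
        // The guard avoids building the message when INFO is disabled.
        if (LOG.isLoggable(Level.INFO)) {
            LOG.log(Level.INFO, describe(nodeName));
        }
    }
}
```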
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -401,7 +401,7 @@ public CompletableFuture<Ignite> start(@Language("HOCON")
@Nullable String cfg)
return cmgMgr.joinFuture()
// using the default executor to avoid blocking the CMG
Manager threads
- .thenAcceptAsync(v -> {
+ .thenRunAsync(() -> {
Review Comment:
It's better to use NamedThreadPool for async operations.
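Something along these lines, as a plain-JDK sketch. `namedFactory` is a hypothetical stand-in for our named thread factory, and the pool's lifecycle is simplified:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedAsyncExample {
    /** Plain-JDK stand-in for a named thread factory (hypothetical helper). */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();

        return r -> {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    /** Runs an async stage on the named pool and returns the name of the thread that executed it. */
    static String runOnNamedPool(String prefix) {
        ExecutorService pool = Executors.newSingleThreadExecutor(namedFactory(prefix));

        try {
            return CompletableFuture.completedFuture(null)
                    // Passing the executor explicitly keeps the stage off the common ForkJoinPool.
                    .thenApplyAsync(v -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }
}
```

`thenRunAsync(Runnable, Executor)` takes the executor the same way, and the thread name then shows up in logs and thread dumps.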
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterInitializer.java:
##########
@@ -73,11 +97,16 @@ public CompletableFuture<Void>
initCluster(Collection<String> metaStorageNodeNam
CmgInitMessage initMessage = msgFactory.cmgInitMessage()
.metaStorageNodes(metaStorageNodeNames)
.cmgNodes(cmgNodeNames)
+ .clusterName(clusterName)
.build();
return invokeMessage(cmgNodes, initMessage)
Review Comment:
I'd add INFO logging for a successful init invocation, including the init
params, both original and resolved.
All in all, so that we don't miss any elements of the node join logging flow,
including the happy-path ones, could you please share the set of such entry points?
I believe we might have missed some.
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationTokenManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import
org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import
org.apache.ignite.internal.cluster.management.raft.responses.JoinDeniedResponse;
+import
org.apache.ignite.internal.cluster.management.raft.responses.NodeValidatedResponse;
+import org.apache.ignite.internal.cluster.management.validation.NodeValidator;
+import
org.apache.ignite.internal.cluster.management.validation.ValidationError;
+import org.apache.ignite.internal.cluster.management.validation.ValidationInfo;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class responsible for managing validation tokens.
+ *
+ * <p>If a node passes the validation successfully, a unique validation token
is issued which exists for a specific period of time.
+ * After the node finishes local recovery procedures, it sends a {@link
JoinReadyCommand} containing the validation
+ * token. If the local token and the received token match, the node will be
added to the logical topology and the token will be invalidated.
+ */
+class ValidationTokenManager implements AutoCloseable {
+ private final ScheduledExecutorService executor =
+ Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("node-validator"));
+
+ private final RaftStorageManager storage;
+
+ /**
+ * Map for storing tasks, submitted to the {@link #executor}, so that it
is possible to cancel them.
+ */
+ private final Map<String, Future<?>> cleanupFutures = new
ConcurrentHashMap<>();
+
+ ValidationTokenManager(RaftStorageManager storage) {
+ this.storage = storage;
+
+ // schedule removal of possibly stale tokens in case the leader has
changed or the node has been restarted
+
storage.getValidatedNodeIds().forEach(this::scheduleValidationTokenRemoval);
+ }
+
+ /**
+ * Validates a given node and issues a validation token.
+ *
+ * @param command Command sent by the node that intends to join the
cluster.
+ * @return {@link NodeValidatedResponse} in case of successful validation
or a {@link JoinDeniedResponse} otherwise.
+ */
+ Serializable validateNode(JoinRequestCommand command) {
+ ClusterState state = storage.getClusterState();
+
+ if (state == null) {
+ return new JoinDeniedResponse("Cluster has not been initialized
yet");
+ }
+
+ var validationInfo = new ValidationInfo(command.igniteVersion(),
command.clusterTag());
+
+ ValidationError validationError = NodeValidator.validateNode(state,
validationInfo);
+
+ if (validationError != null) {
+ return new JoinDeniedResponse(validationError.rejectReason());
+ }
+
+ UUID validationToken = UUID.randomUUID();
Review Comment:
As discussed, let's use idempotent validationToken generation.
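One possible shape, as a sketch only: derive the token deterministically from data carried by the command, so that applying the same command twice (or on another replica) yields the same token. Using only the node ID as input is my assumption for the example, not necessarily what we agreed on, and `tokenFor` is a hypothetical name:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class DeterministicToken {
    /**
     * Derives a name-based (version 3) UUID from the node ID.
     * The same input always produces the same token.
     */
    static UUID tokenFor(String nodeId) {
        return UUID.nameUUIDFromBytes(nodeId.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Both calls print the same UUID.
        System.out.println(tokenFor("node-1"));
        System.out.println(tokenFor("node-1"));
    }
}
```

Note that a token derived only from a publicly known node ID is predictable, so the real input probably needs something less guessable.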
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java:
##########
@@ -108,10 +110,65 @@ void removeLogicalTopologyNodes(Set<ClusterNode> nodes) {
}
private static byte[] logicalTopologyKey(ClusterNode node) {
- byte[] nodeIdBytes = node.id().getBytes(StandardCharsets.UTF_8);
+ return prefixedKey(LOGICAL_TOPOLOGY_PREFIX, node.id());
+ }
+
+ /**
+ * Retrieves the validation token for a given node.
+ *
+ * @return Validation token or {@code null} if it does not exist.
+ */
+ @Nullable
+ UUID getValidationToken(ClusterNode node) {
+ return get(validationTokenKey(node.id()), UUID.class);
+ }
+
+ /**
+ * Saves the validation token for a given node.
+ */
+ void putValidationToken(ClusterNode node, UUID token) {
+ storage.put(validationTokenKey(node.id()), ByteUtils.toBytes(token));
+ }
+
+ /**
+ * Removes the validation token for a given node.
+ */
+ void removeValidationToken(String nodeId) {
+ storage.remove(validationTokenKey(nodeId));
+ }
+
+ private static byte[] validationTokenKey(String nodeId) {
+ return prefixedKey(VALIDATION_TOKEN_PREFIX, nodeId);
+ }
+
+ /**
+ * Returns a collection of node IDs that passed the validation (i.e. node
IDs which have a corresponding validation token).
+ */
+ Collection<String> getValidatedNodeIds() {
Review Comment:
The naming is a bit confusing, because such nodes will only be available during
the node join process. Could you please extend the javadoc?
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -429,11 +426,21 @@ public CompletableFuture<Ignite> start(@Language("HOCON")
@Nullable String cfg)
fut -> new
ConfigurationCatchUpListener(cfgStorage, fut, LOG)
);
- return
CompletableFuture.allOf(notifyConfigurationListeners(), recoveryFuture);
+ return notifyConfigurationListeners()
+ .thenCompose(t -> {
+ // Deploy all registered watches because
all components are ready and have registered their listeners.
+ try {
+ metaStorageMgr.deployWatches();
Review Comment:
Can you please explain why you need to transfer the watch deployment here?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/InitCommand.java:
##########
@@ -30,20 +34,36 @@ public class InitCommand {
private final Collection<String> cmgNodes;
+ private final String clusterName;
+
/**
* Constructor.
*/
@JsonCreator
public InitCommand(
@JsonProperty("metaStorageNodes") Collection<String>
metaStorageNodes,
- @JsonProperty("cmgNodes") Collection<String> cmgNodes
+ @JsonProperty("cmgNodes") Collection<String> cmgNodes,
+ @JsonProperty("clusterName") String clusterName
) {
- if (metaStorageNodes == null || metaStorageNodes.isEmpty()) {
- throw new IllegalArgumentException("Meta Storage node names must
not be empty");
+ if (nullOrEmpty(metaStorageNodes)) {
+ throw new IllegalArgumentException("Meta Storage node names list
must not be empty");
Review Comment:
Same as above, I'd rather use consistent exceptions.