JAkutenshi commented on code in PR #7507:
URL: https://github.com/apache/ignite-3/pull/7507#discussion_r2815416695
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java:
##########
@@ -25,23 +25,28 @@
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.jetbrains.annotations.Nullable;
/**
- * A service providing operations on a replication group with operations
timeout.
+ * A service providing operations on a replication group with explicit timeout
control.
*
- * <p>Most of operations require a known group leader. The group leader can be
refreshed at any time by calling {@link #refreshLeader()}
- * method, otherwise it will happen automatically on a first call.
+ * <p>Most of operations require a known group leader. The group leader can be
refreshed at any time by calling
+ * {@link #refreshLeader(long)} method, otherwise it will happen automatically
on a first call.
*
* <p>If a leader has been changed while the operation in progress, the
operation will be transparently retried until timeout is reached.
* The current leader will be refreshed automatically (maybe several times) in
the process.
*
- * <p>Each asynchronous method (returning a future) uses a default timeout to
finish, see {@link RaftConfiguration#retryTimeoutMillis()}.
- * If a result is not available within the timeout, the future will be
completed with a {@link TimeoutException}
+ * <p>Each asynchronous method takes a {@code timeoutMillis} parameter with
the following semantics:
+ * <ul>
+ * <li>{@code 0} - single attempt without retries</li>
+ * <li>{@code Long.MAX_VALUE} - infinite wait</li>
+ * <li>negative values - treated as infinite for compatibility</li>
+ * <li>positive values - bounded wait up to the specified timeout</li>
Review Comment:
```suggestion
* <li>{@code 0} - single attempt without retries.</li>
* <li>{@code Long.MAX_VALUE} - infinite wait.</li>
* <li>negative values - treated as infinite for compatibility.</li>
* <li>positive values - bounded wait up to the specified timeout.</li>
```
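For illustration, the four cases listed in the Javadoc could be classified as in the sketch below. `TimeoutSemantics`, `TimeoutMode` and `classify` are hypothetical names made up for this example and are not part of the PR; only the mapping itself is taken from the Javadoc above.

```java
/** Hypothetical helper, not part of the PR: classifies a timeoutMillis value. */
final class TimeoutSemantics {
    /** Possible interpretations of a timeoutMillis argument. */
    enum TimeoutMode { SINGLE_ATTEMPT, INFINITE, BOUNDED }

    private TimeoutSemantics() {
    }

    /** Maps a raw timeout to the mode described in the Javadoc. */
    static TimeoutMode classify(long timeoutMillis) {
        if (timeoutMillis == 0) {
            return TimeoutMode.SINGLE_ATTEMPT; // One attempt, no retries.
        }

        if (timeoutMillis < 0 || timeoutMillis == Long.MAX_VALUE) {
            return TimeoutMode.INFINITE; // Negative values are kept for compatibility.
        }

        return TimeoutMode.BOUNDED; // Retry until the specified timeout elapses.
    }
}
```

Keeping the negative-value case next to `Long.MAX_VALUE` makes the compatibility rule visible in one place.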
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -879,20 +1060,27 @@ public boolean trackNoLeaderSeparately() {
* <li>When all peers are exhausted, waits for leader notification</li>
* </ul>
*/
- private class LeaderWaitRetryStrategy implements RetryExecutionStrategy {
- private final CompletableFuture<ActionResponse> fut;
- private final Command cmd;
+ private class LeaderWaitRetryStrategy<R extends NetworkMessage> implements
RetryExecutionStrategy {
Review Comment:
The executor class has become too big. Let's extract all of its nested
classes, interfaces, and enums into package-private top-level types.
##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/TimeAwareRaftGroupService.java:
##########
@@ -176,13 +195,16 @@ public interface TimeAwareRaftGroupService {
* @param term Current known leader term.
* If real raft group term will be different - configuration
update will be skipped.
* @param sequenceToken Sequence token of the current change.
+ * @param timeoutMillis Timeout in milliseconds. {@code 0} means single
attempt without retries;
+ * {@code Long.MAX_VALUE} means infinite wait; negative values are
treated as infinite for compatibility.
*
* @return A future.
*/
- CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners
peersAndLearners, @Deprecated long term, long sequenceToken);
+ CompletableFuture<Void> changePeersAndLearnersAsync(
+ PeersAndLearners peersAndLearners, @Deprecated long term, long
sequenceToken, long timeoutMillis);
Review Comment:
```suggestion
CompletableFuture<Void> changePeersAndLearnersAsync(
PeersAndLearners peersAndLearners,
@Deprecated long term,
long sequenceToken,
long timeoutMillis
);
```
^ It would be better to format this declaration (and the similar ones) one
parameter per line: sooner or later the `@Deprecated` annotation should get a
release date for the `term` removal, and the line would have to be changed
again.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -109,80 +132,71 @@ private PhysicalTopologyAwareRaftGroupService(
PeersAndLearners configuration,
Marshaller cmdMarshaller,
ExceptionFactory stoppingExceptionFactory,
- ThrottlingContextHolder throttlingContextHolder,
RaftGroupEventsClientListener eventsClientListener
) {
+ this.groupId = groupId;
this.failureManager = failureManager;
this.clusterService = clusterService;
- this.executor = executor;
- this.raftConfiguration = raftConfiguration;
this.stoppingExceptionFactory = stoppingExceptionFactory;
- this.raftClient = RaftGroupServiceImpl.start(
- groupId,
- clusterService,
- MESSAGES_FACTORY,
- raftConfiguration,
- configuration,
- executor,
- cmdMarshaller,
- stoppingExceptionFactory,
- throttlingContextHolder
- );
+ this.peers = List.copyOf(configuration.peers());
+ this.learners = List.copyOf(configuration.learners());
this.commandExecutor = new RaftCommandExecutor(
groupId,
- raftClient::peers,
+ this::peers,
clusterService,
executor,
raftConfiguration,
cmdMarshaller,
stoppingExceptionFactory
);
+ this.messageSender = new
SubscriptionMessageSender(clusterService.messagingService(), executor,
raftConfiguration);
+
this.generalLeaderElectionListener = new ServerEventHandler(executor);
- eventsClientListener.addLeaderElectionListener(raftClient.groupId(),
generalLeaderElectionListener);
+ this.eventsClientListener = eventsClientListener;
+ eventsClientListener.addLeaderElectionListener(groupId,
generalLeaderElectionListener);
// Subscribe the command executor's leader availability state to
leader election notifications.
subscribeLeader(commandExecutor.leaderElectionListener());
TopologyService topologyService = clusterService.topologyService();
- topologyService.addEventHandler(new TopologyEventHandler() {
+ this.topologyEventHandler = new TopologyEventHandler() {
@Override
public void onAppeared(InternalClusterNode member) {
Review Comment:
Should we do the same in `onDisappeared`? At least in the case where the
member node is equal to the cached leader?
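As a rough sketch of what is being asked (not code from the PR), a handler could drop the cached leader only when the departed node is that leader. `CachedLeader` and its methods are hypothetical, and nodes are modelled as plain strings purely for brevity; whether the real handler should behave this way is exactly the open question.

```java
/** Hypothetical stand-in for the cached leader state; not the PR's classes. */
final class CachedLeader {
    private String leaderName;

    /** Remembers the given node as the current leader. */
    synchronized void set(String name) {
        this.leaderName = name;
    }

    /** Returns the cached leader or null if unknown. */
    synchronized String get() {
        return leaderName;
    }

    /**
     * Clears the cache only if the departed node is the cached leader.
     *
     * @return true if the cached leader was cleared.
     */
    synchronized boolean onDisappeared(String departedNodeName) {
        if (departedNodeName.equals(leaderName)) {
            leaderName = null;

            return true;
        }

        return false;
    }
}
```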
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
Review Comment:
Why to log instance?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
Review Comment:
Let's inline this method; it has only one usage.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/SubscriptionMessageSender.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import
org.apache.ignite.raft.jraft.rpc.CliRequests.SubscriptionLeaderChangeRequest;
+
+/**
+ * Handles sending subscription messages to RAFT group members with retry
logic.
+ */
+class SubscriptionMessageSender {
+ private static final IgniteLogger LOG =
Loggers.forClass(SubscriptionMessageSender.class);
+
+ private final ScheduledExecutorService executor;
+
+ private final RaftConfiguration raftConfiguration;
+
+ private final MessagingService messagingService;
+
+ /**
+ * Constructor.
+ *
+ * @param messagingService Messaging service.
+ * @param executor Executor for async operations.
+ * @param raftConfiguration RAFT configuration.
+ */
+ SubscriptionMessageSender(
+ MessagingService messagingService,
+ ScheduledExecutorService executor,
+ RaftConfiguration raftConfiguration
+ ) {
+ this.messagingService = messagingService;
+ this.executor = executor;
+ this.raftConfiguration = raftConfiguration;
+ }
+
+ /**
+ * Sends a subscription message to the specified node with retry logic.
+ *
+ * @param node Target node.
+ * @param msg Subscription message to send.
+ * @return Future that completes with {@code true} if the message was sent
successfully,
+ * {@code false} if sending failed but should not be treated as an
error,
+ * or completes exceptionally on unrecoverable errors.
+ */
+ CompletableFuture<Boolean> send(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
+ var msgSendFut = new CompletableFuture<Boolean>();
+
+ sendWithRetry(node, msg, msgSendFut);
+
+ return msgSendFut;
+ }
+
+ private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
+ Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
+
+ messagingService.invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
+ if (invokeThrowable == null) {
+ msgSendFut.complete(true);
+
+ return;
+ }
+
+ Throwable invokeCause = unwrapCause(invokeThrowable);
Review Comment:
`retryCause`? or `failedInvokeCause`?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RetryContext.java:
##########
@@ -44,8 +45,6 @@
* calls.
*/
class RetryContext {
- /** Indicates that default response timeout should be used. */
- static final long USE_DEFAULT_RESPONSE_TIMEOUT = -1;
Review Comment:
```suggestion
```
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -203,74 +272,133 @@ void shutdown(Throwable stopException) {
}
/**
- * Resolves initial target peer for a command execution.
+ * Marks the executor as stopping. After this call, operations will fail
with an exception from the
+ * stopping exception factory instead of TimeoutException.
+ */
+ void markAsStopping() {
+ stopping = true;
+ }
+
+ /**
+ * Returns the current leader.
+ *
+ * @return Current leader or {@code null} if unknown.
+ */
+ @Nullable Peer leader() {
+ return leader;
+ }
+
+ /**
+ * Sets the current leader. Used by PhysicalTopologyAwareRaftGroupService
to update state from response.
*
- * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ * @param leader New leader.
+ */
+ void setLeader(@Nullable Peer leader) {
+ this.leader = leader;
+ }
+
+ /**
+ * Updates the leader only if the given term is newer than the currently
known term.
*
+ * <p>This prevents stale leader information from overwriting fresher
information
+ * that may have been received via leader election notifications or
previous refresh calls.
+ *
+ * @param leader New leader (can be {@code null}).
+ * @param term Term associated with this leader.
+ * @return {@code true} if the leader was updated (term is newer), {@code
false} otherwise.
+ */
+ boolean setLeaderIfTermNewer(@Nullable Peer leader, long term) {
+ // Compare against the highest known term from either source:
+ // - leaderAvailabilityState.currentTerm(): term from leader election
notifications
+ // - cachedLeaderTerm: term from previous setLeaderIfTermNewer calls
+ long highestKnownTerm =
Math.max(leaderAvailabilityState.currentTerm(), cachedLeaderTerm);
+
+ if (term > highestKnownTerm) {
+ this.leader = leader;
+ this.cachedLeaderTerm = term;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns the cluster service.
+ *
+ * @return Cluster service.
+ */
+ ClusterService clusterService() {
+ return clusterService;
+ }
+
+ /**
+ * Resolves initial target peer based on strategy.
+ *
+ * @param strategy Target peer strategy.
+ * @param specificPeer Specific peer for SPECIFIC strategy.
* @return Initial target peer, or {@code null}.
*/
- private @Nullable Peer resolveInitialPeer() {
- Peer targetPeer = leader;
- if (targetPeer == null) {
- targetPeer = randomNode(null, false);
+ private @Nullable Peer resolveInitialPeer(TargetPeerStrategy strategy,
@Nullable Peer specificPeer) {
Review Comment:
Would it be a good idea to move this into the enum as a method, implemented
for each value instead of the switch here? I'm not sure, because of the
complexity of the `randomNode` implementation.
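A minimal sketch of that idea, under stated assumptions: `PeerStrategySketch` and `resolve` are hypothetical names, peers are modelled as strings, and the `randomNode` logic is passed in as a `Supplier` so that its complexity stays in the executor. The fallback from the known leader to a random peer in `LEADER` is borrowed from the removed Javadoc and may not match the new code.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: each constant resolves its own initial peer instead of a switch. */
enum PeerStrategySketch {
    LEADER {
        @Override
        String resolve(String knownLeader, Supplier<String> randomPeer, String specificPeer) {
            // Assumption: prefer the known leader, fall back to a random peer.
            return knownLeader != null ? knownLeader : randomPeer.get();
        }
    },

    RANDOM {
        @Override
        String resolve(String knownLeader, Supplier<String> randomPeer, String specificPeer) {
            return randomPeer.get();
        }
    },

    SPECIFIC {
        @Override
        String resolve(String knownLeader, Supplier<String> randomPeer, String specificPeer) {
            return specificPeer;
        }
    };

    /** Resolves the initial target peer; may return null if nothing is known. */
    abstract String resolve(String knownLeader, Supplier<String> randomPeer, String specificPeer);
}
```

The trade-off is a wider method signature in exchange for removing the switch, which is roughly the concern raised in the comment.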
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -825,6 +985,11 @@ private interface RetryExecutionStrategy {
* as unavailable.
*/
boolean trackNoLeaderSeparately();
+
+ /**
+ * Returns the target peer strategy used for this request.
+ */
+ TargetPeerStrategy targetStrategy();
Review Comment:
`targetSelectionStrategy`? Without a verb-based name, a method in a
`*Strategy` class looks confusing.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -76,21 +90,30 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
/** Cluster service. */
private final ClusterService clusterService;
- /** RPC RAFT client. */
- private final RaftGroupService raftClient;
-
- /** Executor to invoke RPC requests. */
- private final ScheduledExecutorService executor;
-
- /** RAFT configuration. */
- private final RaftConfiguration raftConfiguration;
-
/** Factory for creating stopping exceptions. */
private final ExceptionFactory stoppingExceptionFactory;
/** Command executor with retry semantics. */
private final RaftCommandExecutor commandExecutor;
+ /** Sender for subscription messages with retry logic. */
+ private final SubscriptionMessageSender messageSender;
Review Comment:
Also, please correct me if I'm wrong. We now have two ways of communication:
1. All subscription logic goes through this `subscriptionMessageSender`.
2. All RAFT command logic goes through `RaftCommandExecutor`.
The latter works on top of our new "smart" retry, which depends on the timeout
value; the former uses our "old" recursive retry approach.
The former cares about the leader only and works as a background activity that
updates the leader and term asynchronously.
The latter handles RAFT commands, which include asking for the current leader
and term on demand. It updates the locally cached leader and term if the
method receives a larger term than the cached one.
Is everything correct?
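To make the "larger term wins" rule concrete, here is a simplified model of it. `LeaderCacheSketch` is a hypothetical class with peers modelled as strings; the PR's `setLeaderIfTermNewer` additionally compares against the term known from leader election notifications, which this sketch leaves out.

```java
/** Hypothetical, simplified model of the term-guarded leader cache; not the PR's code. */
final class LeaderCacheSketch {
    private String leader;

    private long term = -1;

    /** Accepts the update only if its term is strictly newer than the cached one. */
    synchronized boolean setLeaderIfTermNewer(String newLeader, long newTerm) {
        if (newTerm > term) {
            leader = newLeader;
            term = newTerm;

            return true;
        }

        return false; // A stale or duplicate response is ignored.
    }

    /** Returns the cached leader or null if unknown. */
    synchronized String leader() {
        return leader;
    }
}
```

With this rule, a late response carrying an older term cannot overwrite a leader learned from a more recent source.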
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()),
resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
- return raftClient.refreshAndGetLeaderWithTerm();
+ public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long
timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenApply(resp -> {
+ if (resp.leaderId() == null) {
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ Peer respLeader = parsePeer(resp.leaderId());
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(respLeader,
resp.currentTerm());
+ return new LeaderWithTerm(respLeader, resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
- return raftClient.refreshMembers(onlyAlive);
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long
timeoutMillis) {
+ return commandExecutor.<GetPeersResponse>send(
+ peer -> MESSAGES_FACTORY.getPeersRequest()
+ .leaderId(peerId(peer))
+ .onlyAlive(onlyAlive)
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.LEADER,
Review Comment:
It's correct to use the LEADER strategy because:
1. The request implements `BaseCliRequestProcessor#getPeerId` as
`request.leaderId()`, so it relies on the invariant that the node that
processes the request is the leader.
2. An add-peers call must be processed on the leader node only
(`unsafeRegisterConfChange` checks the node state and refuses the
configuration update if the node is not the leader).
3. So only the leader is guaranteed to have the actual peer set.
`getPeersRequest` relies on this but doesn't check it internally. This means
you may pass a random node as `.leaderId(peerId(peer))` from the strategy and
it will work, but the peer set may be stale.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()),
resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
- return raftClient.refreshAndGetLeaderWithTerm();
+ public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long
timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenApply(resp -> {
+ if (resp.leaderId() == null) {
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ Peer respLeader = parsePeer(resp.leaderId());
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(respLeader,
resp.currentTerm());
+ return new LeaderWithTerm(respLeader, resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
- return raftClient.refreshMembers(onlyAlive);
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long
timeoutMillis) {
+ return commandExecutor.<GetPeersResponse>send(
+ peer -> MESSAGES_FACTORY.getPeersRequest()
+ .leaderId(peerId(peer))
+ .onlyAlive(onlyAlive)
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.peersList());
+ this.learners = parsePeerList(resp.learnersList());
+ });
}
@Override
- public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
- return raftClient.addPeer(peer, sequenceToken);
+ public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken, long
timeoutMillis) {
+ return commandExecutor.<AddPeerResponse>send(
+ targetPeer -> MESSAGES_FACTORY.addPeerRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .peerId(peerId(peer))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
}
@Override
- public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
- return raftClient.removePeer(peer, sequenceToken);
+ public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken,
long timeoutMillis) {
+ return commandExecutor.<RemovePeerResponse>send(
+ targetPeer -> MESSAGES_FACTORY.removePeerRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .peerId(peerId(peer))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
}
@Override
- public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners
peersAndLearners, long term, long sequenceToken) {
- return raftClient.changePeersAndLearners(peersAndLearners, term,
sequenceToken);
+ public CompletableFuture<Void> changePeersAndLearners(
+ PeersAndLearners peersAndLearners, long term, long sequenceToken,
long timeoutMillis) {
+ LOG.info("Sending changePeersAndLearners request for group={} to
peers={} and learners={} with leader term={}",
+ groupId, peersAndLearners.peers(),
peersAndLearners.learners(), term);
+
+ return commandExecutor.<ChangePeersAndLearnersResponse>send(
+ targetPeer -> MESSAGES_FACTORY.changePeersAndLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .term(term)
+ .newPeersList(peerIds(peersAndLearners.peers()))
+ .newLearnersList(peerIds(peersAndLearners.learners()))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+ this.learners = parsePeerList(resp.newLearnersList());
+ });
}
@Override
- public CompletableFuture<Void>
changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long
sequenceToken) {
- return raftClient.changePeersAndLearnersAsync(peersAndLearners, term,
sequenceToken);
+ public CompletableFuture<Void> changePeersAndLearnersAsync(
+ PeersAndLearners peersAndLearners, long term, long sequenceToken,
long timeoutMillis) {
+ LOG.info("Sending changePeersAndLearnersAsync request for group={} to
peers={} and learners={} with leader term={}",
+ groupId, peersAndLearners.peers(),
peersAndLearners.learners(), term);
+
+ return commandExecutor.<ChangePeersAndLearnersAsyncResponse>send(
+ targetPeer ->
MESSAGES_FACTORY.changePeersAndLearnersAsyncRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .term(term)
+ .newPeersList(peerIds(peersAndLearners.peers()))
+ .newLearnersList(peerIds(peersAndLearners.learners()))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+ this.learners = parsePeerList(resp.newLearnersList());
+ });
}
@Override
- public CompletableFuture<Void> addLearners(Collection<Peer> learners, long
sequenceToken) {
- return raftClient.addLearners(learners, sequenceToken);
+ public CompletableFuture<Void> addLearners(Collection<Peer> learners, long
sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.addLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> removeLearners(Collection<Peer> learners,
long sequenceToken) {
- return raftClient.removeLearners(learners, sequenceToken);
+ public CompletableFuture<Void> removeLearners(Collection<Peer> learners,
long sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.removeLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> resetLearners(Collection<Peer> learners,
long sequenceToken) {
- return raftClient.resetLearners(learners, sequenceToken);
+ public CompletableFuture<Void> resetLearners(Collection<Peer> learners,
long sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.resetLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
- return raftClient.snapshot(peer, forced);
+ public CompletableFuture<Void> snapshot(Peer peer, boolean forced, long
timeoutMillis) {
+ return commandExecutor.send(
+ targetPeer -> MESSAGES_FACTORY.snapshotRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .forced(forced)
+ .build(),
+ TargetPeerStrategy.SPECIFIC,
+ peer,
+ timeoutMillis
+ ).thenApply(unused -> null);
}
@Override
- public CompletableFuture<Void> transferLeadership(Peer newLeader) {
- return raftClient.transferLeadership(newLeader);
+ public CompletableFuture<Void> transferLeadership(Peer newLeader, long
timeoutMillis) {
+ return commandExecutor.send(
+ targetPeer -> MESSAGES_FACTORY.transferLeaderRequest()
+ .groupId(groupId.toString())
+ .leaderId(peerId(targetPeer))
+ .peerId(peerId(newLeader))
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> commandExecutor.setLeader(newLeader));
}
@Override
public void shutdown() {
- // Stop the command executor first - blocks new run() calls, cancels
leader waiters.
+ // Remove topology event handler to prevent new topology events from
triggering activity.
+
clusterService.topologyService().removeEventHandler(topologyEventHandler);
+
+ // Remove leader election listener to prevent memory leaks and stop
receiving events.
+ eventsClientListener.removeLeaderElectionListener(groupId,
generalLeaderElectionListener);
+
+ // Stop the command executor - blocks new run() calls, cancels leader
waiters.
commandExecutor.shutdown(stoppingExceptionFactory.create("Raft client
is stopping [groupId=" + groupId() + "]."));
+ // Unsubscribe from leader updates with timeout to ensure clean
shutdown.
finishSubscriptions();
-
- raftClient.shutdown();
}
@Override
- public CompletableFuture<Long> readIndex() {
- return raftClient.readIndex();
+ public CompletableFuture<Long> readIndex(long timeoutMillis) {
+ return commandExecutor.<ReadIndexResponse>send(
+ peer -> MESSAGES_FACTORY.readIndexRequest()
+ .groupId(groupId.toString())
+ .peerId(peer.consistentId())
+ .serverId(peer.consistentId())
+ .build(),
+ TargetPeerStrategy.LEADER,
Review Comment:
Correct too, the leader has the most up-to-date index.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -169,16 +185,69 @@ LeaderElectionListener leaderElectionListener() {
* @return Future that completes with the command result.
*/
<R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+ return this.<ActionResponse>send(
+ createRequestFactory(cmd),
+ TargetPeerStrategy.LEADER,
+ null,
+ timeoutMillis
+ ).thenApply(resp -> (R) resp.result());
+ }
+
+ /**
+ * Sends a request with leader-aware retry semantics.
+ *
+ * @param requestFactory Factory creating requests for target peer.
+ * @param targetStrategy How to select the initial target peer.
+ * @param timeoutMillis Timeout (0=single attempt, MAX_VALUE=infinite,
positive=bounded).
+ * @param <R> Response type.
+ * @return Future with response.
+ */
+ <R extends NetworkMessage> CompletableFuture<R> send(
+ Function<Peer, ? extends NetworkMessage> requestFactory,
+ TargetPeerStrategy targetStrategy,
+ long timeoutMillis
+ ) {
+ return send(requestFactory, targetStrategy, null, timeoutMillis);
+ }
+
+ /**
+ * Sends a request with leader-aware retry semantics.
+ *
+ * @param requestFactory Factory creating requests for target peer.
+ * @param targetStrategy How to select the initial target peer.
+ * @param specificPeer Target peer for SPECIFIC strategy (ignored for
other strategies).
+ * @param timeoutMillis Timeout (0=single attempt, MAX_VALUE=infinite,
positive=bounded).
+ * @param <R> Response type.
+ * @return Future with response.
+ */
+ <R extends NetworkMessage> CompletableFuture<R> send(
+ Function<Peer, ? extends NetworkMessage> requestFactory,
+ TargetPeerStrategy targetStrategy,
+ @Nullable Peer specificPeer,
+ long timeoutMillis
+ ) {
// Normalize timeout: negative values mean infinite wait.
long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE :
timeoutMillis;
- // Wait for leader mode (bounded or infinite).
long deadline = Utils.monotonicMsAfter(effectiveTimeout);
return executeWithBusyLock(responseFuture -> {
+ Peer initialPeer = resolveInitialPeer(targetStrategy,
specificPeer);
Review Comment:
```suggestion
Peer initialPeer = resolveInitialPeer(targetStrategy,
specificPeer);
```
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -203,74 +272,133 @@ void shutdown(Throwable stopException) {
}
/**
- * Resolves initial target peer for a command execution.
+ * Marks the executor as stopping. After this call, operations will fail
with an exception from the
+ * stopping exception factory instead of TimeoutException.
+ */
+ void markAsStopping() {
+ stopping = true;
+ }
+
+ /**
+ * Returns the current leader.
+ *
+ * @return Current leader or {@code null} if unknown.
+ */
+ @Nullable Peer leader() {
+ return leader;
+ }
+
+ /**
+ * Sets the current leader. Used by PhysicalTopologyAwareRaftGroupService
to update state from response.
*
- * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ * @param leader New leader.
+ */
+ void setLeader(@Nullable Peer leader) {
+ this.leader = leader;
+ }
+
+ /**
+ * Updates the leader only if the given term is newer than the currently
known term.
*
+ * <p>This prevents stale leader information from overwriting fresher
information
+ * that may have been received via leader election notifications or
previous refresh calls.
+ *
+ * @param leader New leader (can be {@code null}).
+ * @param term Term associated with this leader.
+ * @return {@code true} if the leader was updated (term is newer), {@code
false} otherwise.
+ */
+ boolean setLeaderIfTermNewer(@Nullable Peer leader, long term) {
+ // Compare against the highest known term from either source:
+ // - leaderAvailabilityState.currentTerm(): term from leader election
notifications
+ // - cachedLeaderTerm: term from previous setLeaderIfTermNewer calls
+ long highestKnownTerm =
Math.max(leaderAvailabilityState.currentTerm(), cachedLeaderTerm);
Review Comment:
Why can't we use `leaderAvailabilityState` as the single source of truth
for the locally cached leader and term?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/SubscriptionMessageSender.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import
org.apache.ignite.raft.jraft.rpc.CliRequests.SubscriptionLeaderChangeRequest;
+
+/**
+ * Handles sending subscription messages to RAFT group members with retry
logic.
+ */
+class SubscriptionMessageSender {
+ private static final IgniteLogger LOG =
Loggers.forClass(SubscriptionMessageSender.class);
+
+ private final ScheduledExecutorService executor;
+
+ private final RaftConfiguration raftConfiguration;
+
+ private final MessagingService messagingService;
+
+ /**
+ * Constructor.
+ *
+ * @param messagingService Messaging service.
+ * @param executor Executor for async operations.
+ * @param raftConfiguration RAFT configuration.
+ */
+ SubscriptionMessageSender(
+ MessagingService messagingService,
+ ScheduledExecutorService executor,
+ RaftConfiguration raftConfiguration
+ ) {
+ this.messagingService = messagingService;
+ this.executor = executor;
+ this.raftConfiguration = raftConfiguration;
+ }
+
+ /**
+ * Sends a subscription message to the specified node with retry logic.
+ *
+ * @param node Target node.
+ * @param msg Subscription message to send.
+ * @return Future that completes with {@code true} if the message was sent
successfully,
+ * {@code false} if sending failed but should not be treated as an
error,
+ * or completes exceptionally on unrecoverable errors.
+ */
+ CompletableFuture<Boolean> send(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
+ var msgSendFut = new CompletableFuture<Boolean>();
+
+ sendWithRetry(node, msg, msgSendFut);
+
+ return msgSendFut;
+ }
+
+ private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
+ Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
+
+ messagingService.invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
+ if (invokeThrowable == null) {
+ msgSendFut.complete(true);
+
+ return;
+ }
+
+ Throwable invokeCause = unwrapCause(invokeThrowable);
+ if (!msg.subscribe()) {
+ // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
+ if (invokeCause instanceof Error) {
+ msgSendFut.completeExceptionally(invokeThrowable);
+ } else {
+ LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
+
+ msgSendFut.complete(false);
+ }
+ } else if (RaftErrorUtils.recoverable(invokeCause)) {
+ sendWithRetry(node, msg, msgSendFut);
+ } else if (invokeCause instanceof RecipientLeftException) {
+ LOG.info(
+ "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
+ node
+ );
+
+ msgSendFut.complete(false);
+ } else if (invokeCause instanceof HandshakeException) {
+ LOG.info(
+ "Could not subscribe to leader update from a specific
node, because the handshake failed "
+ + "(node may be unavailable): [node={}].",
+ node
+ );
+
+ msgSendFut.complete(false);
+ } else if (invokeCause instanceof CancellationException) {
+ LOG.info(
+ "Could not subscribe to leader update from a specific
node, because the operation was cancelled "
+ + "(node may be stopping): [node={}].",
+ node
+ );
+
+ msgSendFut.complete(false);
+ } else if (invokeCause instanceof NodeStoppingException) {
+ msgSendFut.complete(false);
Review Comment:
No logging here because it will be handled higher up the stack?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -76,21 +90,30 @@ public class PhysicalTopologyAwareRaftGroupService
implements TimeAwareRaftGroup
/** Cluster service. */
private final ClusterService clusterService;
- /** RPC RAFT client. */
- private final RaftGroupService raftClient;
-
- /** Executor to invoke RPC requests. */
- private final ScheduledExecutorService executor;
-
- /** RAFT configuration. */
- private final RaftConfiguration raftConfiguration;
-
/** Factory for creating stopping exceptions. */
private final ExceptionFactory stoppingExceptionFactory;
/** Command executor with retry semantics. */
private final RaftCommandExecutor commandExecutor;
+ /** Sender for subscription messages with retry logic. */
+ private final SubscriptionMessageSender messageSender;
Review Comment:
Let's call it `subscriptionMessageSender` too. I only had the "aha" moment
once I figured out that it's not the common messaging service but a
specific sender with retry logic inside.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -834,15 +999,26 @@ private interface RetryExecutionStrategy {
* as unavailable. When all peers have been tried, the request fails with
* {@link ReplicationGroupUnavailableException}.
*/
- private class SingleAttemptRetryStrategy implements RetryExecutionStrategy
{
- private final CompletableFuture<? extends NetworkMessage> fut;
+ private class SingleAttemptRetryStrategy<R extends NetworkMessage>
implements RetryExecutionStrategy {
+ private final CompletableFuture<R> fut;
Review Comment:
Here and in the leader strategy, let's name this `futureInvokeResult`. It
takes a while to figure out what `fut` is when it gets completed in some
handler in far-away code.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -203,74 +272,133 @@ void shutdown(Throwable stopException) {
}
/**
- * Resolves initial target peer for a command execution.
+ * Marks the executor as stopping. After this call, operations will fail
with an exception from the
+ * stopping exception factory instead of TimeoutException.
+ */
+ void markAsStopping() {
+ stopping = true;
+ }
+
+ /**
+ * Returns the current leader.
+ *
+ * @return Current leader or {@code null} if unknown.
+ */
+ @Nullable Peer leader() {
+ return leader;
+ }
+
+ /**
+ * Sets the current leader. Used by PhysicalTopologyAwareRaftGroupService
to update state from response.
*
- * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ * @param leader New leader.
+ */
+ void setLeader(@Nullable Peer leader) {
+ this.leader = leader;
+ }
+
+ /**
+ * Updates the leader only if the given term is newer than the currently
known term.
*
+ * <p>This prevents stale leader information from overwriting fresher
information
+ * that may have been received via leader election notifications or
previous refresh calls.
+ *
+ * @param leader New leader (can be {@code null}).
+ * @param term Term associated with this leader.
+ * @return {@code true} if the leader was updated (term is newer), {@code
false} otherwise.
+ */
+ boolean setLeaderIfTermNewer(@Nullable Peer leader, long term) {
+ // Compare against the highest known term from either source:
+ // - leaderAvailabilityState.currentTerm(): term from leader election
notifications
+ // - cachedLeaderTerm: term from previous setLeaderIfTermNewer calls
+ long highestKnownTerm =
Math.max(leaderAvailabilityState.currentTerm(), cachedLeaderTerm);
Review Comment:
I mean renaming `onLeaderElected` to `updateKnownLeaderAndTerm` and using it
both for background updates through subscriptions and for manual updates on
`refreshLeader*` calls from the client. The state object is synchronized, so
there shouldn't be any concurrency problems, and the stale leader term guard
is already present. What did I miss?
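To make the idea concrete, here is a rough sketch of a single term-guarded
update path. Everything in it is an assumption for illustration: `LeaderState`
is a hypothetical holder (not the actual `leaderAvailabilityState` class), the
peer is modelled as a plain `String`, and the initial term of `-1` is my guess
at an "unknown" marker.

```java
/** Hypothetical holder for the locally cached leader and term (illustrative only). */
final class LeaderState {
    /** Cached leader ID, or null when the leader is unknown. */
    private String leader;

    /** Term of the cached leader; -1 is an assumed "nothing known yet" value. */
    private long term = -1;

    /**
     * Single entry point for both subscription events and manual refreshes.
     * Applies the update only if the term is strictly newer than the cached one.
     *
     * @return true if the cached leader and term were replaced, false if the update was stale.
     */
    synchronized boolean updateKnownLeaderAndTerm(String newLeader, long newTerm) {
        if (newTerm <= term) {
            // Stale or duplicate information: keep what we already have.
            return false;
        }

        leader = newLeader;
        term = newTerm;

        return true;
    }

    synchronized String leader() {
        return leader;
    }

    synchronized long term() {
        return term;
    }
}
```

With one guarded method like this, a second `cachedLeaderTerm` field and the
`Math.max(...)` comparison would not be needed, as far as I can tell.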
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
Review Comment:
It's OK to use `RANDOM`, because jRAFT under the hood looks at the state of
the receiving node (LEADER vs FOLLOWER) and finds the leader depending on it.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/RaftCommandExecutor.java:
##########
@@ -203,74 +272,133 @@ void shutdown(Throwable stopException) {
}
/**
- * Resolves initial target peer for a command execution.
+ * Marks the executor as stopping. After this call, operations will fail
with an exception from the
+ * stopping exception factory instead of TimeoutException.
+ */
+ void markAsStopping() {
+ stopping = true;
+ }
+
+ /**
+ * Returns the current leader.
+ *
+ * @return Current leader or {@code null} if unknown.
+ */
+ @Nullable Peer leader() {
+ return leader;
+ }
+
+ /**
+ * Sets the current leader. Used by PhysicalTopologyAwareRaftGroupService
to update state from response.
*
- * <p>Tries the known leader first, falling back to a random peer if no
leader is known.
+ * @param leader New leader.
+ */
+ void setLeader(@Nullable Peer leader) {
Review Comment:
It's a strange method. On the one hand, it is used for transfer leadership,
and I can't understand why it shouldn't change the term. I see that, besides
tests, it has been used (in the original RAFT client) in the placement driver
on lease grant message handling, and I'm wary of this mechanic.
On the other hand, it is used for configuration updates (IMO, that's more
like a reset).
I guess both usages rely on the background leader update handling, which
carries the term in the event. Am I right?
As for the question about the leader state object: should we use it here
too? For a peers reset, `{ leader=null, term=-1 }` as the initial state; for
transfer leadership, I'm not sure about the term. Should we retrieve the new
term from the response after the transfer?