JAkutenshi commented on code in PR #7507:
URL: https://github.com/apache/ignite-3/pull/7507#discussion_r2872745918
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
Review Comment:
Just a comment ^. Resolved.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()),
resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
- return raftClient.refreshAndGetLeaderWithTerm();
+ public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long
timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenApply(resp -> {
+ if (resp.leaderId() == null) {
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ Peer respLeader = parsePeer(resp.leaderId());
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(respLeader,
resp.currentTerm());
+ return new LeaderWithTerm(respLeader, resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
- return raftClient.refreshMembers(onlyAlive);
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long
timeoutMillis) {
+ return commandExecutor.<GetPeersResponse>send(
+ peer -> MESSAGES_FACTORY.getPeersRequest()
+ .leaderId(peerId(peer))
+ .onlyAlive(onlyAlive)
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.LEADER,
Review Comment:
Just a comment ^. Resolved.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener
callback) {
private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
- .groupId(groupId())
+ .groupId(groupId)
.subscribe(subscribe)
.build();
}
- private CompletableFuture<Boolean> sendMessage(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg) {
- var msgSendFut = new CompletableFuture<Boolean>();
-
- sendWithRetry(node, msg, msgSendFut);
-
- return msgSendFut;
- }
-
- private void sendWithRetry(InternalClusterNode node,
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
- Long responseTimeout =
raftConfiguration.responseTimeoutMillis().value();
-
- clusterService.messagingService().invoke(node, msg,
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
- if (invokeThrowable == null) {
- msgSendFut.complete(true);
-
- return;
- }
-
- Throwable invokeCause = unwrapCause(invokeThrowable);
- if (!msg.subscribe()) {
- // We don't want to propagate exceptions when unsubscribing
(if it's not an Error!).
- if (invokeCause instanceof Error) {
- msgSendFut.completeExceptionally(invokeThrowable);
- } else {
- LOG.debug("An exception while trying to unsubscribe.",
invokeThrowable);
-
- msgSendFut.complete(false);
- }
- } else if (RaftErrorUtils.recoverable(invokeCause)) {
- sendWithRetry(node, msg, msgSendFut);
- } else if (invokeCause instanceof RecipientLeftException) {
- LOG.info(
- "Could not subscribe to leader update from a specific
node, because the node had left the cluster: [node={}].",
- node
- );
-
- msgSendFut.complete(false);
- } else if (invokeCause instanceof NodeStoppingException) {
- msgSendFut.complete(false);
- } else {
- LOG.error("Could not send the subscribe message to the node:
[node={}, msg={}].", invokeThrowable, node, msg);
-
- msgSendFut.completeExceptionally(invokeThrowable);
- }
- }, executor);
- }
-
@Override
public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
return commandExecutor.run(cmd, timeoutMillis);
}
@Override
public ReplicationGroupId groupId() {
- return raftClient.groupId();
+ return groupId;
}
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ return commandExecutor.leader();
}
@Override
public List<Peer> peers() {
- return raftClient.peers();
+ return peers;
}
@Override
public @Nullable List<Peer> learners() {
- return raftClient.learners();
+ return learners;
}
@Override
- public CompletableFuture<Void> refreshLeader() {
- return raftClient.refreshLeader();
+ public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()),
resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
- return raftClient.refreshAndGetLeaderWithTerm();
+ public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long
timeoutMillis) {
+ return commandExecutor.<GetLeaderResponse>send(
+ peer -> MESSAGES_FACTORY.getLeaderRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.RANDOM,
+ timeoutMillis
+ ).thenApply(resp -> {
+ if (resp.leaderId() == null) {
+ return LeaderWithTerm.NO_LEADER;
+ }
+
+ Peer respLeader = parsePeer(resp.leaderId());
+ // Only update cached leader if the term is newer to avoid
overwriting
+ // fresher info from leader election notifications with stale
responses.
+ commandExecutor.setLeaderIfTermNewer(respLeader,
resp.currentTerm());
+ return new LeaderWithTerm(respLeader, resp.currentTerm());
+ });
}
@Override
- public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
- return raftClient.refreshMembers(onlyAlive);
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long
timeoutMillis) {
+ return commandExecutor.<GetPeersResponse>send(
+ peer -> MESSAGES_FACTORY.getPeersRequest()
+ .leaderId(peerId(peer))
+ .onlyAlive(onlyAlive)
+ .groupId(groupId.toString())
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.peersList());
+ this.learners = parsePeerList(resp.learnersList());
+ });
}
@Override
- public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
- return raftClient.addPeer(peer, sequenceToken);
+ public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken, long
timeoutMillis) {
+ return commandExecutor.<AddPeerResponse>send(
+ targetPeer -> MESSAGES_FACTORY.addPeerRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .peerId(peerId(peer))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
}
@Override
- public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
- return raftClient.removePeer(peer, sequenceToken);
+ public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken,
long timeoutMillis) {
+ return commandExecutor.<RemovePeerResponse>send(
+ targetPeer -> MESSAGES_FACTORY.removePeerRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .peerId(peerId(peer))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
}
@Override
- public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners
peersAndLearners, long term, long sequenceToken) {
- return raftClient.changePeersAndLearners(peersAndLearners, term,
sequenceToken);
+ public CompletableFuture<Void> changePeersAndLearners(
+ PeersAndLearners peersAndLearners, long term, long sequenceToken,
long timeoutMillis) {
+ LOG.info("Sending changePeersAndLearners request for group={} to
peers={} and learners={} with leader term={}",
+ groupId, peersAndLearners.peers(),
peersAndLearners.learners(), term);
+
+ return commandExecutor.<ChangePeersAndLearnersResponse>send(
+ targetPeer -> MESSAGES_FACTORY.changePeersAndLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .term(term)
+ .newPeersList(peerIds(peersAndLearners.peers()))
+ .newLearnersList(peerIds(peersAndLearners.learners()))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+ this.learners = parsePeerList(resp.newLearnersList());
+ });
}
@Override
- public CompletableFuture<Void>
changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long
sequenceToken) {
- return raftClient.changePeersAndLearnersAsync(peersAndLearners, term,
sequenceToken);
+ public CompletableFuture<Void> changePeersAndLearnersAsync(
+ PeersAndLearners peersAndLearners, long term, long sequenceToken,
long timeoutMillis) {
+ LOG.info("Sending changePeersAndLearnersAsync request for group={} to
peers={} and learners={} with leader term={}",
+ groupId, peersAndLearners.peers(),
peersAndLearners.learners(), term);
+
+ return commandExecutor.<ChangePeersAndLearnersAsyncResponse>send(
+ targetPeer ->
MESSAGES_FACTORY.changePeersAndLearnersAsyncRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .term(term)
+ .newPeersList(peerIds(peersAndLearners.peers()))
+ .newLearnersList(peerIds(peersAndLearners.learners()))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+ this.learners = parsePeerList(resp.newLearnersList());
+ });
}
@Override
- public CompletableFuture<Void> addLearners(Collection<Peer> learners, long
sequenceToken) {
- return raftClient.addLearners(learners, sequenceToken);
+ public CompletableFuture<Void> addLearners(Collection<Peer> learners, long
sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.addLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> removeLearners(Collection<Peer> learners,
long sequenceToken) {
- return raftClient.removeLearners(learners, sequenceToken);
+ public CompletableFuture<Void> removeLearners(Collection<Peer> learners,
long sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.removeLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> resetLearners(Collection<Peer> learners,
long sequenceToken) {
- return raftClient.resetLearners(learners, sequenceToken);
+ public CompletableFuture<Void> resetLearners(Collection<Peer> learners,
long sequenceToken, long timeoutMillis) {
+ return commandExecutor.<LearnersOpResponse>send(
+ targetPeer -> MESSAGES_FACTORY.resetLearnersRequest()
+ .leaderId(peerId(targetPeer))
+ .groupId(groupId.toString())
+ .learnersList(peerIds(learners))
+ .sequenceToken(sequenceToken)
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
}
@Override
- public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
- return raftClient.snapshot(peer, forced);
+ public CompletableFuture<Void> snapshot(Peer peer, boolean forced, long
timeoutMillis) {
+ return commandExecutor.send(
+ targetPeer -> MESSAGES_FACTORY.snapshotRequest()
+ .peerId(peerId(peer))
+ .groupId(groupId.toString())
+ .forced(forced)
+ .build(),
+ TargetPeerStrategy.SPECIFIC,
+ peer,
+ timeoutMillis
+ ).thenApply(unused -> null);
}
@Override
- public CompletableFuture<Void> transferLeadership(Peer newLeader) {
- return raftClient.transferLeadership(newLeader);
+ public CompletableFuture<Void> transferLeadership(Peer newLeader, long
timeoutMillis) {
+ return commandExecutor.send(
+ targetPeer -> MESSAGES_FACTORY.transferLeaderRequest()
+ .groupId(groupId.toString())
+ .leaderId(peerId(targetPeer))
+ .peerId(peerId(newLeader))
+ .build(),
+ TargetPeerStrategy.LEADER,
+ timeoutMillis
+ ).thenAccept(resp -> commandExecutor.setLeader(newLeader));
}
@Override
public void shutdown() {
- // Stop the command executor first - blocks new run() calls, cancels
leader waiters.
+ // Remove topology event handler to prevent new topology events from
triggering activity.
+
clusterService.topologyService().removeEventHandler(topologyEventHandler);
+
+ // Remove leader election listener to prevent memory leaks and stop
receiving events.
+ eventsClientListener.removeLeaderElectionListener(groupId,
generalLeaderElectionListener);
+
+ // Stop the command executor - blocks new run() calls, cancels leader
waiters.
commandExecutor.shutdown(stoppingExceptionFactory.create("Raft client
is stopping [groupId=" + groupId() + "]."));
+ // Unsubscribe from leader updates with timeout to ensure clean
shutdown.
finishSubscriptions();
-
- raftClient.shutdown();
}
@Override
- public CompletableFuture<Long> readIndex() {
- return raftClient.readIndex();
+ public CompletableFuture<Long> readIndex(long timeoutMillis) {
+ return commandExecutor.<ReadIndexResponse>send(
+ peer -> MESSAGES_FACTORY.readIndexRequest()
+ .groupId(groupId.toString())
+ .peerId(peer.consistentId())
+ .serverId(peer.consistentId())
+ .build(),
+ TargetPeerStrategy.LEADER,
Review Comment:
Just a comment ^. Resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org