JAkutenshi commented on code in PR #7507:
URL: https://github.com/apache/ignite-3/pull/7507#discussion_r2872745918


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener 
callback) {
 
     private SubscriptionLeaderChangeRequest 
subscriptionLeaderChangeRequest(boolean subscribe) {
         return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
-                .groupId(groupId())
+                .groupId(groupId)
                 .subscribe(subscribe)
                 .build();
     }
 
-    private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg) {
-        var msgSendFut = new CompletableFuture<Boolean>();
-
-        sendWithRetry(node, msg, msgSendFut);
-
-        return msgSendFut;
-    }
-
-    private void sendWithRetry(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
-        Long responseTimeout = 
raftConfiguration.responseTimeoutMillis().value();
-
-        clusterService.messagingService().invoke(node, msg, 
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
-            if (invokeThrowable == null) {
-                msgSendFut.complete(true);
-
-                return;
-            }
-
-            Throwable invokeCause = unwrapCause(invokeThrowable);
-            if (!msg.subscribe()) {
-                // We don't want to propagate exceptions when unsubscribing 
(if it's not an Error!).
-                if (invokeCause instanceof Error) {
-                    msgSendFut.completeExceptionally(invokeThrowable);
-                } else {
-                    LOG.debug("An exception while trying to unsubscribe.", 
invokeThrowable);
-
-                    msgSendFut.complete(false);
-                }
-            } else if (RaftErrorUtils.recoverable(invokeCause)) {
-                sendWithRetry(node, msg, msgSendFut);
-            } else if (invokeCause instanceof RecipientLeftException) {
-                LOG.info(
-                        "Could not subscribe to leader update from a specific 
node, because the node had left the cluster: [node={}].",
-                        node
-                );
-
-                msgSendFut.complete(false);
-            } else if (invokeCause instanceof NodeStoppingException) {
-                msgSendFut.complete(false);
-            } else {
-                LOG.error("Could not send the subscribe message to the node: 
[node={}, msg={}].", invokeThrowable, node, msg);
-
-                msgSendFut.completeExceptionally(invokeThrowable);
-            }
-        }, executor);
-    }
-
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
         return commandExecutor.run(cmd, timeoutMillis);
     }
 
     @Override
     public ReplicationGroupId groupId() {
-        return raftClient.groupId();
+        return groupId;
     }
 
     @Override
     public @Nullable Peer leader() {
-        return raftClient.leader();
+        return commandExecutor.leader();
     }
 
     @Override
     public List<Peer> peers() {
-        return raftClient.peers();
+        return peers;
     }
 
     @Override
     public @Nullable List<Peer> learners() {
-        return raftClient.learners();
+        return learners;
     }
 
     @Override
-    public CompletableFuture<Void> refreshLeader() {
-        return raftClient.refreshLeader();
+    public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+        return commandExecutor.<GetLeaderResponse>send(
+                peer -> MESSAGES_FACTORY.getLeaderRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.RANDOM,

Review Comment:
   Just a comment on the quoted code above (^). Resolved.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener 
callback) {
 
     private SubscriptionLeaderChangeRequest 
subscriptionLeaderChangeRequest(boolean subscribe) {
         return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
-                .groupId(groupId())
+                .groupId(groupId)
                 .subscribe(subscribe)
                 .build();
     }
 
-    private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg) {
-        var msgSendFut = new CompletableFuture<Boolean>();
-
-        sendWithRetry(node, msg, msgSendFut);
-
-        return msgSendFut;
-    }
-
-    private void sendWithRetry(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
-        Long responseTimeout = 
raftConfiguration.responseTimeoutMillis().value();
-
-        clusterService.messagingService().invoke(node, msg, 
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
-            if (invokeThrowable == null) {
-                msgSendFut.complete(true);
-
-                return;
-            }
-
-            Throwable invokeCause = unwrapCause(invokeThrowable);
-            if (!msg.subscribe()) {
-                // We don't want to propagate exceptions when unsubscribing 
(if it's not an Error!).
-                if (invokeCause instanceof Error) {
-                    msgSendFut.completeExceptionally(invokeThrowable);
-                } else {
-                    LOG.debug("An exception while trying to unsubscribe.", 
invokeThrowable);
-
-                    msgSendFut.complete(false);
-                }
-            } else if (RaftErrorUtils.recoverable(invokeCause)) {
-                sendWithRetry(node, msg, msgSendFut);
-            } else if (invokeCause instanceof RecipientLeftException) {
-                LOG.info(
-                        "Could not subscribe to leader update from a specific 
node, because the node had left the cluster: [node={}].",
-                        node
-                );
-
-                msgSendFut.complete(false);
-            } else if (invokeCause instanceof NodeStoppingException) {
-                msgSendFut.complete(false);
-            } else {
-                LOG.error("Could not send the subscribe message to the node: 
[node={}, msg={}].", invokeThrowable, node, msg);
-
-                msgSendFut.completeExceptionally(invokeThrowable);
-            }
-        }, executor);
-    }
-
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
         return commandExecutor.run(cmd, timeoutMillis);
     }
 
     @Override
     public ReplicationGroupId groupId() {
-        return raftClient.groupId();
+        return groupId;
     }
 
     @Override
     public @Nullable Peer leader() {
-        return raftClient.leader();
+        return commandExecutor.leader();
     }
 
     @Override
     public List<Peer> peers() {
-        return raftClient.peers();
+        return peers;
     }
 
     @Override
     public @Nullable List<Peer> learners() {
-        return raftClient.learners();
+        return learners;
     }
 
     @Override
-    public CompletableFuture<Void> refreshLeader() {
-        return raftClient.refreshLeader();
+    public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+        return commandExecutor.<GetLeaderResponse>send(
+                peer -> MESSAGES_FACTORY.getLeaderRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.RANDOM,
+                timeoutMillis
+        ).thenAccept(resp -> {
+            // Only update cached leader if the term is newer to avoid 
overwriting
+            // fresher info from leader election notifications with stale 
responses.
+            commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()), 
resp.currentTerm());
+        });
     }
 
     @Override
-    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
-        return raftClient.refreshAndGetLeaderWithTerm();
+    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long 
timeoutMillis) {
+        return commandExecutor.<GetLeaderResponse>send(
+                peer -> MESSAGES_FACTORY.getLeaderRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.RANDOM,
+                timeoutMillis
+        ).thenApply(resp -> {
+            if (resp.leaderId() == null) {
+                return LeaderWithTerm.NO_LEADER;
+            }
+
+            Peer respLeader = parsePeer(resp.leaderId());
+            // Only update cached leader if the term is newer to avoid 
overwriting
+            // fresher info from leader election notifications with stale 
responses.
+            commandExecutor.setLeaderIfTermNewer(respLeader, 
resp.currentTerm());
+            return new LeaderWithTerm(respLeader, resp.currentTerm());
+        });
     }
 
     @Override
-    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
-        return raftClient.refreshMembers(onlyAlive);
+    public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long 
timeoutMillis) {
+        return commandExecutor.<GetPeersResponse>send(
+                peer -> MESSAGES_FACTORY.getPeersRequest()
+                        .leaderId(peerId(peer))
+                        .onlyAlive(onlyAlive)
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.LEADER,

Review Comment:
   Just a comment on the quoted code above (^). Resolved.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -287,166 +312,276 @@ public void unsubscribeLeader(LeaderElectionListener 
callback) {
 
     private SubscriptionLeaderChangeRequest 
subscriptionLeaderChangeRequest(boolean subscribe) {
         return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
-                .groupId(groupId())
+                .groupId(groupId)
                 .subscribe(subscribe)
                 .build();
     }
 
-    private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg) {
-        var msgSendFut = new CompletableFuture<Boolean>();
-
-        sendWithRetry(node, msg, msgSendFut);
-
-        return msgSendFut;
-    }
-
-    private void sendWithRetry(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
-        Long responseTimeout = 
raftConfiguration.responseTimeoutMillis().value();
-
-        clusterService.messagingService().invoke(node, msg, 
responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
-            if (invokeThrowable == null) {
-                msgSendFut.complete(true);
-
-                return;
-            }
-
-            Throwable invokeCause = unwrapCause(invokeThrowable);
-            if (!msg.subscribe()) {
-                // We don't want to propagate exceptions when unsubscribing 
(if it's not an Error!).
-                if (invokeCause instanceof Error) {
-                    msgSendFut.completeExceptionally(invokeThrowable);
-                } else {
-                    LOG.debug("An exception while trying to unsubscribe.", 
invokeThrowable);
-
-                    msgSendFut.complete(false);
-                }
-            } else if (RaftErrorUtils.recoverable(invokeCause)) {
-                sendWithRetry(node, msg, msgSendFut);
-            } else if (invokeCause instanceof RecipientLeftException) {
-                LOG.info(
-                        "Could not subscribe to leader update from a specific 
node, because the node had left the cluster: [node={}].",
-                        node
-                );
-
-                msgSendFut.complete(false);
-            } else if (invokeCause instanceof NodeStoppingException) {
-                msgSendFut.complete(false);
-            } else {
-                LOG.error("Could not send the subscribe message to the node: 
[node={}, msg={}].", invokeThrowable, node, msg);
-
-                msgSendFut.completeExceptionally(invokeThrowable);
-            }
-        }, executor);
-    }
-
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
         return commandExecutor.run(cmd, timeoutMillis);
     }
 
     @Override
     public ReplicationGroupId groupId() {
-        return raftClient.groupId();
+        return groupId;
     }
 
     @Override
     public @Nullable Peer leader() {
-        return raftClient.leader();
+        return commandExecutor.leader();
     }
 
     @Override
     public List<Peer> peers() {
-        return raftClient.peers();
+        return peers;
     }
 
     @Override
     public @Nullable List<Peer> learners() {
-        return raftClient.learners();
+        return learners;
     }
 
     @Override
-    public CompletableFuture<Void> refreshLeader() {
-        return raftClient.refreshLeader();
+    public CompletableFuture<Void> refreshLeader(long timeoutMillis) {
+        return commandExecutor.<GetLeaderResponse>send(
+                peer -> MESSAGES_FACTORY.getLeaderRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.RANDOM,
+                timeoutMillis
+        ).thenAccept(resp -> {
+            // Only update cached leader if the term is newer to avoid 
overwriting
+            // fresher info from leader election notifications with stale 
responses.
+            commandExecutor.setLeaderIfTermNewer(parsePeer(resp.leaderId()), 
resp.currentTerm());
+        });
     }
 
     @Override
-    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
-        return raftClient.refreshAndGetLeaderWithTerm();
+    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm(long 
timeoutMillis) {
+        return commandExecutor.<GetLeaderResponse>send(
+                peer -> MESSAGES_FACTORY.getLeaderRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.RANDOM,
+                timeoutMillis
+        ).thenApply(resp -> {
+            if (resp.leaderId() == null) {
+                return LeaderWithTerm.NO_LEADER;
+            }
+
+            Peer respLeader = parsePeer(resp.leaderId());
+            // Only update cached leader if the term is newer to avoid 
overwriting
+            // fresher info from leader election notifications with stale 
responses.
+            commandExecutor.setLeaderIfTermNewer(respLeader, 
resp.currentTerm());
+            return new LeaderWithTerm(respLeader, resp.currentTerm());
+        });
     }
 
     @Override
-    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
-        return raftClient.refreshMembers(onlyAlive);
+    public CompletableFuture<Void> refreshMembers(boolean onlyAlive, long 
timeoutMillis) {
+        return commandExecutor.<GetPeersResponse>send(
+                peer -> MESSAGES_FACTORY.getPeersRequest()
+                        .leaderId(peerId(peer))
+                        .onlyAlive(onlyAlive)
+                        .groupId(groupId.toString())
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> {
+            this.peers = parsePeerList(resp.peersList());
+            this.learners = parsePeerList(resp.learnersList());
+        });
     }
 
     @Override
-    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
-        return raftClient.addPeer(peer, sequenceToken);
+    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken, long 
timeoutMillis) {
+        return commandExecutor.<AddPeerResponse>send(
+                targetPeer -> MESSAGES_FACTORY.addPeerRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .peerId(peerId(peer))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
     }
 
     @Override
-    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
-        return raftClient.removePeer(peer, sequenceToken);
+    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken, 
long timeoutMillis) {
+        return commandExecutor.<RemovePeerResponse>send(
+                targetPeer -> MESSAGES_FACTORY.removePeerRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .peerId(peerId(peer))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
     }
 
     @Override
-    public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners 
peersAndLearners, long term, long sequenceToken) {
-        return raftClient.changePeersAndLearners(peersAndLearners, term, 
sequenceToken);
+    public CompletableFuture<Void> changePeersAndLearners(
+            PeersAndLearners peersAndLearners, long term, long sequenceToken, 
long timeoutMillis) {
+        LOG.info("Sending changePeersAndLearners request for group={} to 
peers={} and learners={} with leader term={}",
+                groupId, peersAndLearners.peers(), 
peersAndLearners.learners(), term);
+
+        return commandExecutor.<ChangePeersAndLearnersResponse>send(
+                targetPeer -> MESSAGES_FACTORY.changePeersAndLearnersRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .term(term)
+                        .newPeersList(peerIds(peersAndLearners.peers()))
+                        .newLearnersList(peerIds(peersAndLearners.learners()))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> {
+            this.peers = parsePeerList(resp.newPeersList());
+            this.learners = parsePeerList(resp.newLearnersList());
+        });
     }
 
     @Override
-    public CompletableFuture<Void> 
changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long 
sequenceToken) {
-        return raftClient.changePeersAndLearnersAsync(peersAndLearners, term, 
sequenceToken);
+    public CompletableFuture<Void> changePeersAndLearnersAsync(
+            PeersAndLearners peersAndLearners, long term, long sequenceToken, 
long timeoutMillis) {
+        LOG.info("Sending changePeersAndLearnersAsync request for group={} to 
peers={} and learners={} with leader term={}",
+                groupId, peersAndLearners.peers(), 
peersAndLearners.learners(), term);
+
+        return commandExecutor.<ChangePeersAndLearnersAsyncResponse>send(
+                targetPeer -> 
MESSAGES_FACTORY.changePeersAndLearnersAsyncRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .term(term)
+                        .newPeersList(peerIds(peersAndLearners.peers()))
+                        .newLearnersList(peerIds(peersAndLearners.learners()))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> {
+            this.peers = parsePeerList(resp.newPeersList());
+            this.learners = parsePeerList(resp.newLearnersList());
+        });
     }
 
     @Override
-    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long 
sequenceToken) {
-        return raftClient.addLearners(learners, sequenceToken);
+    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long 
sequenceToken, long timeoutMillis) {
+        return commandExecutor.<LearnersOpResponse>send(
+                targetPeer -> MESSAGES_FACTORY.addLearnersRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .learnersList(peerIds(learners))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> this.learners = 
parsePeerList(resp.newLearnersList()));
     }
 
     @Override
-    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, 
long sequenceToken) {
-        return raftClient.removeLearners(learners, sequenceToken);
+    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, 
long sequenceToken, long timeoutMillis) {
+        return commandExecutor.<LearnersOpResponse>send(
+                targetPeer -> MESSAGES_FACTORY.removeLearnersRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .learnersList(peerIds(learners))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> this.learners = 
parsePeerList(resp.newLearnersList()));
     }
 
     @Override
-    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, 
long sequenceToken) {
-        return raftClient.resetLearners(learners, sequenceToken);
+    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, 
long sequenceToken, long timeoutMillis) {
+        return commandExecutor.<LearnersOpResponse>send(
+                targetPeer -> MESSAGES_FACTORY.resetLearnersRequest()
+                        .leaderId(peerId(targetPeer))
+                        .groupId(groupId.toString())
+                        .learnersList(peerIds(learners))
+                        .sequenceToken(sequenceToken)
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> this.learners = 
parsePeerList(resp.newLearnersList()));
     }
 
     @Override
-    public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
-        return raftClient.snapshot(peer, forced);
+    public CompletableFuture<Void> snapshot(Peer peer, boolean forced, long 
timeoutMillis) {
+        return commandExecutor.send(
+                targetPeer -> MESSAGES_FACTORY.snapshotRequest()
+                        .peerId(peerId(peer))
+                        .groupId(groupId.toString())
+                        .forced(forced)
+                        .build(),
+                TargetPeerStrategy.SPECIFIC,
+                peer,
+                timeoutMillis
+        ).thenApply(unused -> null);
     }
 
     @Override
-    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
-        return raftClient.transferLeadership(newLeader);
+    public CompletableFuture<Void> transferLeadership(Peer newLeader, long 
timeoutMillis) {
+        return commandExecutor.send(
+                targetPeer -> MESSAGES_FACTORY.transferLeaderRequest()
+                        .groupId(groupId.toString())
+                        .leaderId(peerId(targetPeer))
+                        .peerId(peerId(newLeader))
+                        .build(),
+                TargetPeerStrategy.LEADER,
+                timeoutMillis
+        ).thenAccept(resp -> commandExecutor.setLeader(newLeader));
     }
 
     @Override
     public void shutdown() {
-        // Stop the command executor first - blocks new run() calls, cancels 
leader waiters.
+        // Remove topology event handler to prevent new topology events from 
triggering activity.
+        
clusterService.topologyService().removeEventHandler(topologyEventHandler);
+
+        // Remove leader election listener to prevent memory leaks and stop 
receiving events.
+        eventsClientListener.removeLeaderElectionListener(groupId, 
generalLeaderElectionListener);
+
+        // Stop the command executor - blocks new run() calls, cancels leader 
waiters.
         commandExecutor.shutdown(stoppingExceptionFactory.create("Raft client 
is stopping [groupId=" + groupId() + "]."));
 
+        // Unsubscribe from leader updates with timeout to ensure clean 
shutdown.
         finishSubscriptions();
-
-        raftClient.shutdown();
     }
 
     @Override
-    public CompletableFuture<Long> readIndex() {
-        return raftClient.readIndex();
+    public CompletableFuture<Long> readIndex(long timeoutMillis) {
+        return commandExecutor.<ReadIndexResponse>send(
+                peer -> MESSAGES_FACTORY.readIndexRequest()
+                        .groupId(groupId.toString())
+                        .peerId(peer.consistentId())
+                        .serverId(peer.consistentId())
+                        .build(),
+                TargetPeerStrategy.LEADER,

Review Comment:
   Just a comment on the quoted code above (^). Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to