JAkutenshi commented on code in PR #7507:
URL: https://github.com/apache/ignite-3/pull/7507#discussion_r2872619660


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/LeaderAvailabilityState.java:
##########
@@ -112,34 +126,95 @@ void onLeaderElected(InternalClusterNode leader, long 
term) {
 
         synchronized (mutex) {
             if (stopped) {
-                LOG.debug("Ignoring leader election after stop [leader={}, 
term={}]", leader, term);
-                return;
+                LOG.debug("Ignoring leader update after stop [leader={}, 
term={}]", leader, term);
+                return false;
             }
 
             // Ignore stale term notifications.
             if (term <= currentTerm) {
                 LOG.debug("Ignoring stale leader [newTerm={}, 
currentTerm={}]", term, currentTerm);
-                return;
+                return false;
             }
 
             long previousTerm = currentTerm;
             State previousState = currentState;
 
             currentTerm = term;
+            this.leader = leader;
 
-            if (currentState == State.WAITING_FOR_LEADER) {
+            if (leader != null && currentState == State.WAITING_FOR_LEADER) {
                 currentState = State.LEADER_AVAILABLE;
                 futureToComplete = waiters;
             }
 
-            LOG.debug("Leader elected [leader={}, term={}, previousTerm={}, 
stateChange={}->{}]",
+            LOG.debug("Leader updated [leader={}, term={}, previousTerm={}, 
stateChange={}->{}]",
                     leader, term, previousTerm, previousState, currentState);
         }
 
         // Complete outside the lock to avoid potential deadlocks with future 
callbacks.
         if (futureToComplete != null) {
             futureToComplete.complete(term);
         }
+
+        return true;
+    }
+
+    /**
+     * Updates the cached leader without changing the term or availability 
state.
+     *
+     * <p>This is an optimistic cache update used when a response or redirect 
provides leader information
+     * without a term change. Has no effect after stop.
+     *
+     * @param leader The leader peer hint (may be {@code null} to clear).
+     */
+    void setLeaderHint(@Nullable Peer leader) {

Review Comment:
   Optional but consider `setCachedLeader` 



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java:
##########
@@ -880,49 +828,222 @@ private void verifyExact3PeersCalled() {
         assertThat(triedPeers, equalTo(expectedPeers));
     }
 
-    private static class TestResponse {
+    // 
==========================================================================================
+    // Tests for refreshLeader and refreshAndGetLeaderWithTerm (RANDOM 
strategy)
+    // 
==========================================================================================
+
+    /**
+     * Tests that for GetLeaderRequest with UNKNOWN/EINTERNAL/ENOENT errors, 
the executor tries another peer
+     * instead of retrying the same peer. This matches RaftGroupServiceImpl 
behavior.
+     *
+     * <p>The test verifies that after receiving one of these errors from a 
peer, the next request goes to
+     * a DIFFERENT peer, not the same one.
+     */
+    @ParameterizedTest
+    @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+    void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError 
error) throws Exception {
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            int count = callCount.incrementAndGet();
+
+            if (count == 1) {
+                // First call returns transient error.
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build());
+            }
+
+            // Second call succeeds.
+            return completedFuture(FACTORY.getLeaderResponse()
+                    .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
+                    .currentTerm(CURRENT_TERM)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        CompletableFuture<LeaderWithTerm> result = 
svc.refreshAndGetLeaderWithTerm(TIMEOUT);
+
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that at least 2 different peers were called.
+        // If the same peer was retried, calledPeers would have size 1.
+        assertThat("Should try different peers on " + error + " error, but 
called peers were: " + calledPeers,
+                calledPeers.size(), greaterThan(1));
     }
 
     /**
-     * Tests single-attempt mode (timeout=0) with 5 nodes: all return "no 
leader".
+     * Tests that RANDOM strategy with bounded timeout keeps cycling through 
peers until timeout.
      *
-     * <p>In single-attempt mode, "no leader" is treated same as unavailable.
-     * Each peer is tried exactly once, then fails with 
ReplicationGroupUnavailableException.
+     * <p>When all peers return EPERM (no leader), RaftGroupServiceImpl resets 
unavailable peers
+     * and keeps cycling until timeout. RaftCommandExecutor should do the same 
for RANDOM strategy.
      */
     @Test
-    void testSingleAttemptModeWithAllNoLeader() {
+    void testRandomStrategyWithBoundedTimeoutKeepsCyclingUntilTimeout() throws 
Exception {
+        AtomicInteger callCount = new AtomicInteger(0);
+        CountDownLatch sixCallsReached = new CountDownLatch(6);

Review Comment:
   ```suggestion
           long invokeTimeoutSec = 2;
           
           // We would expect at least 6 calls: 3 nodes * (~2-3 calls with 0.5s 
leader refresh rate during invoke timeout).
           CountDownLatch atLeastCallsReached = new CountDownLatch(NODES.size() 
* invokeTimeoutSec);
   ```
   
   ^ optional but I'm afraid we may lost the number once upon a time or it may 
be unclear later for a problem solver engineer. 



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java:
##########
@@ -880,49 +828,222 @@ private void verifyExact3PeersCalled() {
         assertThat(triedPeers, equalTo(expectedPeers));
     }
 
-    private static class TestResponse {
+    // 
==========================================================================================
+    // Tests for refreshLeader and refreshAndGetLeaderWithTerm (RANDOM 
strategy)
+    // 
==========================================================================================
+
+    /**
+     * Tests that for GetLeaderRequest with UNKNOWN/EINTERNAL/ENOENT errors, 
the executor tries another peer
+     * instead of retrying the same peer. This matches RaftGroupServiceImpl 
behavior.
+     *
+     * <p>The test verifies that after receiving one of these errors from a 
peer, the next request goes to
+     * a DIFFERENT peer, not the same one.
+     */
+    @ParameterizedTest
+    @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+    void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError 
error) throws Exception {
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            int count = callCount.incrementAndGet();
+
+            if (count == 1) {
+                // First call returns transient error.
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build());
+            }
+
+            // Second call succeeds.
+            return completedFuture(FACTORY.getLeaderResponse()
+                    .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
+                    .currentTerm(CURRENT_TERM)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        CompletableFuture<LeaderWithTerm> result = 
svc.refreshAndGetLeaderWithTerm(TIMEOUT);
+
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that at least 2 different peers were called.
+        // If the same peer was retried, calledPeers would have size 1.
+        assertThat("Should try different peers on " + error + " error, but 
called peers were: " + calledPeers,
+                calledPeers.size(), greaterThan(1));
     }
 
     /**
-     * Tests single-attempt mode (timeout=0) with 5 nodes: all return "no 
leader".
+     * Tests that RANDOM strategy with bounded timeout keeps cycling through 
peers until timeout.
      *
-     * <p>In single-attempt mode, "no leader" is treated same as unavailable.
-     * Each peer is tried exactly once, then fails with 
ReplicationGroupUnavailableException.
+     * <p>When all peers return EPERM (no leader), RaftGroupServiceImpl resets 
unavailable peers
+     * and keeps cycling until timeout. RaftCommandExecutor should do the same 
for RANDOM strategy.
      */
     @Test
-    void testSingleAttemptModeWithAllNoLeader() {
+    void testRandomStrategyWithBoundedTimeoutKeepsCyclingUntilTimeout() throws 
Exception {
+        AtomicInteger callCount = new AtomicInteger(0);
+        CountDownLatch sixCallsReached = new CountDownLatch(6);
+
+        // All peers return EPERM (no leader) every time.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            callCount.incrementAndGet();
+            sixCallsReached.countDown();
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EPERM.getNumber())
+                    .leaderId(null)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // With bounded timeout (500ms), should keep cycling until timeout.
+        // With 3 peers and 50ms retry delay, we should get more than 3 calls 
before timeout.
+        CompletableFuture<Void> result = svc.refreshLeader(500);
+
+        // Wait for at least 6 calls (2 complete cycles through all 3 peers).
+        boolean sixCallsHappened = sixCallsReached.await(2, TimeUnit.SECONDS);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 2, TimeUnit.SECONDS));
+
+        // Verify more than 3 calls were made (cycling through peers multiple 
times).
+        assertThat("Should cycle through peers multiple times before timeout, 
but only " + callCount.get() + " calls were made",
+                sixCallsHappened, is(true));
+        assertThat(callCount.get(), greaterThan(3));
+    }
+
+    /**
+     * Tests that the timeout parameter is respected for 
refreshLeader/refreshAndGetLeaderWithTerm.
+     * With a short timeout, the request should fail fast, not wait for the 
default response timeout.
+     */
+    @Test
+    void testTimeoutParameterIsRespectedForRefreshLeader() {
+        // All peers never respond (simulating network issue).
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenReturn(new CompletableFuture<>()); // Never completes
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        long startTime = System.currentTimeMillis();
+
+        // With 200ms timeout, should fail within ~300ms, not the default 
3000ms.
+        CompletableFuture<Void> result = svc.refreshLeader(200);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 1, TimeUnit.SECONDS));
+
+        long elapsed = System.currentTimeMillis() - startTime;
+        assertThat("Expected to fail within 1000ms but took " + elapsed + 
"ms", elapsed < 1000, is(true));
+    }
+
+    /**
+     * Tests that RANDOM strategy with timeout=0 (single attempt) tries all 
peers once and fails.
+     *
+     * <p>With timeout=0, each peer should be tried at most once, then fail 
with ReplicationGroupUnavailableException.
+     */
+    @Test
+    void testRandomStrategySingleAttemptTriesAllPeersOnce() throws Exception {
         AtomicInteger callCount = new AtomicInteger(0);
         Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+        CountDownLatch constructorCallsDone = new CountDownLatch(3); // 
Constructor tries all 3 peers
 
-        // All peers return EPERM with no leader.
+        // All peers return EPERM (no leader).
         when(messagingService.invoke(
                 any(InternalClusterNode.class),
-                argThat(this::isTestWriteCommand),
-                anyLong()
-        )).thenAnswer(invocation -> {
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
             InternalClusterNode target = invocation.getArgument(0);
             calledPeers.add(target.name());
-            callCount.incrementAndGet();
+            int count = callCount.incrementAndGet();
+            // Signal when constructor's calls complete (first 3 calls).
+            if (count <= 3) {

Review Comment:
   `NODES.size()` ?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java:
##########
@@ -880,49 +828,222 @@ private void verifyExact3PeersCalled() {
         assertThat(triedPeers, equalTo(expectedPeers));
     }
 
-    private static class TestResponse {
+    // 
==========================================================================================
+    // Tests for refreshLeader and refreshAndGetLeaderWithTerm (RANDOM 
strategy)
+    // 
==========================================================================================

Review Comment:
   Do we need this? Looks inconsistent with other test classes (but optional, I 
guess)



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java:
##########
@@ -880,49 +828,222 @@ private void verifyExact3PeersCalled() {
         assertThat(triedPeers, equalTo(expectedPeers));
     }
 
-    private static class TestResponse {
+    // 
==========================================================================================
+    // Tests for refreshLeader and refreshAndGetLeaderWithTerm (RANDOM 
strategy)
+    // 
==========================================================================================
+
+    /**
+     * Tests that for GetLeaderRequest with UNKNOWN/EINTERNAL/ENOENT errors, 
the executor tries another peer
+     * instead of retrying the same peer. This matches RaftGroupServiceImpl 
behavior.
+     *
+     * <p>The test verifies that after receiving one of these errors from a 
peer, the next request goes to
+     * a DIFFERENT peer, not the same one.
+     */
+    @ParameterizedTest
+    @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+    void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError 
error) throws Exception {
+        Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+        AtomicInteger callCount = new AtomicInteger(0);
+
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            InternalClusterNode target = invocation.getArgument(0);
+            calledPeers.add(target.name());
+            int count = callCount.incrementAndGet();
+
+            if (count == 1) {
+                // First call returns transient error.
+                return completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build());
+            }
+
+            // Second call succeeds.
+            return completedFuture(FACTORY.getLeaderResponse()
+                    .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
+                    .currentTerm(CURRENT_TERM)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        CompletableFuture<LeaderWithTerm> result = 
svc.refreshAndGetLeaderWithTerm(TIMEOUT);
+
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that at least 2 different peers were called.
+        // If the same peer was retried, calledPeers would have size 1.
+        assertThat("Should try different peers on " + error + " error, but 
called peers were: " + calledPeers,
+                calledPeers.size(), greaterThan(1));
     }
 
     /**
-     * Tests single-attempt mode (timeout=0) with 5 nodes: all return "no 
leader".
+     * Tests that RANDOM strategy with bounded timeout keeps cycling through 
peers until timeout.
      *
-     * <p>In single-attempt mode, "no leader" is treated same as unavailable.
-     * Each peer is tried exactly once, then fails with 
ReplicationGroupUnavailableException.
+     * <p>When all peers return EPERM (no leader), RaftGroupServiceImpl resets 
unavailable peers
+     * and keeps cycling until timeout. RaftCommandExecutor should do the same 
for RANDOM strategy.
      */
     @Test
-    void testSingleAttemptModeWithAllNoLeader() {
+    void testRandomStrategyWithBoundedTimeoutKeepsCyclingUntilTimeout() throws 
Exception {
+        AtomicInteger callCount = new AtomicInteger(0);
+        CountDownLatch sixCallsReached = new CountDownLatch(6);
+
+        // All peers return EPERM (no leader) every time.
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
+            callCount.incrementAndGet();
+            sixCallsReached.countDown();
+            return completedFuture(FACTORY.errorResponse()
+                    .errorCode(RaftError.EPERM.getNumber())
+                    .leaderId(null)
+                    .build());
+        });
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        // With bounded timeout (500ms), should keep cycling until timeout.
+        // With 3 peers and 50ms retry delay, we should get more than 3 calls 
before timeout.
+        CompletableFuture<Void> result = svc.refreshLeader(500);
+
+        // Wait for at least 6 calls (2 complete cycles through all 3 peers).
+        boolean sixCallsHappened = sixCallsReached.await(2, TimeUnit.SECONDS);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 2, TimeUnit.SECONDS));
+
+        // Verify more than 3 calls were made (cycling through peers multiple 
times).
+        assertThat("Should cycle through peers multiple times before timeout, 
but only " + callCount.get() + " calls were made",
+                sixCallsHappened, is(true));
+        assertThat(callCount.get(), greaterThan(3));
+    }
+
+    /**
+     * Tests that the timeout parameter is respected for 
refreshLeader/refreshAndGetLeaderWithTerm.
+     * With a short timeout, the request should fail fast, not wait for the 
default response timeout.
+     */
+    @Test
+    void testTimeoutParameterIsRespectedForRefreshLeader() {
+        // All peers never respond (simulating network issue).
+        when(messagingService.invoke(
+                any(InternalClusterNode.class),
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenReturn(new CompletableFuture<>()); // Never completes
+
+        PhysicalTopologyAwareRaftGroupService svc = startService();
+
+        long startTime = System.currentTimeMillis();
+
+        // With 200ms timeout, should fail within ~300ms, not the default 
3000ms.
+        CompletableFuture<Void> result = svc.refreshLeader(200);
+
+        assertThat(result, 
willThrow(ReplicationGroupUnavailableException.class, 1, TimeUnit.SECONDS));
+
+        long elapsed = System.currentTimeMillis() - startTime;
+        assertThat("Expected to fail within 1000ms but took " + elapsed + 
"ms", elapsed < 1000, is(true));
+    }
+
+    /**
+     * Tests that RANDOM strategy with timeout=0 (single attempt) tries all 
peers once and fails.
+     *
+     * <p>With timeout=0, each peer should be tried at most once, then fail 
with ReplicationGroupUnavailableException.
+     */
+    @Test
+    void testRandomStrategySingleAttemptTriesAllPeersOnce() throws Exception {
         AtomicInteger callCount = new AtomicInteger(0);
         Set<String> calledPeers = ConcurrentHashMap.newKeySet();
+        CountDownLatch constructorCallsDone = new CountDownLatch(3); // 
Constructor tries all 3 peers
 
-        // All peers return EPERM with no leader.
+        // All peers return EPERM (no leader).
         when(messagingService.invoke(
                 any(InternalClusterNode.class),
-                argThat(this::isTestWriteCommand),
-                anyLong()
-        )).thenAnswer(invocation -> {
+                any(GetLeaderRequest.class),
+                anyLong())
+        ).thenAnswer(invocation -> {
             InternalClusterNode target = invocation.getArgument(0);
             calledPeers.add(target.name());
-            callCount.incrementAndGet();
+            int count = callCount.incrementAndGet();
+            // Signal when constructor's calls complete (first 3 calls).
+            if (count <= 3) {

Review Comment:
   On the other hand below `3` is hardcoded, consider as optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to