JAkutenshi commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2737247552


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : 
timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, 
leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no 
leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails 
with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> 
resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new 
ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> 
executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            
resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client 
is stopping [" + groupId() + "]."));

Review Comment:
   Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to