JAkutenshi commented on code in PR #7242:
URL: https://github.com/apache/ignite-3/pull/7242#discussion_r2737311786


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java:
##########
@@ -326,7 +397,823 @@ private static boolean recoverable(Throwable t) {
 
     @Override
     public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
-        return raftClient.run(cmd, timeoutMillis);
+        // Normalize timeout: negative values mean infinite wait.
+        long effectiveTimeout = (timeoutMillis < 0) ? Long.MAX_VALUE : timeoutMillis;
+        // Wait for leader mode (bounded or infinite).
+        long deadline = Utils.monotonicMsAfter(effectiveTimeout);
+
+        return executeWithBusyLock(responseFuture -> {
+            if (effectiveTimeout == 0) {
+                tryAllPeersOnce(responseFuture, cmd);
+            } else {
+                startRetryPhase(responseFuture, cmd, deadline, leaderAvailabilityState.currentTerm());
+            }
+        });
+    }
+
+    /**
+     * Resolves initial target peer for a command execution.
+     *
+     * <p>Tries the known leader first, falling back to a random peer if no leader is known.
+     *
+     * @return Initial target peer, or {@code null}.
+     */
+    @Nullable
+    private Peer resolveInitialPeer() {
+        Peer targetPeer = leader;
+        if (targetPeer == null) {
+            targetPeer = randomNode(null, false);
+        }
+        return targetPeer;
+    }
+
+    /**
+     * Tries all peers once without waiting for leader.
+     *
+     * @param resultFuture Future that completes with the response, or fails with {@link ReplicationGroupUnavailableException} if no
+     *         peer responds successfully.
+     * @param cmd The command to execute.
+     */
+    private void tryAllPeersOnce(CompletableFuture<ActionResponse> resultFuture, Command cmd) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                0,  // Single attempt - no retry timeout
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetrySingleAttempt(resultFuture, context);
+    }
+
+    /**
+     * Executes an action within busy lock and transforms the response.
+     */
+    private <R> CompletableFuture<R> executeWithBusyLock(Consumer<CompletableFuture<ActionResponse>> action) {
+        var responseFuture = new CompletableFuture<ActionResponse>();
+        var resultFuture = new CompletableFuture<R>();
+
+        if (!busyLock.enterBusy()) {
+            resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+
+            return resultFuture;
+        }
+
+        try {
+            action.accept(responseFuture);
+
+            // Transform ActionResponse to result type.
+            responseFuture.whenComplete((resp, err) -> {
+                if (err != null) {
+                    resultFuture.completeExceptionally(err);
+                } else {
+                    resultFuture.complete((R) resp.result());
+                }
+            });
+
+            return resultFuture;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Applies deadline to a future using orTimeout.
+     *
+     * @param future The future to apply deadline to.
+     * @param deadline Deadline in monotonic milliseconds, or Long.MAX_VALUE for no deadline.
+     * @return The future with timeout applied, or the original future if no deadline.
+     */
+    private static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> future, long deadline) {
+        if (deadline == Long.MAX_VALUE) {
+            return future;
+        }
+        long remainingTime = deadline - Utils.monotonicMs();
+        if (remainingTime <= 0) {
+            return CompletableFuture.failedFuture(new TimeoutException());
+        }
+        return future.orTimeout(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timeout exception for leader wait.
+     */
+    private ReplicationGroupUnavailableException createTimeoutException() {
+        return new ReplicationGroupUnavailableException(
+                groupId(),
+                "Timeout waiting for leader [groupId=" + groupId() + "]."
+        );
+    }
+
+    /**
+     * Waits for leader to become available and then retries the command.
+     */
+    private void waitForLeaderAndRetry(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline) {
+        CompletableFuture<Long> leaderFuture = leaderAvailabilityState.awaitLeader();
+
+        // Apply timeout if bounded.
+        CompletableFuture<Long> timedLeaderFuture = applyDeadline(leaderFuture, deadline);
+
+        timedLeaderFuture.whenCompleteAsync((term, waitError) -> {
+            if (waitError != null) {
+                Throwable cause = unwrapCause(waitError);
+                if (cause instanceof TimeoutException) {
+                    resultFuture.completeExceptionally(createTimeoutException());
+                } else {
+                    resultFuture.completeExceptionally(cause);
+                }
+                return;
+            }
+
+            if (!busyLock.enterBusy()) {
+                resultFuture.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+                return;
+            }
+
+            try {
+                // Leader is available, now run the command with retry logic.
+                startRetryPhase(resultFuture, cmd, deadline, term);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }, executor);
+    }
+
+    /**
+     * Starts the retry phase after leader is available.
+     */
+    private void startRetryPhase(CompletableFuture<ActionResponse> resultFuture, Command cmd, long deadline, long term) {
+        Peer targetPeer = resolveInitialPeer();
+        if (targetPeer == null) {
+            resultFuture.completeExceptionally(new ReplicationGroupUnavailableException(groupId()));
+
+            return;
+        }
+
+        // Check deadline before starting retry phase.
+        long now = Utils.monotonicMs();
+        if (deadline != Long.MAX_VALUE && now >= deadline) {
+            resultFuture.completeExceptionally(createTimeoutException());
+            return;
+        }
+
+        // Use retry timeout bounded by remaining time until deadline.
+        long configTimeout = raftConfiguration.retryTimeoutMillis().value();
+
+        long sendWithRetryTimeoutMillis = deadline == Long.MAX_VALUE ? configTimeout : Math.min(configTimeout, deadline - now);
+
+        var context = new RetryContext(
+                groupId().toString(),
+                targetPeer,
+                cmd::toStringForLightLogging,
+                createRequestFactory(cmd),
+                sendWithRetryTimeoutMillis,
+                RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+        );
+
+        sendWithRetryWaitingForLeader(resultFuture, context, cmd, deadline, term);
+    }
+
+    /**
+     * Creates a request factory for the given command.
+     */
+    private Function<Peer, ActionRequest> createRequestFactory(Command cmd) {
+        if (cmd instanceof WriteCommand) {
+            return targetPeer -> MESSAGES_FACTORY.writeActionRequest()
+                    .groupId(groupId().toString())
+                    .command(commandsMarshaller.marshall(cmd))
+                    .deserializedCommand((WriteCommand) cmd)
+                    .build();
+        } else {
+            return targetPeer -> MESSAGES_FACTORY.readActionRequest()
+                    .groupId(groupId().toString())
+                    .command((ReadCommand) cmd)
+                    .readOnlySafe(true)
+                    .build();
+        }
+    }
+
+    /**
+     * Sends a request with single attempt (no retry on timeout).
+     *
+     * <p>In single-attempt mode, each peer is tried at most once.
+     */
+    private <R extends NetworkMessage> void sendWithRetrySingleAttempt(
+            CompletableFuture<R> fut,
+            RetryContext retryContext
+    ) {
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+            return;
+        }
+
+        try {
+            long responseTimeout = retryContext.responseTimeoutMillis() == RetryContext.USE_DEFAULT_RESPONSE_TIMEOUT
+                    ? throttlingContextHolder.peerRequestTimeoutMillis() : retryContext.responseTimeoutMillis();
+
+            retryContext.onNewAttempt();
+
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> clusterService.messagingService().invoke(node, retryContext.request(), responseTimeout))
+                    .whenComplete((resp, err) -> {
+                        if (!busyLock.enterBusy()) {
+                            fut.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId() + "]."));
+                            return;
+                        }
+
+                        try {
+                            if (err != null) {
+                                handleThrowableSingleAttempt(fut, err, retryContext);

Review Comment:
   Consider this resolved.
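
For readers following the thread, the `applyDeadline` helper in the quoted diff converts an absolute monotonic deadline into a per-call bound via `CompletableFuture.orTimeout`, failing fast when the deadline has already passed. Below is a minimal standalone sketch of that pattern; it uses `System.nanoTime()` in place of Ignite's internal `Utils.monotonicMs()`, and the class and method names are illustrative only, not Ignite APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Standalone sketch of the deadline-to-orTimeout conversion; names are illustrative. */
public class DeadlineSketch {

    /** Current monotonic time in milliseconds (stand-in for Ignite's monotonic clock). */
    static long monotonicMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    /** Bounds the future by the remaining time until the deadline; Long.MAX_VALUE means "no deadline". */
    static <T> CompletableFuture<T> applyDeadline(CompletableFuture<T> future, long deadlineMs) {
        if (deadlineMs == Long.MAX_VALUE) {
            return future;
        }
        long remaining = deadlineMs - monotonicMs();
        if (remaining <= 0) {
            // Deadline already passed: fail fast instead of scheduling a zero-length timeout.
            return CompletableFuture.failedFuture(new TimeoutException());
        }
        return future.orTimeout(remaining, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed
        CompletableFuture<String> bounded = applyDeadline(slow, monotonicMs() + 100);
        try {
            bounded.join();
        } catch (Exception e) {
            System.out.println("Failed as expected: " + e.getCause()); // TimeoutException after ~100 ms
        }
    }
}
```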
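
Likewise, `executeWithBusyLock` follows a common adapter shape: refuse new work while the client is stopping, otherwise run the action against an internal `ActionResponse` future and relay its outcome to a caller-facing future of the requested type. A simplified sketch of that shape follows, assuming an `AtomicBoolean` as a stand-in for Ignite's busy lock (the real lock additionally lets stop wait for in-flight calls, which a plain flag does not model); all names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Simplified sketch of the busy-guard + future-adapter shape; names are illustrative. */
public class BusyGuardSketch {
    // Stand-in for the busy lock: set to true when the component starts stopping.
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    @SuppressWarnings("unchecked")
    <R> CompletableFuture<R> executeGuarded(Consumer<CompletableFuture<Object>> action) {
        CompletableFuture<Object> responseFuture = new CompletableFuture<>();
        CompletableFuture<R> resultFuture = new CompletableFuture<>();

        if (stopping.get()) {
            // Reject new work once the component is shutting down.
            resultFuture.completeExceptionally(new IllegalStateException("Client is stopping."));
            return resultFuture;
        }

        action.accept(responseFuture);

        // Propagate either the failure or the (cast) result to the caller-facing future.
        responseFuture.whenComplete((resp, err) -> {
            if (err != null) {
                resultFuture.completeExceptionally(err);
            } else {
                resultFuture.complete((R) resp);
            }
        });

        return resultFuture;
    }

    public static void main(String[] args) {
        BusyGuardSketch guard = new BusyGuardSketch();
        CompletableFuture<String> result = guard.executeGuarded(respFut -> respFut.complete("response payload"));
        System.out.println(result.join()); // prints "response payload"
    }
}
```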


