timoninmaxim commented on code in PR #11954: URL: https://github.com/apache/ignite/pull/11954#discussion_r2075868957
########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -243,29 +243,34 @@ private <T> void handleServiceAsync( List<ClientConnectionException> failures ) { try { - applyOnDefaultChannel( - channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures), - null, - failures - ); + ClientChannel ch = applyOnDefaultChannel(channel -> channel, null, failures); Review Comment: Use Function.identity(), JVM caches it. ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -395,14 +431,13 @@ public <T> IgniteClientFuture<T> affinityServiceAsync( CompletableFuture<T> fut = new CompletableFuture<>(); List<ClientConnectionException> failures = new ArrayList<>(); - Object result = applyOnNodeChannel( - affNodeId, - channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures), - failures - ); + ClientChannel ch = applyOnNodeChannel(affNodeId, channel -> channel, failures); Review Comment: Use Function.identity(), JVM caches it. ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -243,29 +243,34 @@ private <T> void handleServiceAsync( List<ClientConnectionException> failures ) { try { - applyOnDefaultChannel( - channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures), - null, - failures - ); + ClientChannel ch = applyOnDefaultChannel(channel -> channel, null, failures); + + applyOnClientChannelAsync(fut, ch, op, payloadWriter, payloadReader, failures); } catch (Throwable ex) { fut.completeExceptionally(ex); } } - /** */ - private <T> Object applyOnClientChannelAsync( - final CompletableFuture<T> fut, + /** + * Retries an async operation on the same channel if it fails with a connection exception + * then falls back to other channels if retry fails. Aggregates failures and completes the original future. + */ + private <T> void applyOnClientChannelAsync( + CompletableFuture<T> fut, Review Comment: final ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -243,29 +243,34 @@ private <T> void handleServiceAsync( List<ClientConnectionException> failures ) { try { - applyOnDefaultChannel( - channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures), - null, - failures - ); + ClientChannel ch = applyOnDefaultChannel(channel -> channel, null, failures); + + applyOnClientChannelAsync(fut, ch, op, payloadWriter, payloadReader, failures); } catch (Throwable ex) { fut.completeExceptionally(ex); } } - /** */ - private <T> Object applyOnClientChannelAsync( - final CompletableFuture<T> fut, + /** + * Retries an async operation on the same channel if it fails with a connection exception + * then falls back to other channels if retry fails. Aggregates failures and completes the original future. + */ + private <T> void applyOnClientChannelAsync( + CompletableFuture<T> fut, ClientChannel ch, ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader, List<ClientConnectionException> failures ) { - return ch - .serviceAsync(op, payloadWriter, payloadReader) + CompletableFuture<T> chFut = ch.serviceAsync(op, payloadWriter, payloadReader); + + // Retry use same channel in case of connection exception. + CompletableFuture<T> retryFut = chFut .handle((res, err) -> { + UUID nodeID = ch.serverNodeId(); Review Comment: nodeId ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -275,31 +280,62 @@ private <T> Object applyOnClientChannelAsync( if (err instanceof ClientConnectionException) { ClientConnectionException failure0 = (ClientConnectionException)err; - failures.add(failure0); + ClientChannelHolder hld = (nodeID != null) ? nodeChannels.get(nodeID) : null; try { // Will try to reinit channels if topology changed. onChannelFailure(ch, err, failures); + + if (hld == null) { + failures.add(failure0); + + throw failure0; + } + + if (shouldRetry(op, failures.size() - 1, failure0)) { + ClientChannel newCh = hld.getOrCreateChannel(); + + return newCh.serviceAsync(op, payloadWriter, payloadReader); + } + else { + failures.add(failure0); + + fut.completeExceptionally(composeException(failures)); + } + } + catch (ClientConnectionException reconnectEx) { + onChannelFailure(hld, null, reconnectEx, failures); } catch (Throwable ex) { fut.completeExceptionally(ex); return null; } - - if (failures.size() < srvcChannelsLimit && shouldRetry(op, failures.size() - 1, failure0)) { - handleServiceAsync(fut, op, payloadWriter, payloadReader, failures); - - return null; - } - - fut.completeExceptionally(composeException(failures)); } else fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err)); return null; - }); + }).thenCompose(f -> f == null ? CompletableFuture.completedFuture(null) : f); Review Comment: Could you please take a look, Is there way to simplify it? ########## modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java: ########## @@ -243,29 +243,34 @@ private <T> void handleServiceAsync( List<ClientConnectionException> failures ) { try { - applyOnDefaultChannel( - channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures), - null, - failures - ); + ClientChannel ch = applyOnDefaultChannel(channel -> channel, null, failures); + + applyOnClientChannelAsync(fut, ch, op, payloadWriter, payloadReader, failures); } catch (Throwable ex) { fut.completeExceptionally(ex); } } - /** */ - private <T> Object applyOnClientChannelAsync( - final CompletableFuture<T> fut, + /** + * Retries an async operation on the same channel if it fails with a connection exception + * then falls back to other channels if retry fails. Aggregates failures and completes the original future. + */ + private <T> void applyOnClientChannelAsync( + CompletableFuture<T> fut, ClientChannel ch, ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, Function<PayloadInputChannel, T> payloadReader, List<ClientConnectionException> failures ) { - return ch - .serviceAsync(op, payloadWriter, payloadReader) + CompletableFuture<T> chFut = ch.serviceAsync(op, payloadWriter, payloadReader); + + // Retry use same channel in case of connection exception. + CompletableFuture<T> retryFut = chFut .handle((res, err) -> { + UUID nodeID = ch.serverNodeId(); Review Comment: move it closer to place where you use it ########## modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java: ########## @@ -718,6 +718,7 @@ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) t }); // Use Ignite while nodes keep failing. + Review Comment: useless change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org