This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new fed407a4de2 [SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor fed407a4de2 is described below commit fed407a4de24f7a387cd25bb2297de0bf4ca016c Author: Ted Yu <yuzhih...@gmail.com> AuthorDate: Tue Jan 24 12:15:22 2023 -0600 [SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor ### What changes were proposed in this pull request? This PR introduces sasl retry count in RetryingBlockTransferor. ### Why are the changes needed? Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario: 1. SaslTimeoutException 2. IOException 3. SaslTimeoutException 4. IOException Even though IOException at #2 is retried (resulting in increment of retryCount), the retryCount would be cleared at step #4. Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test is added, courtesy of Mridul. Closes #39611 from tedyu/sasl-cnt. Authored-by: Ted Yu <yuzhih...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> Closes #39710 from akpatnam25/SPARK-42090-backport-3.2. 
Authored-by: Ted Yu <yuzhih...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/shuffle/RetryingBlockTransferor.java | 46 +++++++++++++++------- .../shuffle/RetryingBlockTransferorSuite.java | 35 +++++++++++++++- 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index 4515e3a5c28..892de991612 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; @@ -87,7 +88,16 @@ public class RetryingBlockTransferor { /** Number of times we've attempted to retry so far. */ private int retryCount = 0; - private boolean saslTimeoutSeen; + // Number of times SASL timeout has been retried without success. + // If we see maxRetries consecutive failures, the request is failed. + // On the other hand, if sasl succeeds and we are able to send other requests subsequently, + // we reduce the SASL failures from retryCount (since SASL failures were part of + // connection bootstrap - which ended up being successful). + // spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others - + // and so sasl is more susceptible to failures when remote service + // (like external shuffle service) is under load: but once it succeeds, we do not want to + // include it as part of request retries. 
+ private int saslRetryCount = 0; /** * Set of all block ids which have not been transferred successfully or with a non-IO Exception. @@ -123,7 +133,7 @@ public class RetryingBlockTransferor { this.currentListener = new RetryingBlockTransferListener(); this.errorHandler = errorHandler; this.enableSaslRetries = conf.enableSaslRetries(); - this.saslTimeoutSeen = false; + this.saslRetryCount = 0; } public RetryingBlockTransferor( @@ -167,7 +177,7 @@ public class RetryingBlockTransferor { numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e); if (shouldRetry(e)) { - initiateRetry(); + initiateRetry(e); } else { for (String bid : blockIdsToTransfer) { listener.onBlockTransferFailure(bid, e); @@ -180,7 +190,10 @@ public class RetryingBlockTransferor { * Lightweight method which initiates a retry in a different thread. The retry will involve * calling transferAllOutstanding() after a configured wait time. */ - private synchronized void initiateRetry() { + private synchronized void initiateRetry(Throwable e) { + if (enableSaslRetries && e instanceof SaslTimeoutException) { + saslRetryCount += 1; + } retryCount += 1; currentListener = new RetryingBlockTransferListener(); @@ -203,16 +216,17 @@ public class RetryingBlockTransferor { boolean isIOException = e instanceof IOException || e.getCause() instanceof IOException; boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException; - if (!isSaslTimeout && saslTimeoutSeen) { - retryCount = 0; - saslTimeoutSeen = false; + // If this is a non SASL request failure, reduce earlier SASL failures from retryCount + // since some subsequent SASL attempt was successful + if (!isSaslTimeout && saslRetryCount > 0) { + Preconditions.checkState(retryCount >= saslRetryCount, + "retryCount must be greater than or equal to saslRetryCount"); + retryCount -= saslRetryCount; + saslRetryCount = 0; } boolean hasRemainingRetries = retryCount < maxRetries; boolean shouldRetry = (isSaslTimeout || isIOException) && 
hasRemainingRetries && errorHandler.shouldRetryError(e); - if (shouldRetry && isSaslTimeout) { - this.saslTimeoutSeen = true; - } return shouldRetry; } @@ -236,9 +250,13 @@ public class RetryingBlockTransferor { if (this == currentListener && outstandingBlocksIds.contains(blockId)) { outstandingBlocksIds.remove(blockId); shouldForwardSuccess = true; - if (saslTimeoutSeen) { - retryCount = 0; - saslTimeoutSeen = false; + // If there were SASL failures earlier, remove them from retryCount, as there was + // a SASL success (and some other request post bootstrap was also successful). + if (saslRetryCount > 0) { + Preconditions.checkState(retryCount >= saslRetryCount, + "retryCount must be greater than or equal to saslRetryCount"); + retryCount -= saslRetryCount; + saslRetryCount = 0; } } } @@ -256,7 +274,7 @@ public class RetryingBlockTransferor { synchronized (RetryingBlockTransferor.this) { if (this == currentListener && outstandingBlocksIds.contains(blockId)) { if (shouldRetry(exception)) { - initiateRetry(); + initiateRetry(exception); } else { if (errorHandler.shouldLogError(exception)) { logger.error( diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java index 117a9ba08df..31fe6184166 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java @@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite { private static Map<String, String> configMap; private static RetryingBlockTransferor _retryingBlockTransferor; + private static final int MAX_RETRIES = 2; + @Before public void initMap() { configMap = new HashMap<String, String>() {{ - put("spark.shuffle.io.maxRetries", "2"); + put("spark.shuffle.io.maxRetries", 
Integer.toString(MAX_RETRIES)); put("spark.shuffle.io.retryWait", "0"); }}; } @@ -309,7 +311,7 @@ public class RetryingBlockTransferorSuite { verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException); verify(listener, times(3)).getTransferType(); verifyNoMoreInteractions(listener); - assert(_retryingBlockTransferor.getRetryCount() == 2); + assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES); } @Test @@ -341,6 +343,35 @@ public class RetryingBlockTransferorSuite { assert(_retryingBlockTransferor.getRetryCount() == 1); } + @Test + public void testIOExceptionFailsConnectionEvenWithSaslException() + throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + + SaslTimeoutException saslExceptionInitial = new SaslTimeoutException("initial", + new TimeoutException()); + SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final", + new TimeoutException()); + IOException ioException = new IOException(); + List<? extends Map<String, Object>> interactions = Arrays.asList( + ImmutableMap.of("b0", saslExceptionInitial), + ImmutableMap.of("b0", ioException), + ImmutableMap.of("b0", saslExceptionInitial), + ImmutableMap.of("b0", ioException), + ImmutableMap.of("b0", saslExceptionFinal), + // will not get invoked because the connection fails + ImmutableMap.of("b0", ioException), + // will not get invoked + ImmutableMap.of("b0", block0) + ); + configMap.put("spark.shuffle.sasl.enableRetries", "true"); + performInteractions(interactions, listener); + verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal); + verify(listener, atLeastOnce()).getTransferType(); + verifyNoMoreInteractions(listener); + assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES); + } + /** * Performs a set of interactions in response to block requests from a RetryingBlockFetcher. * Each interaction is a Map from BlockId to either ManagedBuffer or Exception. 
This interaction --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org