This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new fed407a4de2 [SPARK-42090][3.2] Introduce sasl retry count in 
RetryingBlockTransferor
fed407a4de2 is described below

commit fed407a4de24f7a387cd25bb2297de0bf4ca016c
Author: Ted Yu <yuzhih...@gmail.com>
AuthorDate: Tue Jan 24 12:15:22 2023 -0600

    [SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor
    
    ### What changes were proposed in this pull request? This PR introduces 
sasl retry count in RetryingBlockTransferor.
    
    ### Why are the changes needed?
    Previously a boolean variable, saslTimeoutSeen, was used. However, the 
boolean variable wouldn't cover the following scenario:
    
    1. SaslTimeoutException
    2. IOException
    3. SaslTimeoutException
    4. IOException
    
    Even though IOException at #2 is retried (resulting in increment of 
retryCount), the retryCount would be cleared at step #4. Since the intention of 
saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, 
we should keep a counter for SaslTimeoutException retries and subtract the 
value of this counter from retryCount.
    
    ### Does this PR introduce _any_ user-facing change? No
    
    ### How was this patch tested?
    New test is added, courtesy of Mridul.
    
    Closes #39611 from tedyu/sasl-cnt.
    
    Authored-by: Ted Yu <yuzhih...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    
    Closes #39710 from akpatnam25/SPARK-42090-backport-3.2.
    
    Authored-by: Ted Yu <yuzhih...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RetryingBlockTransferor.java   | 46 +++++++++++++++-------
 .../shuffle/RetryingBlockTransferorSuite.java      | 35 +++++++++++++++-
 2 files changed, 65 insertions(+), 16 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index 4515e3a5c28..892de991612 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
@@ -87,7 +88,16 @@ public class RetryingBlockTransferor {
   /** Number of times we've attempted to retry so far. */
   private int retryCount = 0;
 
-  private boolean saslTimeoutSeen;
+  // Number of times SASL timeout has been retried without success.
+  // If we see maxRetries consecutive failures, the request is failed.
+  // On the other hand, if sasl succeeds and we are able to send other 
requests subsequently,
+  // we reduce the SASL failures from retryCount (since SASL failures were 
part of
+  // connection bootstrap - which ended up being successful).
+  // spark.network.auth.rpcTimeout is much lower than spark.network.timeout 
and others -
+  // and so sasl is more susceptible to failures when remote service
+  // (like external shuffle service) is under load: but once it succeeds, we 
do not want to
+  // include it as part of request retries.
+  private int saslRetryCount = 0;
 
   /**
    * Set of all block ids which have not been transferred successfully or with 
a non-IO Exception.
@@ -123,7 +133,7 @@ public class RetryingBlockTransferor {
     this.currentListener = new RetryingBlockTransferListener();
     this.errorHandler = errorHandler;
     this.enableSaslRetries = conf.enableSaslRetries();
-    this.saslTimeoutSeen = false;
+    this.saslRetryCount = 0;
   }
 
   public RetryingBlockTransferor(
@@ -167,7 +177,7 @@ public class RetryingBlockTransferor {
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
-        initiateRetry();
+        initiateRetry(e);
       } else {
         for (String bid : blockIdsToTransfer) {
           listener.onBlockTransferFailure(bid, e);
@@ -180,7 +190,10 @@ public class RetryingBlockTransferor {
    * Lightweight method which initiates a retry in a different thread. The 
retry will involve
    * calling transferAllOutstanding() after a configured wait time.
    */
-  private synchronized void initiateRetry() {
+  private synchronized void initiateRetry(Throwable e) {
+    if (enableSaslRetries && e instanceof SaslTimeoutException) {
+      saslRetryCount += 1;
+    }
     retryCount += 1;
     currentListener = new RetryingBlockTransferListener();
 
@@ -203,16 +216,17 @@ public class RetryingBlockTransferor {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
     boolean isSaslTimeout = enableSaslRetries && e instanceof 
SaslTimeoutException;
-    if (!isSaslTimeout && saslTimeoutSeen) {
-      retryCount = 0;
-      saslTimeoutSeen = false;
+    // If this is a non SASL request failure, reduce earlier SASL failures 
from retryCount
+    // since some subsequent SASL attempt was successful
+    if (!isSaslTimeout && saslRetryCount > 0) {
+      Preconditions.checkState(retryCount >= saslRetryCount,
+        "retryCount must be greater than or equal to saslRetryCount");
+      retryCount -= saslRetryCount;
+      saslRetryCount = 0;
     }
     boolean hasRemainingRetries = retryCount < maxRetries;
     boolean shouldRetry =  (isSaslTimeout || isIOException) &&
         hasRemainingRetries && errorHandler.shouldRetryError(e);
-    if (shouldRetry && isSaslTimeout) {
-      this.saslTimeoutSeen = true;
-    }
     return shouldRetry;
   }
 
@@ -236,9 +250,13 @@ public class RetryingBlockTransferor {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) 
{
           outstandingBlocksIds.remove(blockId);
           shouldForwardSuccess = true;
-          if (saslTimeoutSeen) {
-            retryCount = 0;
-            saslTimeoutSeen = false;
+          // If there were SASL failures earlier, remove them from retryCount, 
as there was
+          // a SASL success (and some other request post bootstrap was also 
successful).
+          if (saslRetryCount > 0) {
+            Preconditions.checkState(retryCount >= saslRetryCount,
+              "retryCount must be greater than or equal to saslRetryCount");
+            retryCount -= saslRetryCount;
+            saslRetryCount = 0;
           }
         }
       }
@@ -256,7 +274,7 @@ public class RetryingBlockTransferor {
       synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) 
{
           if (shouldRetry(exception)) {
-            initiateRetry();
+            initiateRetry(exception);
           } else {
             if (errorHandler.shouldLogError(exception)) {
               logger.error(
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
index 117a9ba08df..31fe6184166 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
@@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite {
   private static Map<String, String> configMap;
   private static RetryingBlockTransferor _retryingBlockTransferor;
 
+  private static final int MAX_RETRIES = 2;
+
   @Before
   public void initMap() {
     configMap = new HashMap<String, String>() {{
-      put("spark.shuffle.io.maxRetries", "2");
+      put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES));
       put("spark.shuffle.io.retryWait", "0");
     }};
   }
@@ -309,7 +311,7 @@ public class RetryingBlockTransferorSuite {
     verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslTimeoutException);
     verify(listener, times(3)).getTransferType();
     verifyNoMoreInteractions(listener);
-    assert(_retryingBlockTransferor.getRetryCount() == 2);
+    assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
   }
 
   @Test
@@ -341,6 +343,35 @@ public class RetryingBlockTransferorSuite {
     assert(_retryingBlockTransferor.getRetryCount() == 1);
   }
 
+  @Test
+  public void testIOExceptionFailsConnectionEvenWithSaslException()
+    throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    SaslTimeoutException saslExceptionInitial = new 
SaslTimeoutException("initial",
+            new TimeoutException());
+    SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final",
+            new TimeoutException());
+    IOException ioException = new IOException();
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+            ImmutableMap.of("b0", saslExceptionInitial),
+            ImmutableMap.of("b0", ioException),
+            ImmutableMap.of("b0", saslExceptionInitial),
+            ImmutableMap.of("b0", ioException),
+            ImmutableMap.of("b0", saslExceptionFinal),
+            // will not get invoked because the connection fails
+            ImmutableMap.of("b0", ioException),
+            // will not get invoked
+            ImmutableMap.of("b0", block0)
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslExceptionFinal);
+    verify(listener, atLeastOnce()).getTransferType();
+    verifyNoMoreInteractions(listener);
+    assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
+  }
+
   /**
    * Performs a set of interactions in response to block requests from a 
RetryingBlockFetcher.
    * Each interaction is a Map from BlockId to either ManagedBuffer or 
Exception. This interaction


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to