otterc commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070168146


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws 
IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   The tests should also verify that the retry count is reset once the SASL 
request is successful.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -187,13 +195,20 @@ private synchronized void initiateRetry() {
 
   /**
    * Returns true if we should retry due a block transfer failure. We will 
retry if and only if
-   * the exception was an IOException and we haven't retried 'maxRetries' 
times already.
+   * the exception was an IOException or SaslTimeoutException and we haven't 
retried
+   * 'maxRetries' times already.
    */
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
+    boolean isSaslTimeout = enableSaslRetries && e instanceof 
SaslTimeoutException;
     boolean hasRemainingRetries = retryCount < maxRetries;
-    return isIOException && hasRemainingRetries && 
errorHandler.shouldRetryError(e);
+    boolean shouldRetry =  (isSaslTimeout || isIOException) &&
+        hasRemainingRetries && errorHandler.shouldRetryError(e);
+    if (shouldRetry && isSaslTimeout) {
+      this.isCurrentSaslTimeout = true;
+    }

Review Comment:
   Shouldn't we reset `isCurrentSaslTimeout=false` and `retryCount=0` when 
there is an exception that is not a `SaslTimeoutException`?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -85,6 +86,8 @@ void createAndStart(String[] blockIds, BlockTransferListener 
listener)
   /** Number of times we've attempted to retry so far. */
   private int retryCount = 0;
 
+  private boolean isCurrentSaslTimeout;

Review Comment:
   Nit: rename to `saslTimeoutSeen`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to