akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070170721


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws 
IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", 
saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   retryCount is not exposed from `RetryingBlockTransferor`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to