otterc commented on code in PR #38959: URL: https://github.com/apache/spark/pull/38959#discussion_r1070168146
########## common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java: ########## @@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException verifyNoMoreInteractions(listener); } + @Test + public void testSaslTimeoutFailure() throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + TimeoutException timeoutException = new TimeoutException(); + SaslTimeoutException saslTimeoutException = + new SaslTimeoutException(timeoutException); + List<? extends Map<String, Object>> interactions = Arrays.asList( + ImmutableMap.<String, Object>builder() + .put("b0", saslTimeoutException) + .build(), + ImmutableMap.<String, Object>builder() + .put("b0", block0) + .build() + ); + + performInteractions(interactions, listener); + + verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException); + verify(listener).getTransferType(); + verifyNoMoreInteractions(listener); + } + + @Test + public void testRetryOnSaslTimeout() throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + + List<? extends Map<String, Object>> interactions = Arrays.asList( + // SaslTimeout will cause a retry. Since b0 fails, we will retry both. 
+ ImmutableMap.<String, Object>builder() + .put("b0", new SaslTimeoutException(new TimeoutException())) + .build(), + ImmutableMap.<String, Object>builder() + .put("b0", block0) + .build() + ); + configMap.put("spark.shuffle.sasl.enableRetries", "true"); + performInteractions(interactions, listener); + + verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0); + verify(listener).getTransferType(); + verifyNoMoreInteractions(listener); + } + + @Test + public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + TimeoutException timeoutException = new TimeoutException(); + SaslTimeoutException saslTimeoutException = + new SaslTimeoutException(timeoutException); + List<ImmutableMap<String, Object>> interactions = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + interactions.add( + ImmutableMap.<String, Object>builder() + .put("b0", saslTimeoutException) + .build() + ); + } + configMap.put("spark.shuffle.sasl.enableRetries", "true"); + performInteractions(interactions, listener); + verify(listener, times(3)).getTransferType(); + verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException); + verifyNoMoreInteractions(listener); + } Review Comment: We should verify the retry count is reset in the tests once the SASL request is successful. ########## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java: ########## @@ -187,13 +195,20 @@ private synchronized void initiateRetry() { /** * Returns true if we should retry due a block transfer failure. We will retry if and only if - * the exception was an IOException and we haven't retried 'maxRetries' times already. + * the exception was an IOException or SaslTimeoutException and we haven't retried + * 'maxRetries' times already. 
+ */ private synchronized boolean shouldRetry(Throwable e) { boolean isIOException = e instanceof IOException || e.getCause() instanceof IOException; + boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException; boolean hasRemainingRetries = retryCount < maxRetries; - return isIOException && hasRemainingRetries && errorHandler.shouldRetryError(e); + boolean shouldRetry = (isSaslTimeout || isIOException) && + hasRemainingRetries && errorHandler.shouldRetryError(e); + if (shouldRetry && isSaslTimeout) { + this.isCurrentSaslTimeout = true; + } Review Comment: Should we not reset `isCurrentSaslTimeout=false` and `retryCount=0` when there is an exception but it's not a `SaslTimeoutException`? ########## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java: ########## @@ -85,6 +86,8 @@ void createAndStart(String[] blockIds, BlockTransferListener listener) /** Number of times we've attempted to retry so far. */ private int retryCount = 0; + private boolean isCurrentSaslTimeout; Review Comment: Nit: rename to "saslTimeoutSeen"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org