mridulm commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1320766012
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
if (shouldRetry(e)) {
- initiateRetry(e);
- } else {
- for (String bid : blockIdsToTransfer) {
- listener.onBlockTransferFailure(bid, e);
+ try {
+ initiateRetry(e);
+ return;
+ } catch (Throwable t) {
+ logger.error("Exception while trying to initiate retry", t);
Review Comment:
Note: I considered whether `e.addSuppressed(t);` would help here, but
decided it probably would not be very useful.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
if (shouldRetry(e)) {
- initiateRetry(e);
- } else {
- for (String bid : blockIdsToTransfer) {
- listener.onBlockTransferFailure(bid, e);
+ try {
+ initiateRetry(e);
+ return;
+ } catch (Throwable t) {
+ logger.error("Exception while trying to initiate retry", t);
}
}
+
+ // retry is not possible, so fail remaining blocks
+ for (String bid : blockIdsToTransfer) {
+ listener.onBlockTransferFailure(bid, e);
+ }
}
}
/**
* Lightweight method which initiates a retry in a different thread. The
retry will involve
* calling transferAllOutstanding() after a configured wait time.
*/
- private synchronized void initiateRetry(Throwable e) {
+ @VisibleForTesting
+ public synchronized void initiateRetry(Throwable e) {
Review Comment:
```suggestion
synchronized void initiateRetry(Throwable e) {
```
##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -365,13 +365,32 @@ public void
testIOExceptionFailsConnectionEvenWithSaslException()
ImmutableMap.of("b0", block0)
);
configMap.put("spark.shuffle.sasl.enableRetries", "true");
- performInteractions(interactions, listener);
+ performInteractions(interactions, listener, false);
verify(listener, timeout(5000)).onBlockTransferFailure("b0",
saslExceptionFinal);
verify(listener, atLeastOnce()).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}
+ @Test
+ public void testRetryInitiationFailure() throws IOException,
InterruptedException {
+ BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
+ // IOException will initiate a retry, but the initiation will fail
+ ImmutableMap.<String, Object>builder()
+ .put("b0", new IOException("Connection failed or something"))
+ .put("b1", block1)
+ .build()
+ );
+
+ performInteractions(interactions, listener, true);
Review Comment:
Using the proposed changes below, this will become:
```suggestion
configureInteractions(interactions, listener);
_retryingBlockTransferor = spy(_retryingBlockTransferor);
// Throw an OOM when initiating retries.
doThrow(OutOfMemoryError.class).when(_retryingBlockTransferor).initiateRetry(any());
// Override listener, so that it delegates to the spied instance and not
// the original class.
_retryingBlockTransferor.setCurrentListener(_retryingBlockTransferor.new
RetryingBlockTransferListener());
_retryingBlockTransferor.start();
```
##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -80,7 +80,7 @@ public void testNoFailures() throws IOException,
InterruptedException {
.build()
);
- performInteractions(interactions, listener);
+ performInteractions(interactions, listener, false);
Review Comment:
Let's revert all of these `performInteractions`-related diffs; more details below.
##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -381,9 +400,12 @@ public void
testIOExceptionFailsConnectionEvenWithSaslException()
* If multiple interactions are supplied, they will be used in order. This
is useful for encoding
* retries -- the first interaction may include an IOException, which causes
a retry of some
* subset of the original blocks in a second interaction.
+ *
+ * If mockInitiateRetryFailure is set to true, we mock initiateRetry() and
throw an exception.
*/
private static void performInteractions(List<? extends Map<String, Object>>
interactions,
- BlockFetchingListener listener)
+ BlockFetchingListener listener,
+ boolean mockInitiateRetryFailure)
Review Comment:
Split `performInteractions` into `configureInteractions` and
`performInteractions`.
`configureInteractions` is the existing `performInteractions` from master,
but without the `_retryingBlockTransferor.start();` call.
The new `performInteractions` will then be:
```
private static void performInteractions(List<? extends Map<String,
Object>> interactions,
BlockFetchingListener listener)
throws IOException, InterruptedException {
configureInteractions(interactions, listener);
_retryingBlockTransferor.start();
}
```
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -149,6 +149,7 @@ public RetryingBlockTransferor(
* in the event of transient IOExceptions.
*/
public void start() {
+ currentListener = new RetryingBlockTransferListener();
Review Comment:
Instead, let's explicitly introduce a package-private setter for use in
tests, and remove this change:
```
@VisibleForTesting
synchronized void setCurrentListener(RetryingBlockTransferListener listener)
{
this.currentListener = listener;
}
```
See my comments on the test suite for how to use this setter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]