Ngone51 commented on a change in pull request #33613:
URL: https://github.com/apache/spark/pull/33613#discussion_r683251551
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -77,42 +81,55 @@ public OneForOneBlockPusher(
@Override
public void onSuccess(ByteBuffer response) {
-    // On receipt of a successful block push
-    listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+    BlockPushReturnCode returnCode =
+      (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
+    // If the return code is not SUCCESS, the server has responded with an error code.
+    // Handle the error accordingly.
+    if (returnCode.returnCode != ReturnCode.SUCCESS.id()) {
+      checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(
+        BlockPushNonFatalFailure.getReturnCode(returnCode.returnCode)));
+    } else {
+      // On receipt of a successful block push
+      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+    }
}
@Override
public void onFailure(Throwable e) {
-    // Since block push is best effort, i.e., if we encounter a block push failure that's still
-    // retriable according to ErrorHandler (not a connection exception and the block is not too
-    // late), we should not fail all remaining block pushes even though
-    // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
-    // count etc). The best-effort nature makes block push tolerant of partial completion.
-    // Thus, we only fail the block that's actually failed in this case. Note that, on the
-    // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
-    // previous active retry listener, and retry pushing all outstanding blocks. However, since
-    // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
-    // blocks might have already succeeded, retrying all the outstanding block pushes should be
-    // very cheap (on the client side, the block data is in memory; on the server side, the block
-    // will be recognized as a duplicate, which triggers noop handling). Here, by failing only
-    // the one block that's actually failed, we are essentially preventing unnecessary
-    // block push failures from being forwarded to the parent listener of the retry listener.
-    //
-    // Take the following as an example. The common exception during block push handling,
-    // i.e. a block collision, is considered retriable by ErrorHandler but not retriable
-    // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
-    // one block encountering this issue, not the remaining blocks in the same batch. On the
-    // RetryingBlockTransferor side, since this exception is considered not retriable, it
-    // would immediately invoke the parent listener's onBlockTransferFailure. However, the
-    // remaining blocks in the same batch would remain current and active, and they won't be
-    // impacted by this exception.
-    if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
-      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
-      failRemainingBlocks(targetBlockId, e);
-    } else {
-      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
-      failRemainingBlocks(targetBlockId, e);
-    }
+    checkAndFailRemainingBlocks(index, e);
+ }
+ }
+
+ private void checkAndFailRemainingBlocks(int index, Throwable e) {
+    // Since block push is best effort, i.e., if we encounter a block push failure that's still
+    // retriable according to ErrorHandler (not a connection exception and the block is not too
+    // late), we should not fail all remaining block pushes even though
+    // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
+    // count etc). The best-effort nature makes block push tolerant of partial completion.
+    // Thus, we only fail the block that's actually failed in this case. Note that, on the
+    // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
+    // previous active retry listener, and retry pushing all outstanding blocks. However, since
+    // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
+    // blocks might have already succeeded, retrying all the outstanding block pushes should be
+    // very cheap (on the client side, the block data is in memory; on the server side, the block
+    // will be recognized as a duplicate, which triggers noop handling). Here, by failing only
+    // the one block that's actually failed, we are essentially preventing unnecessary
+    // block push failures from being forwarded to the parent listener of the retry listener.
+    //
+    // Take the following as an example. The common exception during block push handling,
+    // i.e. a block collision, is considered retriable by ErrorHandler but not retriable
+    // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
+    // one block encountering this issue, not the remaining blocks in the same batch. On the
+    // RetryingBlockTransferor side, since this exception is considered not retriable, it
+    // would immediately invoke the parent listener's onBlockTransferFailure. However, the
+    // remaining blocks in the same batch would remain current and active, and they won't be
+    // impacted by this exception.
Review comment:
Shall we log the specific error for each case? Also include whether we'd retry the block or not.
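To make the failure-scoping behavior discussed above concrete, here is a hedged, self-contained sketch (the class name `PushFailureScope` and the standalone shape are hypothetical, not the actual Spark code): a retriable error fails only the block at `index`, while a non-retriable one fails that block and all remaining blocks in the batch, mirroring the two `Arrays.copyOfRange` calls in the original `onFailure`.

```java
import java.util.Arrays;

// Simplified, hypothetical model of the failure-scoping decision in
// OneForOneBlockPusher (not the actual Spark class). A retriable error
// (per ErrorHandler) fails only the block at `index`; a non-retriable
// one fails that block plus all remaining blocks in the batch.
class PushFailureScope {
  static String[] blocksToFail(String[] blockIds, int index, boolean retriable) {
    // Mirrors Arrays.copyOfRange(blockIds, index, index + 1) vs.
    // Arrays.copyOfRange(blockIds, index, blockIds.length) in the diff.
    int end = retriable ? index + 1 : blockIds.length;
    return Arrays.copyOfRange(blockIds, index, end);
  }

  public static void main(String[] args) {
    String[] ids = {"b0", "b1", "b2", "b3"};
    // Retriable case (e.g. a block collision): fail only the affected block.
    System.out.println(Arrays.toString(blocksToFail(ids, 1, true)));   // [b1]
    // Non-retriable case (e.g. a connection error): fail all remaining blocks.
    System.out.println(Arrays.toString(blocksToFail(ids, 1, false)));  // [b1, b2, b3]
  }
}
```

This is also where the logging suggested in the review comment would naturally go: right before returning, log the error together with whether the block will be retried.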
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]