Ngone51 commented on a change in pull request #33613:
URL: https://github.com/apache/spark/pull/33613#discussion_r683208602



##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A special RuntimeException thrown when the shuffle service experiences a non-fatal
+ * failure while handling block push requests with push-based shuffle. Due to the
+ * best-effort nature of push-based shuffle, this exception gets thrown in certain
+ * relatively common scenarios, such as when a pushed block is received after the
+ * corresponding shuffle is merge finalized, or when a pushed block experiences a
+ * merge collision.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be sent back to the client.
+   * Instead of responding with an RPCFailure carrying the exception stack trace as the
+   * payload, which makes checking the content of the exception very tedious on the
+   * client side, we can respond with a proper RPCResponse to make it more robust and
+   * efficient. This field is only set on the shuffle server side when the exception is
+   * originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side when a
+   * BlockPushNonFatalFailure is recreated from the error code received from the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response) {
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode) {
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is only used to convey the error code, we reduce the
+   * exception initialization overhead by skipping filling in the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {
+    return response;
+  }
+
+  public ReturnCode getReturnCode() {
+    return returnCode;
+  }
+
+  public enum ReturnCode {
+    /**
+     * Indicate the case of a successful merge of a pushed block.
+     */
+    SUCCESS(0),

Review comment:
       `UNKNOWN` sgtm.
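
   For illustration, one way the agreed-on `UNKNOWN` entry could look in this enum; the id value and javadoc here are illustrative, not the final API:
   ```java
   public enum ReturnCode {
     /**
      * Indicate the case of a successful merge of a pushed block.
      */
     SUCCESS(0),
     /**
      * A catch-all for error codes the client side does not recognize (illustrative).
      */
     UNKNOWN(-1);

     private final byte id;

     ReturnCode(int id) {
       assert id >= Byte.MIN_VALUE && id <= Byte.MAX_VALUE;
       this.id = (byte) id;
     }

     public byte id() {
       return id;
     }
   }
   ```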

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -164,10 +182,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
           // current incoming one
           int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
-            throw new StaleBlockPushException(String.format("Rejecting shuffle blocks push request"
-              + " for shuffle %s with shuffleMergeId %s for application %s_%s as a higher"
-              + " shuffleMergeId %s request is already seen", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId));

Review comment:
       log the error message?
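
   For illustration, a minimal sketch of the suggested logging, assuming the resolver's existing SLF4J `logger`; the `STALE_BLOCK_PUSH` return-code name is an assumption of this sketch:
   ```java
   if (latestShuffleMergeId > shuffleMergeId) {
     // Keep a server-side record of the rejection before signaling it to the
     // client through the error code (message mirrors the removed exception).
     logger.warn("Rejecting shuffle blocks push request for shuffle {} with shuffleMergeId {}"
       + " for application {}_{} as a higher shuffleMergeId {} request is already seen",
       shuffleId, shuffleMergeId, appShuffleInfo.appId, appShuffleInfo.attemptId,
       latestShuffleMergeId);
     throw new BlockPushNonFatalFailure(ReturnCode.STALE_BLOCK_PUSH); // assumed return code
   }
   ```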

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -164,10 +182,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
           // current incoming one
           int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
-            throw new StaleBlockPushException(String.format("Rejecting shuffle blocks push request"
-              + " for shuffle %s with shuffleMergeId %s for application %s_%s as a higher"
-              + " shuffleMergeId %s request is already seen", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId));

Review comment:
       Could we also log for the case of:
   ```
   if (dataFile.exists()) {
     return null;
   }
   ```

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -164,10 +182,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
           // current incoming one
           int latestShuffleMergeId = appShuffleMergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
-            throw new StaleBlockPushException(String.format("Rejecting shuffle blocks push request"
-              + " for shuffle %s with shuffleMergeId %s for application %s_%s as a higher"
-              + " shuffleMergeId %s request is already seen", shuffleId, shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, latestShuffleMergeId));

Review comment:
       Could we also log for the case of:
   ```
   if (dataFile.exists()) {
     return null;
   }
   ```
   
   Btw, is it equivalent to throw `BlockPushNonFatalFailure(ReturnCode.TOO_LATE_BLOCK_PUSH)` rather than return null? If it's equivalent, I think throwing the exception is more consistent with the "stale push" case.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -385,14 +397,24 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
           + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
+    // Use string concatenation here to avoid the overhead of String.format on every
+    // pushed block.
+    final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
+      + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
     // Retrieve merged shuffle file metadata
     AppShufflePartitionInfo partitionInfoBeforeCheck;
+    boolean isStaleBlock = false;
+    boolean isTooLate = false;

Review comment:
       Why not just use the `ReturnCode` field directly?
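
   For illustration, a sketch of tracking the failure reason with a single `ReturnCode` instead of two booleans; treating `SUCCESS` as the "no failure detected yet" sentinel and the `STALE_BLOCK_PUSH` name are assumptions of this sketch:
   ```java
   // Sketch of the relevant fragment inside getOrCreateAppShufflePartitionInfo(...):
   ReturnCode failureCode = ReturnCode.SUCCESS;  // SUCCESS doubles as "no failure yet"
   if (latestShuffleMergeId > shuffleMergeId) {
     failureCode = ReturnCode.STALE_BLOCK_PUSH;  // assumed return code name
   } else if (dataFile.exists()) {
     failureCode = ReturnCode.TOO_LATE_BLOCK_PUSH;
   }
   if (failureCode != ReturnCode.SUCCESS) {
     throw new BlockPushNonFatalFailure(failureCode);
   }
   ```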

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -77,42 +81,55 @@ public OneForOneBlockPusher(
 
     @Override
     public void onSuccess(ByteBuffer response) {
-      // On receipt of a successful block push
-      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      BlockPushReturnCode returnCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      // If the return code is not SUCCESS, the server has responded some error code. Handle
+      // the error accordingly.
+      if (returnCode.returnCode != ReturnCode.SUCCESS.id()) {
+        checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(
+          BlockPushNonFatalFailure.getReturnCode(returnCode.returnCode)));
+      } else {
+        // On receipt of a successful block push
+        listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      }
     }
 
     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encounter a block push failure that's still
-      // retriable according to ErrorHandler (not a connection exception and the block is not too
-      // late), we should not fail all remaining block pushes even though
-      // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
-      // count etc). The best effort nature makes block push tolerable of a partial completion.
-      // Thus, we only fail the block that's actually failed in this case. Note that, on the
-      // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
-      // previous active retry listener, and retry pushing all outstanding blocks. However, since
-      // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
-      // blocks might have already succeeded, retry pushing all the outstanding blocks should be
-      // very cheap (on the client side, the block data is in memory; on the server side, the block
-      // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
-      // one block that's actually failed, we are essentially preventing forwarding unnecessary
-      // block push failures to the parent listener of the retry listener.
-      //
-      // Take the following as an example. For the common exception during block push handling,
-      // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
-      // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
-      // one block encountering this issue not the remaining blocks in the same batch. On the
-      // RetryingBlockTransferor side, since this exception is considered as not retriable, it
-      // would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
-      // blocks in the same batch would remain current and active and they won't be impacted by
-      // this exception.
-      if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
-        failRemainingBlocks(targetBlockId, e);
-      } else {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
-        failRemainingBlocks(targetBlockId, e);
-      }
+      checkAndFailRemainingBlocks(index, e);
+    }
+  }
+
+  private void checkAndFailRemainingBlocks(int index, Throwable e) {
+    // Since block push is best effort, i.e., if we encounter a block push failure that's still
+    // retriable according to ErrorHandler (not a connection exception and the block is not too
+    // late), we should not fail all remaining block pushes even though
+    // RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
+    // count etc). The best effort nature makes block push tolerable of a partial completion.
+    // Thus, we only fail the block that's actually failed in this case. Note that, on the
+    // RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
+    // previous active retry listener, and retry pushing all outstanding blocks. However, since
+    // the blocks to be pushed are preloaded into memory and the first attempt of pushing these
+    // blocks might have already succeeded, retry pushing all the outstanding blocks should be
+    // very cheap (on the client side, the block data is in memory; on the server side, the block
+    // will be recognized as a duplicate which triggers noop handling). Here, by failing only the
+    // one block that's actually failed, we are essentially preventing forwarding unnecessary
+    // block push failures to the parent listener of the retry listener.
+    //
+    // Take the following as an example. For the common exception during block push handling,
+    // i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
+    // by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
+    // one block encountering this issue not the remaining blocks in the same batch. On the
+    // RetryingBlockTransferor side, since this exception is considered as not retriable, it
+    // would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
+    // blocks in the same batch would remain current and active and they won't be impacted by
+    // this exception.

Review comment:
       Shall we log the specific error for each case? Also, include whether we'd retry the block or not.
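
   For illustration, a sketch of per-case logging inside `checkAndFailRemainingBlocks`, assuming the class's SLF4J `logger`; the messages are illustrative:
   ```java
   private void checkAndFailRemainingBlocks(int index, Throwable e) {
     if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
       // Retriable per ErrorHandler: only give up on the failed block itself, so the
       // rest of the batch stays active and can still be retried.
       logger.debug("Failed to push block {}; error is retriable, failing only this block",
         blockIds[index], e);
       failRemainingBlocks(Arrays.copyOfRange(blockIds, index, index + 1), e);
     } else {
       // Not retriable: give up on this block and all remaining blocks in the batch.
       logger.error("Failed to push block {}; error is not retriable, failing the {} "
         + "remaining block(s) in the batch", blockIds[index], blockIds.length - index, e);
       failRemainingBlocks(Arrays.copyOfRange(blockIds, index, blockIds.length), e);
     }
   }
   ```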

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -213,7 +213,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(streamHandler.getCompletionResponse());
+           } catch (BlockPushNonFatalFailure ex) {

Review comment:
       Shall we catch `BlockPushNonFatalFailure` inside the `onComplete`? We can have a response field inside `streamHandler` to store the response value. In this way, `getCompletionResponse` can handle the success and error cases consistently.
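
   For illustration, a sketch of the suggested refactor; the `completionResponse` field, the `delegate` handler, and the `SUCCESS_RESPONSE` constant are hypothetical names, not the PR's API:
   ```java
   // Inside the stream handler (sketch): catch the failure in onComplete itself and
   // stash the encoded response, so getCompletionResponse() serves both the success
   // and the error case and the caller no longer needs its own catch block.
   private ByteBuffer completionResponse;  // hypothetical field

   @Override
   public void onComplete(String streamId) throws IOException {
     try {
       delegate.onComplete(streamId);  // hypothetical underlying handler
       completionResponse = SUCCESS_RESPONSE;  // e.g. an encoded SUCCESS return code
     } catch (BlockPushNonFatalFailure ex) {
       completionResponse = ex.getResponse();
     }
   }

   public ByteBuffer getCompletionResponse() {
     return completionResponse;
   }
   ```
   The caller in `TransportRequestHandler` could then invoke `callback.onSuccess(streamHandler.getCompletionResponse())` unconditionally.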



