Ngone51 commented on a change in pull request #33613:
URL: https://github.com/apache/spark/pull/33613#discussion_r682262016
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -77,42 +80,55 @@ public OneForOneBlockPusher(
@Override
public void onSuccess(ByteBuffer response) {
- // On receipt of a successful block push
- listener.onBlockPushSuccess(blockId, new
NioManagedBuffer(ByteBuffer.allocate(0)));
+ // If the response is non-empty, the server has responded some error
code. Decode
+ // it and handle the error accordingly.
+ if (response.hasRemaining()) {
Review comment:
Shall we add a new code for the successful response? I think that would
be more reboust. We can rename `PushBlockNonFatalErrorCode` to
`BlockPushResponse` if we go this way.
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A special RuntimeException thrown when shuffle service experiences a
non-fatal failure
+ * with handling block push requests with push-based shuffle. Due to the
best-effort nature
+ * of push-based shuffle, there are cases where the exceptions gets thrown
under certain
+ * relatively common cases such as when a pushed block is received after the
corresponding
+ * shuffle is merge finalized or when a pushed block experiences merge
collision. Under these
+ * scenarios, we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+
+ /**
+ * The error code of the failure, encoded as a ByteBuffer to be responded
back to the client.
+ * Instead of responding a RPCFailure with the exception stack trace as the
payload,
+ * which makes checking the content of the exception very tedious on the
client side,
+ * we can respond a proper RPCResponse to make it more robust and efficient.
This
+ * field is only set on the shuffle server side when the exception is
originally generated.
+ */
+ private ByteBuffer response;
+
+ /**
+ * The error code of the failure. This field is only set on the client side
when a
+ * BlockPushNonFatalFailure is recreated from the error code received from
the server.
+ */
+ private ErrorCode errorCode;
+
+ public BlockPushNonFatalFailure(ByteBuffer response) {
+ this.response = response;
+ }
+
+ public BlockPushNonFatalFailure(ErrorCode errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Since this type of exception is used to only convey the error code, we
reduce the
+ * exception initialization overhead by skipping filling the stack trace.
+ */
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+
+ public ByteBuffer getResponse() {
+ return response;
+ }
Review comment:
```suggestion
public ByteBuffer toByteBuffer() {
return new PushBlockNonFatalErrorCode(this.errorCode).toByteBuffer();
}
```
And we don't need the constructor which accepts `ByteBuffer`.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -471,9 +488,10 @@ public void onData(String streamId, ByteBuffer buf) {
public void onComplete(String streamId) {
if (isStaleBlockOrTooLate) {
// Throw an exception here so the block data is drained from
channel and server
- // responds RpcFailure to the client.
- throw new RuntimeException(String.format("Block %s %s", streamId,
-
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+ // responds the error code to the client.
+ throw new BlockPushNonFatalFailure(
+ new
PushBlockNonFatalErrorCode(ErrorCode.TOO_LATE_OR_STALE_BLOCK_PUSH.id())
Review comment:
Shall we use the individual error codes for "too late push" and "stale
push" separately?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]