mridulm commented on a change in pull request #33613:
URL: https://github.com/apache/spark/pull/33613#discussion_r683089989



##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A special RuntimeException thrown when the shuffle service experiences a non-fatal
+ * failure while handling block push requests for push-based shuffle. Due to the
+ * best-effort nature of push-based shuffle, this exception gets thrown in relatively
+ * common cases, such as when a pushed block is received after the corresponding
+ * shuffle is merge finalized, or when a pushed block experiences a merge collision.
+ * Under these scenarios, we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be sent back to the
+   * client. Instead of responding with an RPCFailure carrying the exception stack
+   * trace as the payload, which makes checking the content of the exception tedious
+   * on the client side, we respond with a proper RPCResponse to make the handling
+   * more robust and efficient. This field is only set on the shuffle server side,
+   * where the exception is originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side, when a
+   * BlockPushNonFatalFailure is recreated from the error code received from the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response) {
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode) {
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is only used to convey the error code, we reduce
+   * the exception initialization overhead by skipping filling in the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {

Review comment:
       Add `Preconditions.checkNotNull` here and in `getReturnCode` to ensure incorrect usage is flagged.
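A minimal sketch of that guard, using `java.util.Objects.requireNonNull` as a stdlib stand-in for Guava's `Preconditions.checkNotNull` (class and field names here are simplified from the snippet above):

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Simplified stand-in for the exception class above: the getter fails fast
// on the side (client vs. server) where its field was never populated.
class BlockPushFailureSketch extends RuntimeException {
  private final ByteBuffer response;

  BlockPushFailureSketch(ByteBuffer response) {
    this.response = response;
  }

  public ByteBuffer getResponse() {
    // Throws NullPointerException with a descriptive message if this instance
    // was created on the client side, where only the return code is set.
    return Objects.requireNonNull(response, "response is only set on the server side");
  }
}
```

The same pattern would apply symmetrically to `getReturnCode` on the client side.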

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -218,7 +218,7 @@ public void getMergedBlockMeta(
           public void onSuccess(int numChunks, ManagedBuffer buffer) {
             logger.trace("Successfully got merged block meta for shuffleId {} shuffleMergeId {}"
               + " reduceId {}", shuffleId, shuffleMergeId, reduceId);
-            listener.onSuccess(shuffleId, reduceId, shuffleMergeId,
+            listener.onSuccess(shuffleId, shuffleMergeId, reduceId,

Review comment:
       Nice catch!

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -669,6 +696,11 @@ public String getID() {
       return streamId;
     }
 
+    @Override
+    public ByteBuffer getCompletionResponse() {
+      return new BlockPushReturnCode(ReturnCode.SUCCESS.id()).toByteBuffer();
+    }
+

Review comment:
       nit:
   Given how frequently this will be called - make this into a static final read-only
   ByteBuffer that we duplicate each time it is required?
   
   Something like:
   
   ```
       private static final ByteBuffer SUCCESS_BUFFER =
         new BlockPushReturnCode(ReturnCode.SUCCESS.id()).toByteBuffer().asReadOnlyBuffer();
   ```
   
   and getCompletionResponse -> `SUCCESS_BUFFER.duplicate()`
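For what it's worth, `duplicate()` is what makes the shared buffer safe: every caller gets an independent position/limit over the same read-only content. A small self-contained illustration (the one-byte payload is made up, not the real `BlockPushReturnCode` encoding):

```java
import java.nio.ByteBuffer;

class DuplicateDemo {
  // One read-only buffer cached up front, mimicking the proposed SUCCESS_BUFFER.
  static final ByteBuffer CACHED = ByteBuffer.wrap(new byte[] {0}).asReadOnlyBuffer();

  // Each call hands out a fresh view; reads on one view do not move the others.
  static ByteBuffer completionResponse() {
    return CACHED.duplicate();
  }
}
```

Since the views share content but not cursors, concurrent responders cannot corrupt each other's reads, and the allocation per call is just the lightweight view object.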

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -373,9 +386,10 @@ void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    final String streamId = String.format("%s_%d_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.shuffleMergeId,
-      msg.mapIndex, msg.reduceId);
+    // Use string concatenation here to avoid the overhead of String.format on every
+    // pushed block.
+    final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
+      + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
     if (appShuffleInfo.attemptId != msg.appAttemptId) {

Review comment:
       Move this to after the attemptId check.

##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A special RuntimeException thrown when the shuffle service experiences a non-fatal
+ * failure while handling block push requests for push-based shuffle. Due to the
+ * best-effort nature of push-based shuffle, this exception gets thrown in relatively
+ * common cases, such as when a pushed block is received after the corresponding
+ * shuffle is merge finalized, or when a pushed block experiences a merge collision.
+ * Under these scenarios, we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be sent back to the
+   * client. Instead of responding with an RPCFailure carrying the exception stack
+   * trace as the payload, which makes checking the content of the exception tedious
+   * on the client side, we respond with a proper RPCResponse to make the handling
+   * more robust and efficient. This field is only set on the shuffle server side,
+   * where the exception is originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side, when a
+   * BlockPushNonFatalFailure is recreated from the error code received from the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response) {
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode) {
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is only used to convey the error code, we reduce
+   * the exception initialization overhead by skipping filling in the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {
+    return response;
+  }
+
+  public ReturnCode getReturnCode() {
+    return returnCode;
+  }
+
+  public enum ReturnCode {
+    /**
+     * Indicate the case of a successful merge of a pushed block.
+     */
+    SUCCESS(0),

Review comment:
       Add a marker at index `0` for `UNKNOWN(0)` or `INVALID(0)` - to help with potential incompatible evolutions.
   Thoughts @Ngone51 ?
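A hedged sketch of what reserving index `0` could look like (the names and the non-zero codes are illustrative, not the actual enum in this PR):

```java
// Illustrative return-code enum: id 0 is reserved so that a zeroed or
// unrecognized byte from an older/newer peer never decodes as a real status.
enum ReturnCodeSketch {
  UNKNOWN(0),
  SUCCESS(1),
  TOO_LATE_BLOCK_PUSH(2),
  STALE_BLOCK_PUSH(3);

  private final byte id;

  ReturnCodeSketch(int id) {
    this.id = (byte) id;
  }

  byte id() {
    return id;
  }

  // Decode a wire byte, falling back to UNKNOWN for anything unrecognized.
  static ReturnCodeSketch fromId(byte id) {
    for (ReturnCodeSketch c : values()) {
      if (c.id == id) {
        return c;
      }
    }
    return UNKNOWN;
  }
}
```

The payoff is on decode: a peer speaking a newer protocol version can send a code this side does not know, and it surfaces as an explicit `UNKNOWN` rather than silently aliasing `SUCCESS`.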

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -119,7 +122,20 @@ public RemoteBlockPushResolver(TransportConf conf) {
       .maximumWeight(conf.mergedIndexCacheSize())
       .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
       .build(ShuffleIndexInformation::new);
-    this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
+    this.errorHandler = createErrorHandler();
+  }
+
+  @VisibleForTesting
+  protected ErrorHandler.BlockPushErrorHandler createErrorHandler() {
+    return new ErrorHandler.BlockPushErrorHandler() {
+      // Explicitly use a shuffle service side error handler for handling exceptions.
+      // BlockPushNonFatalFailure on the server side only has the response field set.
+      // It might require different handling logic compared with a client side error
+      // handler.
+      @Override
+      public boolean shouldLogError(Throwable t) {
+        return !(t instanceof BlockPushNonFatalFailure);
+      }
+    };

Review comment:
       nit: Make this a static final field and always return the same instance

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -379,9 +399,10 @@ void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    final String streamId = String.format("%s_%d_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.shuffleMergeId,
-      msg.mapIndex, msg.reduceId);
+    // Use string concatenation here to avoid the overhead of String.format on every

Review comment:
       +1 to avoid `String.format` for simple patterns (unless it is being lazily evaluated on demand).
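For reference, the two formulations are behaviorally identical; the concatenated form just skips parsing the format string on every pushed block. A quick sketch (the prefix value is an illustrative stand-in for `SHUFFLE_PUSH_BLOCK_PREFIX`):

```java
class StreamIdDemo {
  // Illustrative stand-in for OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX.
  static final String PREFIX = "shufflePush";

  // Format-string version: parses "%s_%d_%d_%d_%d" on every call.
  static String viaFormat(int shuffleId, int mergeId, int mapIndex, int reduceId) {
    return String.format("%s_%d_%d_%d_%d", PREFIX, shuffleId, mergeId, mapIndex, reduceId);
  }

  // Concatenation version: the compiler lowers this to a StringBuilder chain,
  // with no format parsing on the hot path.
  static String viaConcat(int shuffleId, int mergeId, int mapIndex, int reduceId) {
    return PREFIX + "_" + shuffleId + "_" + mergeId + "_" + mapIndex + "_" + reduceId;
  }
}
```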

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -77,42 +80,55 @@ public OneForOneBlockPusher(
 
     @Override
     public void onSuccess(ByteBuffer response) {
-      // On receipt of a successful block push
-      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      BlockPushReturnCode returnCode =
+        (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      // If the return code is not 0, the server has responded with an error code.
+      // Handle the error accordingly.
+      if (returnCode.returnCode > 0) {

Review comment:
       `ReturnCode.SUCCESS.id() != returnCode.returnCode`

##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -242,10 +244,16 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         handleResult(PushResult(blockId, exception))
       }
     }
+    // In addition to randomizing the order of the push requests, further randomize the
+    // order of blocks within the push request to further reduce the likelihood of
+    // shuffle server side collision of pushed blocks. This does not increase the cost
+    // of reading unmerged shuffle files on the executor side, because we are still
+    // reading MB-size chunks and only randomize the in-memory sliced buffers post
+    // reading.
+    val blocksToPush = Utils.randomize(blockIds.zip(
+      sliceReqBufferIntoBlockBuffers(request.reqBuffer, request.blocks.map(_._2))))

Review comment:
       Move this to a different PR - this is not related to the current change.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -781,8 +819,10 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
       // to disk as well. This way, we avoid having to buffer the entirety of every
       // block in memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
-        if (isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-            partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
+        if (isStale(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
+            partitionInfo.shuffleMergeId) ||
+            isTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
+            partitionInfo.reduceId)) {

Review comment:
       Fetch `appShuffleInfo.shuffles.get(partitionInfo.shuffleId)` once and use the same instance for both checks.
   This pattern is there in `onComplete` and `onFailure` as well.
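The suggested refactor is just hoisting the map lookup into a local so both predicates see the same instance. A generic sketch of the pattern (the map and the trivial predicates stand in for `appShuffleInfo.shuffles` and `isStale`/`isTooLate`):

```java
import java.util.Map;

class FetchOnceSketch {
  static int lookups = 0;

  // Stand-in for appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
  // instrumented to count how often the map is consulted.
  static String lookup(Map<Integer, String> shuffles, int shuffleId) {
    lookups++;
    return shuffles.get(shuffleId);
  }

  static boolean isStale(String info) { return info == null; }
  static boolean isTooLate(String info) { return info == null; }

  // Before: the map is consulted once per predicate.
  static boolean checkTwice(Map<Integer, String> shuffles, int shuffleId) {
    return isStale(lookup(shuffles, shuffleId)) || isTooLate(lookup(shuffles, shuffleId));
  }

  // After: fetch once and reuse the same instance for both checks, which is
  // also safer if the map entry can change between the two reads.
  static boolean checkOnce(Map<Integer, String> shuffles, int shuffleId) {
    String info = lookup(shuffles, shuffleId);
    return isStale(info) || isTooLate(info);
  }
}
```

Beyond saving a lookup, the single fetch rules out the race where the entry is swapped between the two `get` calls and the predicates end up judging different instances.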

##########
File path: pusher.diff
##########
@@ -0,0 +1,351 @@
+diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala

Review comment:
       Looks like it @zhuqi-lucas - please remove this file @Victsm 

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -436,17 +456,16 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // getting killed. When this happens, we need to distinguish the duplicate blocks
     // as they arrive. More details on this are explained in later comments.
 
-    // Track if the block is received after shuffle merge finalized or from an older
-    // shuffleMergeId attempt.
-    final boolean isStaleBlockOrTooLate = partitionInfoBeforeCheck == null;
     // Check if the given block is already merged by checking the bitmap against the
     // given map index
-    final AppShufflePartitionInfo partitionInfo = isStaleBlockOrTooLate ? null :
+    final AppShufflePartitionInfo partitionInfo = isStaleBlock || isTooLate ? null :
       partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck;
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(
         this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
+      final boolean finalIsTooLate = isTooLate;
+      final boolean finalIsStaleBlock = isStaleBlock;

Review comment:
   ```suggestion
         final BlockPushNonFatalFailure exception = isTooLate ? createTooLate .. : (isStaleBlock ? createStale .. : null);
   ```
   
   and `if (null != exception) throw exception` in the onComplete handler.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Objects;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Error code indicating a non-fatal failure of a block push request.
+ * Due to the best-effort nature of push-based shuffle, these failures
+ * do not impact the completion of the block push process. The list of
+ * such errors is in
+ * {@link org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode}.
+ *
+ * @since 3.2.0
+ */
+public class BlockPushReturnCode extends BlockTransferMessage {
+  public final byte returnCode;
+
+  public BlockPushReturnCode(byte returnCode) {
+    this.returnCode = returnCode;

Review comment:
       Validate that the returnCode is supported.
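One possible shape for that validation, sketched with an illustrative range check (the real set of valid ids would come from `BlockPushNonFatalFailure.ReturnCode`, and the class/bounds here are made up for the example):

```java
class ValidatedReturnCode {
  // Illustrative bounds; the actual valid ids are defined by
  // BlockPushNonFatalFailure.ReturnCode.
  private static final byte MIN_CODE = 0;
  private static final byte MAX_CODE = 3;

  final byte returnCode;

  ValidatedReturnCode(byte returnCode) {
    // Reject unsupported codes at construction time, so a corrupt or
    // incompatible wire byte fails loudly instead of propagating.
    if (returnCode < MIN_CODE || returnCode > MAX_CODE) {
      throw new IllegalArgumentException("Unsupported return code: " + returnCode);
    }
    this.returnCode = returnCode;
  }
}
```

Validating in the constructor means both the encode path and the decode path share the same guard, since decoding also goes through the constructor.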




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


