This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 2fb62e0  [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started 
error message in BlockPushErrorHandler in client
2fb62e0 is described below

commit 2fb62e0e3d40835a3e61fcf210e0772cd0d21a68
Author: zhuqi-lucas <821684...@qq.com>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500

    [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message 
in BlockPushErrorHandler in client
    
    ### What changes were proposed in this pull request?
    Add a new type of error message in BlockPushErrorHandler which indicates 
that the PushBlockStream message was received after a new application attempt 
has started. The client should handle this error message without retrying the 
block push.
    
    ### Why are the changes needed?
    When a block push fails because it comes from a too-old application 
attempt, the client should neither retry pushing the block nor log the 
exception.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added corresponding unit tests.
    
    Closes #33617 from zhuqi-lucas/master.
    
    Authored-by: zhuqi-lucas <821684...@qq.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 05cd5f97c3dea25dacdbdb319243cdab9667c774)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/server/BlockPushNonFatalFailure.java   | 22 +++++++++++++++-
 .../network/server/TransportRequestHandler.java    | 13 ++++++++--
 .../apache/spark/network/shuffle/ErrorHandler.java |  7 ++---
 .../network/shuffle/RemoteBlockPushResolver.java   | 30 ++++++++++++----------
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  4 +++
 .../shuffle/RemoteBlockPushResolverSuite.java      | 17 +++++++-----
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  | 10 +++-----
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    |  6 +++++
 8 files changed, 77 insertions(+), 32 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
     " is received after merged shuffle is finalized";
 
   /**
+   * String constant used for generating exception messages indicating the 
application attempt is
+   * not the latest attempt on the server side. When we get a block push 
failure because of the too
+   * old attempt, we will not retry pushing the block nor log the exception on 
the client side.
+   */
+  public static final String TOO_OLD_ATTEMPT_SUFFIX =
+    " is from an older app attempt";
+
+  /**
    * String constant used for generating exception messages indicating a block 
to be merged
    * is a stale block push in the case of indeterminate stage retries on the 
server side.
    * When we get a block push failure because of the block push being stale, 
we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
      * indeterminate stage retries. When the client receives this code, it 
will not retry
      * pushing the block.
      */
-    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+    /**
+     * Indicate the application attempt is not the latest attempt on the 
server side.
+     * When the client gets this code, it will not retry pushing the block.
+     */
+    TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
 
     private final byte id;
     // Error message suffix used to generate an error message for a given 
ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
       case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
       case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
       case 3: return ReturnCode.STALE_BLOCK_PUSH;
+      case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
       default: throw new IllegalArgumentException("Unknown block push return 
code: " + id);
     }
   }
 
+  public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+    return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+      returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+      returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+  }
+
   public static String getErrorMsg(String blockId, ReturnCode errorCode) {
     Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
     return "Block " + blockId + errorCode.errorMsgSuffix;
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 5c07f20..bc99248 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -249,8 +249,17 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
         wrappedCallback.onComplete(wrappedCallback.getID());
       }
     } catch (Exception e) {
-      logger.error("Error while invoking RpcHandler#receive() on RPC id " + 
req.requestId, e);
-      respond(new RpcFailure(req.requestId, 
Throwables.getStackTraceAsString(e)));
+      if (e instanceof BlockPushNonFatalFailure) {
+        // Thrown by rpcHandler.receiveStream(reverseClient, meta, callback), 
the same as
+        // onComplete method. Respond an RPC message with the error code to 
client instead of
+        // using exceptions encoded in the RPCFailure. Using a proper 
RPCResponse is more
+        // efficient, and now only include the too old attempt case here.
+        respond(new RpcResponse(req.requestId,
+          new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse())));
+      } else {
+        logger.error("Error while invoking RpcHandler#receive() on RPC id " + 
req.requestId, e);
+        respond(new RpcFailure(req.requestId, 
Throwables.getStackTraceAsString(e)));
+      }
       // We choose to totally fail the channel, rather than trying to recover 
as we do in other
       // cases.  We don't know how many bytes of the stream the client has 
already sent for the
       // stream, it's not worth trying to recover.
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 271d762..9136ff6 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -87,10 +87,11 @@ public interface ErrorHandler {
         return false;
       }
 
-      // If the block is too late or stale block push, there is no need to 
retry it
+      // If the block is too late or the invalid block push or the attempt is 
not the latest one,
+      // there is no need to retry it
       return !(t instanceof BlockPushNonFatalFailure &&
-        (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH 
||
-          ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
+        BlockPushNonFatalFailure
+          .shouldNotRetryErrorCode(((BlockPushNonFatalFailure) 
t).getReturnCode()));
     }
 
     @Override
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 80174d1..d04db67 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -401,19 +401,17 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    if (appShuffleInfo.attemptId != msg.appAttemptId) {
-      // If this Block belongs to a former application attempt, it is 
considered late,
-      // as only the blocks from the current application attempt will be merged
-      // TODO: [SPARK-35548] Client should be updated to handle this error.
-      throw new IllegalArgumentException(
-        String.format("The attempt id %s in this PushBlockStream message does 
not match "
-          + "with the current attempt id %s stored in shuffle service for 
application %s",
-          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
-    }
     // Use string concatenation here to avoid the overhead with String.format 
on every
     // pushed block.
     final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + 
"_"
       + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + 
msg.reduceId;
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is 
considered late,
+      // as only the blocks from the current application attempt will be merged
+      throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode
+        .TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(),
+        BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.TOO_OLD_ATTEMPT_PUSH));
+    }
     // Retrieve merged shuffle file metadata
     AppShufflePartitionInfo partitionInfoBeforeCheck;
     BlockPushNonFatalFailure failure = null;
@@ -519,12 +517,18 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
-      // If this Block belongs to a former application attempt, it is 
considered late,
-      // as only the blocks from the current application attempt will be merged
-      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      // If finalizeShuffleMerge from a former application attempt, it is 
considered late,
+      // as only the finalizeShuffleMerge request from the current application 
attempt
+      // will be merged. Too old app attempt only being seen by an already 
failed
+      // app attempt, and no need use callback to return to client now, because
+      // the finalizeShuffleMerge in DAGScheduler has no retry policy, and 
don't
+      // use the BlockPushNonFatalFailure because it's the finalizeShuffleMerge
+      // related case, not the block push case, just throw it in server side 
now.
+      // TODO we may use a new exception class to include the 
finalizeShuffleMerge
+      // related case just as the BlockPushNonFatalFailure contains the block 
push cases.
       throw new IllegalArgumentException(
         String.format("The attempt id %s in this FinalizeShuffleMerge message 
does not match "
-          + "with the current attempt id %s stored in shuffle service for 
application %s",
+            + "with the current attempt id %s stored in shuffle service for 
application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> 
shuffleMergePartitionsRef =
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index 56c9a97..246fda6 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -37,6 +37,8 @@ public class ErrorHandlerSuite {
     assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
       ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
       ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new RuntimeException(new 
ConnectException())));
     assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
@@ -54,6 +56,8 @@ public class ErrorHandlerSuite {
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index d7881f0..3324b4e 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -112,6 +112,8 @@ public class RemoteBlockPushResolverSuite {
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, 
"")));
@@ -939,7 +941,7 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testPushBlockFromPreviousAttemptIsRejected()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
@@ -998,11 +1000,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, 1, 0, 0, 1, 0, 0));
-    } catch (IllegalArgumentException re) {
-      assertEquals(
-        "The attempt id 1 in this PushBlockStream message does not match " +
-          "with the current attempt id 2 stored in shuffle service for 
application " +
-          testApp, re.getMessage());
+    } catch (BlockPushNonFatalFailure re) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream2.getID());
       throw re;
     }
   }
@@ -1034,7 +1037,7 @@ public class RemoteBlockPushResolverSuite {
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
         String.format("The attempt id %s in this FinalizeShuffleMerge message 
does not " +
-          "match with the current attempt id %s stored in shuffle service for 
application %s",
+            "match with the current attempt id %s stored in shuffle service 
for application %s",
           ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
       throw e;
     }
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index e6af767..bb260f8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.BlockPushNonFatalFailure
-import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
@@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) 
extends Logging {
         if (t.getCause != null && 
t.getCause.isInstanceOf[FileNotFoundException]) {
           return false
         }
-        // If the block is too late or the invalid block push, there is no 
need to retry it
+        // If the block is too late or the invalid block push or the attempt 
is not the latest one,
+        // there is no need to retry it
         !(t.isInstanceOf[BlockPushNonFatalFailure] &&
-          (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
-            == ReturnCode.TOO_LATE_BLOCK_PUSH ||
-            t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
-            == ReturnCode.STALE_BLOCK_PUSH))
+          BlockPushNonFatalFailure.
+            
shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode));
       }
     }
   }
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 6f9b5e4..298ba50 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -224,6 +224,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
         ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
     assert(
       !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+    assert(
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
         ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(errorHandler.shouldRetryError(new RuntimeException(new 
ConnectException())))
     assert(
@@ -240,6 +243,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
         ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
     assert(
       !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
+    assert(
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
         ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
       ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to