This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 2fb62e0 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client 2fb62e0 is described below commit 2fb62e0e3d40835a3e61fcf210e0772cd0d21a68 Author: zhuqi-lucas <821684...@qq.com> AuthorDate: Mon Aug 16 13:58:48 2021 -0500 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client ### What changes were proposed in this pull request? Add a new type of error message in BlockPushErrorHandler which indicates that the PushBlockStream message was received after a new application attempt had started. This error message should be handled correctly by the client, without retrying the block push. ### Why are the changes needed? When a block push fails because it came from an outdated (too old) application attempt, the client should neither retry pushing the block nor log the exception. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added the corresponding unit tests. Closes #33617 from zhuqi-lucas/master. 
Authored-by: zhuqi-lucas <821684...@qq.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 05cd5f97c3dea25dacdbdb319243cdab9667c774) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> --- .../network/server/BlockPushNonFatalFailure.java | 22 +++++++++++++++- .../network/server/TransportRequestHandler.java | 13 ++++++++-- .../apache/spark/network/shuffle/ErrorHandler.java | 7 ++--- .../network/shuffle/RemoteBlockPushResolver.java | 30 ++++++++++++---------- .../spark/network/shuffle/ErrorHandlerSuite.java | 4 +++ .../shuffle/RemoteBlockPushResolverSuite.java | 17 +++++++----- .../apache/spark/shuffle/ShuffleBlockPusher.scala | 10 +++----- .../spark/shuffle/ShuffleBlockPusherSuite.scala | 6 +++++ 8 files changed, 77 insertions(+), 32 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java index 5906fa2..4bb22b2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java @@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends RuntimeException { " is received after merged shuffle is finalized"; /** + * String constant used for generating exception messages indicating the application attempt is + * not the latest attempt on the server side. When we get a block push failure because of the too + * old attempt, we will not retry pushing the block nor log the exception on the client side. + */ + public static final String TOO_OLD_ATTEMPT_SUFFIX = + " is from an older app attempt"; + + /** * String constant used for generating exception messages indicating a block to be merged * is a stale block push in the case of indeterminate stage retries on the server side. 
* When we get a block push failure because of the block push being stale, we will not @@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends RuntimeException { * indeterminate stage retries. When the client receives this code, it will not retry * pushing the block. */ - STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX); + STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX), + /** + * Indicate the application attempt is not the latest attempt on the server side. + * When the client gets this code, it will not retry pushing the block. + */ + TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX); private final byte id; // Error message suffix used to generate an error message for a given ReturnCode and @@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends RuntimeException { case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH; case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED; case 3: return ReturnCode.STALE_BLOCK_PUSH; + case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH; default: throw new IllegalArgumentException("Unknown block push return code: " + id); } } + public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) { + return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH || + returnCode == ReturnCode.STALE_BLOCK_PUSH || + returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH; + } + public static String getErrorMsg(String blockId, ReturnCode errorCode) { Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS); return "Block " + blockId + errorCode.errorMsgSuffix; diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 5c07f20..bc99248 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -249,8 +249,17 @@ public class 
TransportRequestHandler extends MessageHandler<RequestMessage> { wrappedCallback.onComplete(wrappedCallback.getID()); } } catch (Exception e) { - logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e); - respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); + if (e instanceof BlockPushNonFatalFailure) { + // Thrown by rpcHandler.receiveStream(reverseClient, meta, callback), the same as + // onComplete method. Respond an RPC message with the error code to client instead of + // using exceptions encoded in the RPCFailure. Using a proper RPCResponse is more + // efficient, and now only include the too old attempt case here. + respond(new RpcResponse(req.requestId, + new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse()))); + } else { + logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e); + respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); + } // We choose to totally fail the channel, rather than trying to recover as we do in other // cases. We don't know how many bytes of the stream the client has already sent for the // stream, it's not worth trying to recover. 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java index 271d762..9136ff6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java @@ -87,10 +87,11 @@ public interface ErrorHandler { return false; } - // If the block is too late or stale block push, there is no need to retry it + // If the block is too late or the invalid block push or the attempt is not the latest one, + // there is no need to retry it return !(t instanceof BlockPushNonFatalFailure && - (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH || - ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH)); + BlockPushNonFatalFailure + .shouldNotRetryErrorCode(((BlockPushNonFatalFailure) t).getReturnCode())); } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 80174d1..d04db67 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -401,19 +401,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); - if (appShuffleInfo.attemptId != msg.appAttemptId) { - // If this Block belongs to a former application attempt, it is considered late, - // as only the blocks from the current application attempt will be merged - // TODO: [SPARK-35548] Client should be updated to handle this error. 
- throw new IllegalArgumentException( - String.format("The attempt id %s in this PushBlockStream message does not match " - + "with the current attempt id %s stored in shuffle service for application %s", - msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); - } // Use string concatenation here to avoid the overhead with String.format on every // pushed block. final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_" + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId; + if (appShuffleInfo.attemptId != msg.appAttemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode + .TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(), + BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_OLD_ATTEMPT_PUSH)); + } // Retrieve merged shuffle file metadata AppShufflePartitionInfo partitionInfoBeforeCheck; BlockPushNonFatalFailure failure = null; @@ -519,12 +517,18 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId); AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); if (appShuffleInfo.attemptId != msg.appAttemptId) { - // If this Block belongs to a former application attempt, it is considered late, - // as only the blocks from the current application attempt will be merged - // TODO: [SPARK-35548] Client should be updated to handle this error. + // If finalizeShuffleMerge from a former application attempt, it is considered late, + // as only the finalizeShuffleMerge request from the current application attempt + // will be merged. 
Too old app attempt only being seen by an already failed + // app attempt, and no need use callback to return to client now, because + // the finalizeShuffleMerge in DAGScheduler has no retry policy, and don't + // use the BlockPushNonFatalFailure because it's the finalizeShuffleMerge + // related case, not the block push case, just throw it in server side now. + // TODO we may use a new exception class to include the finalizeShuffleMerge + // related case just as the BlockPushNonFatalFailure contains the block push cases. throw new IllegalArgumentException( String.format("The attempt id %s in this FinalizeShuffleMerge message does not match " - + "with the current attempt id %s stored in shuffle service for application %s", + + "with the current attempt id %s stored in shuffle service for application %s", msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); } AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef = diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java index 56c9a97..246fda6 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java @@ -37,6 +37,8 @@ public class ErrorHandlerSuite { assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure( ReturnCode.TOO_LATE_BLOCK_PUSH, ""))); assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure( + ReturnCode.TOO_OLD_ATTEMPT_PUSH, ""))); + assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure( ReturnCode.STALE_BLOCK_PUSH, ""))); assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException()))); assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure( @@ -54,6 +56,8 @@ public class ErrorHandlerSuite { 
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure( ReturnCode.TOO_LATE_BLOCK_PUSH, ""))); assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure( + ReturnCode.TOO_OLD_ATTEMPT_PUSH, ""))); + assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure( ReturnCode.STALE_BLOCK_PUSH, ""))); assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure( ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index d7881f0..3324b4e 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -112,6 +112,8 @@ public class RemoteBlockPushResolverSuite { assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure( BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, ""))); assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure( + BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, ""))); + assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure( BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, ""))); assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure( BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))); @@ -939,7 +941,7 @@ public class RemoteBlockPushResolverSuite { } } - @Test(expected = IllegalArgumentException.class) + @Test(expected = BlockPushNonFatalFailure.class) public void testPushBlockFromPreviousAttemptIsRejected() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); @@ -998,11 +1000,12 @@ public class RemoteBlockPushResolverSuite { try { pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, 1, 0, 0, 1, 0, 
0)); - } catch (IllegalArgumentException re) { - assertEquals( - "The attempt id 1 in this PushBlockStream message does not match " + - "with the current attempt id 2 stored in shuffle service for application " + - testApp, re.getMessage()); + } catch (BlockPushNonFatalFailure re) { + BlockPushReturnCode errorCode = + (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse()); + assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(), + errorCode.returnCode); + assertEquals(errorCode.failureBlockId, stream2.getID()); throw re; } } @@ -1034,7 +1037,7 @@ public class RemoteBlockPushResolverSuite { } catch (IllegalArgumentException e) { assertEquals(e.getMessage(), String.format("The attempt id %s in this FinalizeShuffleMerge message does not " + - "match with the current attempt id %s stored in shuffle service for application %s", + "match with the current attempt id %s stored in shuffle service for application %s", ATTEMPT_ID_1, ATTEMPT_ID_2, testApp)); throw e; } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala index e6af767..bb260f8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala @@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server.BlockPushNonFatalFailure -import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode import org.apache.spark.network.shuffle.BlockPushingListener import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler import org.apache.spark.network.util.TransportConf @@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging 
{ if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) { return false } - // If the block is too late or the invalid block push, there is no need to retry it + // If the block is too late or the invalid block push or the attempt is not the latest one, + // there is no need to retry it !(t.isInstanceOf[BlockPushNonFatalFailure] && - (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode - == ReturnCode.TOO_LATE_BLOCK_PUSH || - t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode - == ReturnCode.STALE_BLOCK_PUSH)) + BlockPushNonFatalFailure. + shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode)); } } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala index 6f9b5e4..298ba50 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala @@ -224,6 +224,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { ReturnCode.TOO_LATE_BLOCK_PUSH, ""))) assert( !errorHandler.shouldRetryError(new BlockPushNonFatalFailure( + ReturnCode.TOO_OLD_ATTEMPT_PUSH, ""))) + assert( + !errorHandler.shouldRetryError(new BlockPushNonFatalFailure( ReturnCode.STALE_BLOCK_PUSH, ""))) assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException()))) assert( @@ -240,6 +243,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach { ReturnCode.TOO_LATE_BLOCK_PUSH, ""))) assert( !errorHandler.shouldLogError(new BlockPushNonFatalFailure( + ReturnCode.TOO_OLD_ATTEMPT_PUSH, ""))) + assert( + !errorHandler.shouldLogError(new BlockPushNonFatalFailure( ReturnCode.STALE_BLOCK_PUSH, ""))) assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure( ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))) --------------------------------------------------------------------- 
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org