This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d8922f36a42  [SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException

d8922f36a42 is described below

commit d8922f36a420c47da48b3214d941ac50bab7cf63
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Oct 27 19:21:04 2023 -0700

[SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException

### What changes were proposed in this pull request?

This patch proposes to wrap `BufferReleasingInputStream.available`/`reset` in `tryOrFetchFailedException`, so that an `IOException` raised during an `available`/`reset` call is rethrown as a `FetchFailedException`.

### Why are the changes needed?

We have encountered a shuffle data corruption issue:

```
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
	at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
	at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
	at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
```

Spark shuffle can already detect corruption for a few stream operations, namely `read` and `skip`: an `IOException` thrown from them is rethrown as a `FetchFailedException`, which causes the failed shuffle task to be retried. But in the stack trace above the failure occurs in `available`, which is not covered by that mechanism, so no retry happened and the Spark application simply failed. Since the `available`/`reset` operations also involve data decompression and can throw `IOException`, we should check them the same way `read` and `skip` are checked.

### Does this PR introduce _any_ user-facing change?

Yes. Data corruption detected during an `available`/`reset` call now raises a `FetchFailedException`, which can be retried, just as `read` and `skip` already do, instead of a plain `IOException`.

### How was this patch tested?

Added a test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43543 from viirya/add_available.

Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
Signed-off-by: Chao Sun <sunc...@apple.com>
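Before reading the diff, the shape of the fix is roughly the following. This is a minimal, self-contained sketch of the rethrow pattern, not Spark's actual code: `RethrowingStream` and `FetchFailedLike` are hypothetical names, and the real `org.apache.spark.shuffle.FetchFailedException` also carries block and map-status metadata that is omitted here.

```scala
import java.io.{IOException, InputStream}

// Hypothetical stand-in for org.apache.spark.shuffle.FetchFailedException;
// the real exception also carries BlockManagerId / shuffle / map metadata.
class FetchFailedLike(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

// Minimal sketch of the pattern the patch extends: each delegate call that
// may touch corrupt data goes through one helper that converts IOException
// into a retryable failure when corruption detection is enabled.
class RethrowingStream(delegate: InputStream, detectCorruption: Boolean)
  extends InputStream {

  override def read(): Int = tryOrRethrow(delegate.read())
  override def skip(n: Long): Long = tryOrRethrow(delegate.skip(n))
  // The two operations this commit adds to the covered set:
  override def available(): Int = tryOrRethrow(delegate.available())
  override def reset(): Unit = tryOrRethrow(delegate.reset())

  private def tryOrRethrow[T](block: => T): T = {
    try {
      block
    } catch {
      case e: IOException if detectCorruption =>
        delegate.close() // release the (possibly corrupt) stream first
        throw new FetchFailedLike("corrupt block; fetch should be retried", e)
    }
  }
}
```

The by-name parameter (`block: => T`) is what lets a single helper wrap every delegate call site; it is the same shape as `tryOrFetchFailedException` in the diff below.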
---
 .../storage/ShuffleBlockFetcherIterator.scala      |  8 ++-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 64 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index f648b189f04..e36ebc02405 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1353,7 +1353,8 @@ private class BufferReleasingInputStream(
     }
   }
 
-  override def available(): Int = delegate.available()
+  override def available(): Int =
+    tryOrFetchFailedException(delegate.available())
 
   override def mark(readlimit: Int): Unit = delegate.mark(readlimit)
 
@@ -1368,12 +1369,13 @@ private class BufferReleasingInputStream(
   override def read(b: Array[Byte], off: Int, len: Int): Int =
     tryOrFetchFailedException(delegate.read(b, off, len))
 
-  override def reset(): Unit = delegate.reset()
+  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())
 
   /**
    * Execute a block of code that returns a value, close this stream quietly and re-throwing
    * IOException as FetchFailedException when detectCorruption is true. This method is only
-   * used by the `read` and `skip` methods inside `BufferReleasingInputStream` currently.
+   * used by the `available`, `read`, `skip` and `reset` methods inside
+   * `BufferReleasingInputStream` currently.
    */
   private def tryOrFetchFailedException[T](block: => T): T = {
     try {
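The stack trace in the description also explains why `available` needs coverage at all: for block-compressed codecs such as Snappy, `available()` may have to read and decompress the next chunk before it can report a byte count (`SnappyInputStream.available` calls `hasNextChunk` above), so corrupt data can surface there rather than in `read()`. An illustrative sketch of that behavior, not `SnappyInputStream`'s actual implementation:

```scala
import java.io.InputStream

// Illustrative only: in a chunked compressed format, available() cannot
// report pending bytes without decoding the next chunk, so corrupt data
// throws from available() (e.g. Snappy's FAILED_TO_UNCOMPRESS) rather
// than from read().
abstract class ChunkedCompressedStream extends InputStream {
  private var pending = 0

  /** Decodes one compressed chunk into an internal buffer and returns its
   *  uncompressed size; throws IOException if the chunk is corrupt. */
  protected def decodeNextChunk(): Int

  override def available(): Int = {
    if (pending == 0) pending = decodeNextChunk() // decompression happens here
    pending
  }
}
```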
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e16ffde5d7a..e3c47c12b8f 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -183,6 +183,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       taskContext: Option[TaskContext] = None,
       streamWrapperLimitSize: Option[Long] = None,
+      corruptAtAvailableReset: Boolean = false,
       blockManager: Option[BlockManager] = None,
       maxBytesInFlight: Long = Long.MaxValue,
       maxReqsInFlight: Int = Int.MaxValue,
@@ -202,7 +203,14 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager.getOrElse(createMockBlockManager()),
       mapOutputTracker,
       blocksByAddress.iterator,
-      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      (_, in) => {
+        val limited = streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
+        if (corruptAtAvailableReset) {
+          new CorruptAvailableResetStream(limited)
+        } else {
+          limited
+        }
+      },
       maxBytesInFlight,
       maxReqsInFlight,
       maxBlocksInFlightPerAddress,
@@ -714,6 +722,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     corruptBuffer
   }
 
+  private class CorruptAvailableResetStream(in: InputStream) extends InputStream {
+    override def read(): Int = in.read()
+
+    override def read(dest: Array[Byte], off: Int, len: Int): Int = in.read(dest, off, len)
+
+    override def available(): Int = throw new IOException("corrupt at available")
+
+    override def reset(): Unit = throw new IOException("corrupt at reset")
+  }
+
   private class CorruptStream(corruptAt: Long = 0L) extends InputStream {
     var pos = 0
     var closed = false
@@ -1881,4 +1899,48 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
     verifyLocalBlocksFromFallback(iterator)
   }
+
+  test("SPARK-45678: retry corrupt blocks on available() and reset()") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()
+    )
+
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
+
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, createMockManagedBuffer())
+        sem.release()
+      }
+    }
+
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+      streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, // Don't use `ChunkedByteBufferInputStream`.
+      corruptAtAvailableReset = true,
+      checksumEnabled = false
+    )
+
+    sem.acquire()
+
+    val (id1, stream) = iterator.next()
+    assert(id1 === ShuffleBlockId(0, 0, 0))
+
+    val err1 = intercept[FetchFailedException] {
+      stream.available()
+    }
+
+    assert(err1.getMessage.contains("corrupt at available"))
+
+    val err2 = intercept[FetchFailedException] {
+      stream.reset()
+    }
+
+    assert(err2.getMessage.contains("corrupt at reset"))
+  }
 }
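Wiring the two hypothetical sketches above together shows the behavior change end to end: a failure in `available()` surfaces as the retryable exception instead of a bare `IOException` that would fail the application.

```scala
import java.io.{IOException, InputStream}

// Demo (reuses the hypothetical RethrowingStream / FetchFailedLike sketch):
// available() fails the same way the reported Snappy stream did.
object AvailableDemo extends App {
  val corrupt: InputStream = new InputStream {
    override def read(): Int = -1
    override def available(): Int =
      throw new IOException("FAILED_TO_UNCOMPRESS(5)")
  }

  val wrapped = new RethrowingStream(corrupt, detectCorruption = true)
  try wrapped.available()
  catch {
    case e: FetchFailedLike =>
      println(s"retryable: ${e.getMessage} (cause: ${e.getCause})")
  }
}
```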