This is an automated email from the ASF dual-hosted git repository. wuyi pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new d1cd110 [SPARK-37695][CORE][SHUFFLE] Skip diagnosis of merged blocks from push-based shuffle d1cd110 is described below commit d1cd110c20817eb1ccd716e099be5712df1f670c Author: Cheng Pan <cheng...@apache.org> AuthorDate: Thu Dec 23 12:00:45 2021 +0800 [SPARK-37695][CORE][SHUFFLE] Skip diagnosis of merged blocks from push-based shuffle ### What changes were proposed in this pull request? Skip diagnosis of merged blocks from push-based shuffle ### Why are the changes needed? Because SPARK-36284 has not been addressed yet, skip it to suppress exceptions. ``` 21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0 at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043) at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308) at scala.Option.map(Option.scala:230) at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307) at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110) at 
org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:136) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507) at 
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ### Does this PR introduce _any_ user-facing change? Yes, suppress the exceptions. ### How was this patch tested? Run 1T TPCDS manually. Closes #34961 from pan3793/SPARK-37695. Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: yi.wu <yi...@databricks.com> (cherry picked from commit 57ca75f3f0b2695434e47464b2201210edd58fde) Signed-off-by: yi.wu <yi...@databricks.com> --- .../storage/ShuffleBlockFetcherIterator.scala | 69 ++++++++++++---------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 3eb8acd..1f96579 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -1035,40 +1035,49 @@ final class ShuffleBlockFetcherIterator( address: BlockManagerId, blockId: BlockId): String = { logInfo("Start corruption diagnosis.") - val startTimeNs = System.nanoTime() - assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId") - val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId] - val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER) - // consume the remaining data to calculate the checksum - var cause: Cause = null - try { - while (checkedIn.read(buffer) != -1) {} - val checksum = checkedIn.getChecksum.getValue - cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId, - shuffleBlock.shuffleId, shuffleBlock.mapId, 
shuffleBlock.reduceId, checksum, - checksumAlgorithm) - } catch { - case e: Exception => - logWarning("Unable to diagnose the corruption cause of the corrupted block", e) - cause = Cause.UNKNOWN_ISSUE - } - val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs) - val diagnosisResponse = cause match { - case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM => - s"Block $blockId is corrupted but corruption diagnosis failed due to " + - s"unsupported checksum algorithm: $checksumAlgorithm" + blockId match { + case shuffleBlock: ShuffleBlockId => + val startTimeNs = System.nanoTime() + val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER) + // consume the remaining data to calculate the checksum + var cause: Cause = null + try { + while (checkedIn.read(buffer) != -1) {} + val checksum = checkedIn.getChecksum.getValue + cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId, + shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum, + checksumAlgorithm) + } catch { + case e: Exception => + logWarning("Unable to diagnose the corruption cause of the corrupted block", e) + cause = Cause.UNKNOWN_ISSUE + } + val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs) + val diagnosisResponse = cause match { + case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM => + s"Block $blockId is corrupted but corruption diagnosis failed due to " + + s"unsupported checksum algorithm: $checksumAlgorithm" - case Cause.CHECKSUM_VERIFY_PASS => - s"Block $blockId is corrupted but checksum verification passed" + case Cause.CHECKSUM_VERIFY_PASS => + s"Block $blockId is corrupted but checksum verification passed" - case Cause.UNKNOWN_ISSUE => - s"Block $blockId is corrupted but the cause is unknown" + case Cause.UNKNOWN_ISSUE => + s"Block $blockId is corrupted but the cause is unknown" - case otherCause => - s"Block $blockId is corrupted due to $otherCause" + case otherCause => + s"Block $blockId is 
corrupted due to $otherCause" + } + logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse") + diagnosisResponse + case shuffleBlockChunk: ShuffleBlockChunkId => + // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle + val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted but corruption " + + s"diagnosis is skipped due to lack of shuffle checksum support for push-based shuffle." + logWarning(diagnosisResponse) + diagnosisResponse + case unexpected: BlockId => + throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected") } - logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse") - diagnosisResponse } def toCompletionIterator: Iterator[(BlockId, InputStream)] = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org