This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d1cd110  [SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle
d1cd110 is described below

commit d1cd110c20817eb1ccd716e099be5712df1f670c
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Dec 23 12:00:45 2021 +0800

    [SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle
    
    ### What changes were proposed in this pull request?
    
    Skip corruption diagnosis on merged blocks from push-based shuffle.
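
    As shown in the diff below, `ShuffleBlockFetcherIterator.diagnoseCorruption` now pattern-matches on the block id type instead of asserting that it received a `ShuffleBlockId`. A minimal standalone sketch of the new dispatch, using hypothetical stand-in types rather than Spark's actual `BlockId` hierarchy:

    ```
    // Hypothetical stand-ins for illustration; not Spark's real classes.
    sealed trait BlockId
    case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
    case class ShuffleBlockChunkId(shuffleId: Int, reduceId: Int, chunkId: Int) extends BlockId
    case class BroadcastBlockId(broadcastId: Long) extends BlockId

    def diagnoseCorruption(blockId: BlockId): String = blockId match {
      case b: ShuffleBlockId =>
        // Regular shuffle block: run the checksum-based diagnosis (elided here).
        s"diagnosed $b"
      case c: ShuffleBlockChunkId =>
        // Merged chunk from push-based shuffle: no checksum support yet
        // (SPARK-36284), so skip diagnosis and only report the corruption.
        s"diagnosis skipped for corrupted $c"
      case unexpected =>
        // Any other block id reaching this path is a caller bug.
        throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
    }
    ```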
    
    ### Why are the changes needed?
    
    Shuffle checksum support for push-based shuffle (SPARK-36284) has not been implemented yet, so skip corruption diagnosis on merged block chunks to suppress exceptions like the following.
    
    ```
    21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
        at scala.Predef$.assert(Predef.scala:223)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
        at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
        at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
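
    The offending id in the log, `shuffleChunk_464_0_5645_0`, names a merged shuffle chunk produced by push-based shuffle, whereas the diagnosis code assumed it would only ever see regular `shuffle_<shuffleId>_<mapId>_<reduceId>` blocks, so the assertion fired. A rough illustration of the two name shapes (simplified patterns, not Spark's full `BlockId` parser):

    ```
    // Illustration only: simplified name patterns.
    val ShuffleBlock = raw"shuffle_(\d+)_(\d+)_(\d+)".r
    val ShuffleChunk = raw"shuffleChunk_(\d+)_(\d+)_(\d+)_(\d+)".r

    def describe(name: String): String = name match {
      case ShuffleBlock(shuffleId, mapId, reduceId) =>
        s"regular shuffle block: shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId"
      case ShuffleChunk(_*) =>
        "merged shuffle chunk from push-based shuffle (no per-block checksum yet)"
      case other =>
        s"unrecognized block id: $other"
    }

    describe("shuffleChunk_464_0_5645_0") // matches ShuffleChunk, not ShuffleBlock
    ```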
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Instead of failing with an `AssertionError`, corruption diagnosis is now skipped for merged block chunks and a warning is logged.
    
    ### How was this patch tested?
    
    Manually ran the 1T TPC-DS benchmark.
    
    Closes #34961 from pan3793/SPARK-37695.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: yi.wu <yi...@databricks.com>
    (cherry picked from commit 57ca75f3f0b2695434e47464b2201210edd58fde)
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../storage/ShuffleBlockFetcherIterator.scala      | 69 ++++++++++++----------
 1 file changed, 39 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 3eb8acd..1f96579 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1035,40 +1035,49 @@ final class ShuffleBlockFetcherIterator(
       address: BlockManagerId,
       blockId: BlockId): String = {
     logInfo("Start corruption diagnosis.")
-    val startTimeNs = System.nanoTime()
-    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
-    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
-    val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
-    // consume the remaining data to calculate the checksum
-    var cause: Cause = null
-    try {
-      while (checkedIn.read(buffer) != -1) {}
-      val checksum = checkedIn.getChecksum.getValue
-      cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,
-        shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum,
-        checksumAlgorithm)
-    } catch {
-      case e: Exception =>
-        logWarning("Unable to diagnose the corruption cause of the corrupted block", e)
-        cause = Cause.UNKNOWN_ISSUE
-    }
-    val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
-    val diagnosisResponse = cause match {
-      case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
-        s"Block $blockId is corrupted but corruption diagnosis failed due to " +
-          s"unsupported checksum algorithm: $checksumAlgorithm"
+    blockId match {
+      case shuffleBlock: ShuffleBlockId =>
+        val startTimeNs = System.nanoTime()
+        val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
+        // consume the remaining data to calculate the checksum
+        var cause: Cause = null
+        try {
+          while (checkedIn.read(buffer) != -1) {}
+          val checksum = checkedIn.getChecksum.getValue
+          cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,
+            shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum,
+            checksumAlgorithm)
+        } catch {
+          case e: Exception =>
+            logWarning("Unable to diagnose the corruption cause of the corrupted block", e)
+            cause = Cause.UNKNOWN_ISSUE
+        }
+        val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
+        val diagnosisResponse = cause match {
+          case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
+            s"Block $blockId is corrupted but corruption diagnosis failed due to " +
+              s"unsupported checksum algorithm: $checksumAlgorithm"
 
-      case Cause.CHECKSUM_VERIFY_PASS =>
-        s"Block $blockId is corrupted but checksum verification passed"
+          case Cause.CHECKSUM_VERIFY_PASS =>
+            s"Block $blockId is corrupted but checksum verification passed"
 
-      case Cause.UNKNOWN_ISSUE =>
-        s"Block $blockId is corrupted but the cause is unknown"
+          case Cause.UNKNOWN_ISSUE =>
+            s"Block $blockId is corrupted but the cause is unknown"
 
-      case otherCause =>
-        s"Block $blockId is corrupted due to $otherCause"
+          case otherCause =>
+            s"Block $blockId is corrupted due to $otherCause"
+        }
+        logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse")
+        diagnosisResponse
+      case shuffleBlockChunk: ShuffleBlockChunkId =>
+        // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle
+        val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted but corruption " +
+          s"diagnosis is skipped due to lack of shuffle checksum support for push-based shuffle."
+        logWarning(diagnosisResponse)
+        diagnosisResponse
+      case unexpected: BlockId =>
+        throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
     }
-    logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse")
-    diagnosisResponse
   }
 
   def toCompletionIterator: Iterator[(BlockId, InputStream)] = {
