[ 
https://issues.apache.org/jira/browse/SPARK-38987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-38987.
-----------------------------------------
    Fix Version/s: 3.4.0
         Assignee: Aravind Patnam
       Resolution: Fixed

> Handle fallback when merged shuffle blocks are corrupted and 
> spark.shuffle.detectCorrupt is set to true
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38987
>                 URL: https://issues.apache.org/jira/browse/SPARK-38987
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Ye Zhou
>            Assignee: Aravind Patnam
>            Priority: Major
>             Fix For: 3.4.0
>
>
> Current implementation doesn't handle the corruption cases well for merged 
> shuffle blocks, and if there is a corrupted shuffle chunk, the task will 
> directly fail with the error message shown below, without triggering the 
> expected fallback, which will try fetching original unmerged shuffle blocks:
> org.apache.spark.SparkException: Failed to get block 
> shuffleChunk_30_0_1070_0, which is not a shuffle block
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1189)
>       at 
> org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1352)
>       at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>       at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
>       at java.io.DataInputStream.readInt(DataInputStream.java:387)
>       at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
>       at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
>       at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
>       at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
>       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage138.sort_addToSorter_0$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage138.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>       at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>       at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:783)
>       at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:742)
>       at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:908)
>       at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:944)
>       at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage140.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>       at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>       at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:783)
>       at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:742)
>       at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:908)
>       at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:944)
>       at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage142.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
>       at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>       at



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to