Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20685#discussion_r172492938
  
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ---
    @@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
         intercept[FetchFailedException] { iterator.next() }
       }
     
    +  test("big corrupt blocks will not be retiried") {
    +    val corruptStream = mock(classOf[InputStream])
    +    when(corruptStream.read(any(), any(), any())).thenThrow(new 
IOException("corrupt"))
    +    val corruptBuffer = mock(classOf[ManagedBuffer])
    +    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
    +    doReturn(10000L).when(corruptBuffer).size()
    +
    +    val blockManager = mock(classOf[BlockManager])
    +    val localBmId = BlockManagerId("test-client", "test-client", 1)
    +    doReturn(localBmId).when(blockManager).blockManagerId
    +    
doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
    +    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
    +      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
    +    )
    +
    +    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
    +    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
    +      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
    +    )
    +
    +    val transfer = mock(classOf[BlockTransferService])
    +    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
    --- End diff --
    
    Thanks a lot, Imran! I can file another PR for the refinement :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to