Github user jinxing64 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172492938
--- Diff:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
intercept[FetchFailedException] { iterator.next() }
}
+ test("big corrupt blocks will not be retiried") {
+ val corruptStream = mock(classOf[InputStream])
+ when(corruptStream.read(any(), any(), any())).thenThrow(new
IOException("corrupt"))
+ val corruptBuffer = mock(classOf[ManagedBuffer])
+ when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+ doReturn(10000L).when(corruptBuffer).size()
+
+ val blockManager = mock(classOf[BlockManager])
+ val localBmId = BlockManagerId("test-client", "test-client", 1)
+ doReturn(localBmId).when(blockManager).blockManagerId
+
doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+ val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+ ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+ )
+
+ val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+ val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+ ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+ )
+
+ val transfer = mock(classOf[BlockTransferService])
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --
Thanks a lot, Imran! I can file another PR for the refinement :)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]