Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20685#discussion_r172232973
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
    intercept[FetchFailedException] { iterator.next() }
  }
+  test("big corrupt blocks will not be retried") {
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
+    val corruptBuffer = mock(classOf[ManagedBuffer])
+    when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+    doReturn(10000L).when(corruptBuffer).size()
+
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    )
+
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    )
+
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --
you can reuse `createMockTransfer` to simplify this a little.
(actually, a bunch of this test code looks like it could be refactored
across these tests -- but we can leave that out of this change.)
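For example, something roughly like this -- just a sketch, assuming `createMockTransfer` keeps its current shape in this suite (takes a `Map[BlockId, ManagedBuffer]` and wires up the `fetchBlocks` answer internally):

```scala
// Sketch only: reuse the suite's existing createMockTransfer helper instead of
// stubbing BlockTransferService by hand. The map mirrors the remote block in the
// new test; corruptBuffer is the mocked ManagedBuffer defined above it.
val transfer = createMockTransfer(Map[BlockId, ManagedBuffer](
  ShuffleBlockId(0, 1, 0) -> corruptBuffer
))
```

That drops the manual `when(transfer.fetchBlocks(...))` stubbing and keeps the corruption behaviour in one place (the mocked buffer/stream).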
---