squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r254896039
##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -449,54 +452,75 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
}
test("big blocks are also checked for corruption") {
- val corruptBuffer1 = mockCorruptBuffer(10000L, true)
-
+ val streamLength = 10000L
val blockManager = mock(classOf[BlockManager])
+
+ // This stream will throw IOException when the first byte is read
+ val localBuffer = mockCorruptBuffer(streamLength, 0)
val localBmId = BlockManagerId("test-client", "test-client", 1)
doReturn(localBmId).when(blockManager).blockManagerId
- doReturn(corruptBuffer1).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+ doReturn(localBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+ val localShuffleBlockId = ShuffleBlockId(0, 0, 0)
val localBlockLengths = Seq[Tuple2[BlockId, Long]](
- ShuffleBlockId(0, 0, 0) -> corruptBuffer1.size()
+ localShuffleBlockId -> localBuffer.size()
)
- val corruptBuffer2 = mockCorruptBuffer(10000L, false)
+ val streamNotCorruptTill = 8 * 1024
+ // This stream will throw exception after streamNotCorruptTill bytes are read
+ val remoteBuffer = mockCorruptBuffer(streamLength, streamNotCorruptTill)
val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+ val remoteShuffleBlockId = ShuffleBlockId(0, 1, 0)
val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
- ShuffleBlockId(0, 1, 0) -> corruptBuffer2.size()
+ remoteShuffleBlockId -> remoteBuffer.size()
)
val transfer = createMockTransfer(
- Map(ShuffleBlockId(0, 0, 0) -> corruptBuffer1, ShuffleBlockId(0, 1, 0) -> corruptBuffer2))
-
+ Map(localShuffleBlockId -> localBuffer, remoteShuffleBlockId -> remoteBuffer))
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(localBmId, localBlockLengths),
(remoteBmId, remoteBlockLengths)
).toIterator
-
val taskContext = TaskContext.empty()
+ val maxBytesInFlight = 3 * 1024
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
blockManager,
blocksByAddress,
- (_, in) => new LimitedInputStream(in, 10000),
- 2048,
+ (_, in) => new LimitedInputStream(in, streamLength),
+ maxBytesInFlight,
Int.MaxValue,
Int.MaxValue,
Int.MaxValue,
true,
true,
taskContext.taskMetrics.createTempShuffleReadMetrics())
- // Only one block should be returned which has corruption after maxBytesInFlight/3
+
+ // Only one block should be returned which has corruption after maxBytesInFlight/3 because the
+ // other block will be re-fetched
Review comment:
Interesting, I actually hadn't even considered the re-fetching -- is that what is happening here, though? I think you get the remote block first because the ShuffleBlockFetcherIterator requests the remote blocks first. Normally that would be async and take a while, but in the test, the shuffle client fetches the blocks immediately.

If this is happening because of the re-fetch, I'd expand that comment to something like "because the other block will detect corruption on the first fetch, and then get added to the queue again for a retry".
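As a rough illustration (the class below is a hypothetical sketch, not the suite's actual mockCorruptBuffer helper), a stream with this shape is enough to reproduce the two cases in the test: it reads normally up to a cut-off and then throws.

```scala
import java.io.{IOException, InputStream}

// Hypothetical sketch: serves `length` zero bytes, but throws once `corruptAt`
// bytes have been read, simulating a shuffle block that is corrupt partway through.
class CorruptingInputStream(length: Long, corruptAt: Long) extends InputStream {
  private var pos = 0L

  override def read(): Int = {
    if (pos >= length) {
      -1  // normal end of stream (never reached when corruptAt < length)
    } else if (pos >= corruptAt) {
      throw new IOException(s"simulated corruption after $pos bytes")
    } else {
      pos += 1
      0
    }
  }
}
```

With a cut-off of 0 the very first read fails (the local block above), while a cut-off of 8 * 1024 only fails well past maxBytesInFlight/3 bytes, which -- per the test comment -- is what lets that block reach the caller. In the suite this stream would presumably be wrapped in a mocked ManagedBuffer so that each createInputStream() call hands back a fresh instance.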