squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r254896039
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##########
 @@ -449,54 +452,75 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
   }
 
   test("big blocks are also checked for corruption") {
-    val corruptBuffer1 = mockCorruptBuffer(10000L, true)
-
+    val streamLength = 10000L
     val blockManager = mock(classOf[BlockManager])
+
+    // This stream will throw IOException when the first byte is read
+    val localBuffer = mockCorruptBuffer(streamLength, 0)
     val localBmId = BlockManagerId("test-client", "test-client", 1)
     doReturn(localBmId).when(blockManager).blockManagerId
-    doReturn(corruptBuffer1).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    doReturn(localBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+    val localShuffleBlockId = ShuffleBlockId(0, 0, 0)
     val localBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 0, 0) -> corruptBuffer1.size()
+      localShuffleBlockId -> localBuffer.size()
     )
 
-    val corruptBuffer2 = mockCorruptBuffer(10000L, false)
+    val streamNotCorruptTill = 8 * 1024
+    // This stream will throw an exception after streamNotCorruptTill bytes are read
+    val remoteBuffer = mockCorruptBuffer(streamLength, streamNotCorruptTill)
     val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteShuffleBlockId = ShuffleBlockId(0, 1, 0)
     val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 1, 0) -> corruptBuffer2.size()
+      remoteShuffleBlockId -> remoteBuffer.size()
     )
 
     val transfer = createMockTransfer(
-      Map(ShuffleBlockId(0, 0, 0) -> corruptBuffer1, ShuffleBlockId(0, 1, 0) -> corruptBuffer2))
-
+      Map(localShuffleBlockId -> localBuffer, remoteShuffleBlockId -> remoteBuffer))
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (localBmId, localBlockLengths),
       (remoteBmId, remoteBlockLengths)
     ).toIterator
-
     val taskContext = TaskContext.empty()
+    val maxBytesInFlight = 3 * 1024
     val iterator = new ShuffleBlockFetcherIterator(
       taskContext,
       transfer,
       blockManager,
       blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 10000),
-      2048,
+      (_, in) => new LimitedInputStream(in, streamLength),
+      maxBytesInFlight,
       Int.MaxValue,
       Int.MaxValue,
       Int.MaxValue,
       true,
       true,
       taskContext.taskMetrics.createTempShuffleReadMetrics())
-    // Only one block should be returned which has corruption after maxBytesInFlight/3
+
+    // Only one block should be returned which has corruption after maxBytesInFlight/3 because the
+    // other block will be re-fetched
 
 Review comment:
   Interesting, I actually hadn't even considered the re-fetching -- is that 
what is happening here, though?  I think you get the remote block first because 
the ShuffleBlockFetcherIterator requests the remote blocks first.  Normally 
that would be async and take a while, but in the test, the shuffle client 
fetches the blocks immediately.
   
   If this is happening because of the re-fetch, I'd expand that comment, 
something like "because the other block will detect corruption on the first 
fetch, and then get added to the queue again for a retry".
