squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle 
corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r263603012
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##########
 @@ -425,28 +450,97 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
-  test("big blocks are not checked for corruption") {
-    val corruptBuffer = mockCorruptBuffer(10000L)
-
+  test("big blocks are also checked for corruption") {
+    val streamLength = 10000L
     val blockManager = mock(classOf[BlockManager])
-    val localBmId = BlockManagerId("test-client", "test-client", 1)
-    doReturn(localBmId).when(blockManager).blockManagerId
-    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 
0, 0))
-    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    val localBlockManagerId = BlockManagerId("local-client", "local-client", 1)
+    doReturn(localBlockManagerId).when(blockManager).blockManagerId
+
+    // This stream will throw IOException when the first byte is read
+    val corruptBuffer1 = mockCorruptBuffer(streamLength, 0)
+    val blockManagerId1 = BlockManagerId("remote-client-1", "remote-client-1", 
1)
+    val shuffleBlockId1 = ShuffleBlockId(0, 1, 0)
+    val blockLengths1 = Seq[Tuple2[BlockId, Long]](
+      shuffleBlockId1 -> corruptBuffer1.size()
     )
 
-    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
-    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    val streamNotCorruptTill = 8 * 1024
+    // This stream will throw exception after streamNotCorruptTill bytes are 
read
+    val corruptBuffer2 = mockCorruptBuffer(streamLength, streamNotCorruptTill)
+    val blockManagerId2 = BlockManagerId("remote-client-2", "remote-client-2", 
2)
+    val shuffleBlockId2 = ShuffleBlockId(0, 2, 0)
+    val blockLengths2 = Seq[Tuple2[BlockId, Long]](
+      shuffleBlockId2 -> corruptBuffer2.size()
     )
 
     val transfer = createMockTransfer(
-      Map(ShuffleBlockId(0, 0, 0) -> corruptBuffer, ShuffleBlockId(0, 1, 0) -> 
corruptBuffer))
+      Map(shuffleBlockId1 -> corruptBuffer1, shuffleBlockId2 -> 
corruptBuffer2))
+    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+      (blockManagerId1, blockLengths1),
+      (blockManagerId2, blockLengths2)
+    ).toIterator
+    val taskContext = TaskContext.empty()
+    val maxBytesInFlight = 3 * 1024
+    val iterator = new ShuffleBlockFetcherIterator(
+      taskContext,
+      transfer,
+      blockManager,
+      blocksByAddress,
+      (_, in) => new LimitedInputStream(in, streamLength),
+      maxBytesInFlight,
+      Int.MaxValue,
+      Int.MaxValue,
+      Int.MaxValue,
+      true,
+      true,
+      taskContext.taskMetrics.createTempShuffleReadMetrics())
+
+    // Only one block should be returned which has corruption after 
maxBytesInFlight/3 because the
+    // other block will detect corruption on first fetch, and then get added 
to the queue again for
+    // a retry
 
 Review comment:
   I'd reword this -- you get one block because you call next() once.  You 
really want to explain why you get a certain block back.
   
   ```
   We'll get back the block which has corruption after maxBytesInFlight/3 because ...
   ```
   
   (assuming I understand this correctly...)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to