squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r253975742
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##########
 @@ -461,10 +485,68 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       Int.MaxValue,
       Int.MaxValue,
       true,
+      true,
       taskContext.taskMetrics.createTempShuffleReadMetrics())
-    // Blocks should be returned without exceptions.
-    assert(Set(iterator.next()._1, iterator.next()._1) ===
-        Set(ShuffleBlockId(0, 0, 0), ShuffleBlockId(0, 1, 0)))
+    // Only one block should be returned which has corruption after maxBytesInFlight/3
+    val (id, st) = iterator.next()
+    assert(id === ShuffleBlockId(0, 1, 0))
+    intercept[FetchFailedException] { iterator.next() }
+    // Following will succeed as it reads the first part of the stream which is not corrupt
+    st.read(new Array[Byte](8 * 1024), 0, 8 * 1024)
+    // Following will fail as it reads the remaining part of the stream which is corrupt
+    intercept[FetchFailedException] { st.read() }
+    intercept[FetchFailedException] { st.read(new Array[Byte](8 * 1024)) }
+    intercept[FetchFailedException] { st.read(new Array[Byte](8 * 1024), 0, 8 * 1024) }
+  }
 
 Review comment:
   I finally went through this test carefully, and I think it's pretty confusing for a few reasons.  I think I only discovered one real bug -- the original input streams are not getting closed properly.  With my suggested change to the corrupt stream, you can add a check that the stream is closed.
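
   One way to make that closed-check concrete (a sketch only, not this PR's code -- `CloseTrackingInputStream` is a hypothetical name). Since only `java.io` classes are involved, plain Java illustrates it:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical wrapper: records whether close() was ever called, so a
// test can assert the fetcher iterator closed the original stream.
class CloseTrackingInputStream extends FilterInputStream {
    boolean closed = false;

    CloseTrackingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public void close() throws IOException {
        closed = true;
        super.close();
    }
}

public class CloseCheckDemo {
    public static void main(String[] args) throws IOException {
        CloseTrackingInputStream s =
            new CloseTrackingInputStream(new ByteArrayInputStream(new byte[4]));
        s.read();
        System.out.println(s.closed);  // still false: reading does not close
        s.close();
        System.out.println(s.closed);  // true once close() has been called
    }
}
```

   A test would hand such a wrapper to the iterator and assert `closed` afterwards.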
   
   A few things which make this wrong / confusing:
   
   (a) `st.read(new Array[Byte](8 * 1024), 0, 8 * 1024)` is not guaranteed to read a full 8K.  In fact, SequenceInputStream returns after reading the first buffered stream.  An easy fix is to use `new DataInputStream(st).readFully(...)`
   (b) minor: I'd rename your BlockManager ids to something like "local" and "remote"
   (c) when I stepped through this carefully, I was surprised that the *remote* block is the one that is returned first (corruptStream2).  This is due to a wrinkle in the inner workings of the ShuffleBlockFetcherIterator -- but I'd at least put in a comment about that.
   (d) there are a bunch of magic constants around; it might help to give them names, e.g. `maxBytesInFlight`, `corruptionCheckLimit`, `readLen`
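
   Point (a) is easy to demonstrate in isolation (a sketch in plain Java, since these are `java.io` classes): a single `read(buf, off, len)` on a `SequenceInputStream` stops at the first sub-stream boundary, while `DataInputStream.readFully` loops until the buffer is filled.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.SequenceInputStream;

public class ReadFullyDemo {
    public static void main(String[] args) throws IOException {
        // Two 4-byte streams concatenated into one 8-byte logical stream.
        SequenceInputStream seq = new SequenceInputStream(
            new ByteArrayInputStream(new byte[4]),
            new ByteArrayInputStream(new byte[4]));

        // A single read() returns as soon as the first sub-stream is
        // drained, even though 8 bytes were requested.
        int n = seq.read(new byte[8], 0, 8);
        System.out.println(n);  // 4, not 8

        SequenceInputStream seq2 = new SequenceInputStream(
            new ByteArrayInputStream(new byte[4]),
            new ByteArrayInputStream(new byte[4]));

        // readFully keeps reading across the sub-stream boundary until
        // the whole buffer is filled (or EOF, which throws).
        byte[] buf = new byte[8];
        new DataInputStream(seq2).readFully(buf);
        System.out.println(buf.length);  // 8 bytes were read
    }
}
```

   This is why the test's `st.read(buf, 0, 8 * 1024)` can silently read less than it asked for.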
