squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle
corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r253975742
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -461,10 +485,68 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
Int.MaxValue,
Int.MaxValue,
true,
+ true,
taskContext.taskMetrics.createTempShuffleReadMetrics())
- // Blocks should be returned without exceptions.
- assert(Set(iterator.next()._1, iterator.next()._1) ===
- Set(ShuffleBlockId(0, 0, 0), ShuffleBlockId(0, 1, 0)))
+ // Only one block should be returned which has corruption after maxBytesInFlight/3
+ val (id, st) = iterator.next()
+ assert(id === ShuffleBlockId(0, 1, 0))
+ intercept[FetchFailedException] { iterator.next() }
+ // Following will succeed as it reads the first part of the stream which is not corrupt
+ st.read(new Array[Byte](8 * 1024), 0, 8 * 1024)
+ // Following will fail as it reads the remaining part of the stream which is corrupt
+ intercept[FetchFailedException] { st.read() }
+ intercept[FetchFailedException] { st.read(new Array[Byte](8 * 1024)) }
+ intercept[FetchFailedException] { st.read(new Array[Byte](8 * 1024), 0, 8 * 1024) }
+ }
Review comment:
I finally went through this test carefully, and I think it's pretty confusing
for a few reasons. I think I only discovered one real bug -- the original
input streams are not getting closed properly. With my suggested change to the
corrupt stream, you can add an assertion that the stream is closed.
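To make the closed-stream check concrete, here is a minimal sketch of a close-tracking wrapper. This is a hypothetical helper for illustration (`CloseTrackingStream` and `wasClosed` are names I made up, not Spark's actual corrupt-stream test helper): a test can wrap the underlying stream and assert `wasClosed` once the iterator is done with it.

```scala
import java.io.{ByteArrayInputStream, FilterInputStream, InputStream}

// Hypothetical wrapper: records whether close() was ever propagated to
// the underlying stream, so a test can assert the iterator closed it.
class CloseTrackingStream(in: InputStream) extends FilterInputStream(in) {
  @volatile var wasClosed = false
  override def close(): Unit = {
    wasClosed = true
    super.close()
  }
}

object CloseTrackingDemo {
  def main(args: Array[String]): Unit = {
    val st = new CloseTrackingStream(new ByteArrayInputStream(Array[Byte](1, 2, 3)))
    st.read()                // reading alone does not close the stream
    assert(!st.wasClosed)
    st.close()               // close() is recorded and forwarded
    assert(st.wasClosed)
    println("close was propagated and recorded")
  }
}
```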
A few things which make this wrong / confusing:
(a) `st.read(new Array[Byte](8 * 1024), 0, 8 * 1024)` is not guaranteed to
read a full 8K. In fact, SequenceInputStream returns as soon as it reaches the
end of the first underlying stream. An easy fix is to use
`new DataInputStream(st).readFully(...)`.
(b) minor: I'd rename your BlockManager ids to something like "local" and
"remote"
(c) when I stepped through this carefully, I was surprised that the *remote*
block is the one that is returned first (corruptStream2). This is due to a
wrinkle in the inner workings of the ShuffleBlockFetcherIterator -- but I'd at
least put in a comment about that.
(d) there are a bunch of magic constants around; it might help to give those
names, e.g. `maxBytesInFlight`, `corruptionCheckLimit`, `readLen`
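Point (a) above can be demonstrated with a small sketch (standalone, not part of the test under review): a `SequenceInputStream` over two 4K chunks returns from a single 8K `read` at the first chunk's boundary, while `DataInputStream.readFully` keeps reading until the buffer is full.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, SequenceInputStream}

object ReadFullyDemo {
  def main(args: Array[String]): Unit = {
    def twoChunks(): SequenceInputStream =
      new SequenceInputStream(
        new ByteArrayInputStream(Array.fill[Byte](4 * 1024)(1)),
        new ByteArrayInputStream(Array.fill[Byte](4 * 1024)(2)))

    val buf = new Array[Byte](8 * 1024)

    // A plain read stops at the end of the first underlying stream,
    // so it returns only 4096 bytes even though 8192 were requested.
    val n = twoChunks().read(buf, 0, buf.length)
    println(s"plain read returned $n bytes")       // prints 4096

    // readFully loops across the chunk boundary until buf is full
    // (or throws EOFException if the stream ends early).
    new DataInputStream(twoChunks()).readFully(buf)
    println(s"readFully filled all ${buf.length} bytes")
  }
}
```

This is why asserting on the return value of a bare `read` makes the test fragile: the short read is expected behavior for `SequenceInputStream`, not corruption.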
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]