ankuriitg commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r253148984
##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -461,10 +485,68 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
Int.MaxValue,
Int.MaxValue,
true,
+ true,
+ taskContext.taskMetrics.createTempShuffleReadMetrics())
+ // Only one block should be returned which has corruption after maxBytesInFlight/3
+ val (id, st) = iterator.next()
+ assert(id === ShuffleBlockId(0, 1, 0))
+ intercept[FetchFailedException] { iterator.next(); iterator.next() }
Review comment:
Actually it is not needed; I will remove the extra call. I was confused because of the following:

During the call here, `val (id, st) = iterator.next()`, there are two possibilities:
1. The corrupt block is retrieved: we get an exception, refetch that block, and return the other block.
2. The non-corrupt block (the one whose corruption lies after maxBytesInFlight/3) is retrieved.

I thought that if scenario 2 occurred, we would need to call `iterator.next()` twice to cause the FetchFailedException (the first call would only trigger a retry), but since there are no other blocks left, the first call itself will retry, refetch, and then throw FetchFailedException.
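To make that concrete, here is a minimal sketch of how the tail of the test could read once the extra call is removed (assuming the iterator setup shown in the diff above; `iterator`, `ShuffleBlockId`, and `FetchFailedException` are the names already used in the test):

```scala
// Two blocks were requested. ShuffleBlockId(0, 1, 0) is the one whose
// corruption lies beyond maxBytesInFlight/3, so it passes the upfront
// corruption check and is returned as a usable stream.
val (id, st) = iterator.next()
assert(id === ShuffleBlockId(0, 1, 0))

// Only the detectably corrupt block remains, so a single next() suffices:
// it hits the corruption, retries the fetch once, finds the refetched
// block still corrupt, and throws FetchFailedException. A second next()
// call is never reached.
intercept[FetchFailedException] { iterator.next() }
```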