Ngone51 commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r625163412
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -708,6 +785,15 @@ final class ShuffleBlockFetcherIterator(
}
private def fetchUpToMaxBytes(): Unit = {
+ if (isNettyOOMOnShuffle.get()) {
+ if (reqsInFlight > 0) {
+ // Return immediately if Netty is still OOMed and there're ongoing fetch requests
+ return
+ } else {
+ ShuffleBlockFetcherIterator.resetNettyOOMFlagIfPossible(0)
+ }
+ }
+
Review comment:
> Should it again check at line 805 that freeDirectMemory > maxReqSizeShuffleToMem?
No. Otherwise, the first invocation of `fetchUpToMaxBytes` can hang when
`isNettyOOMOnShuffle=true`: if no requests are sent in that first invocation,
no fetch callback will ever fire to call `fetchUpToMaxBytes` again.
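To make the hang scenario concrete, here is a minimal single-threaded sketch. It is a toy model, not the Spark implementation: `ToyFetcher`, `strictCheck`, `drain`, and `Demo` are hypothetical names, and `strictCheck = true` stands in for an extra memory check that fails on the first call.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Toy model only; every name here is hypothetical and not part of Spark.
final class ToyFetcher(oomFlag: AtomicBoolean, strictCheck: Boolean) {
  private val pending = mutable.Queue("req-1", "req-2")
  private val inFlight = mutable.Queue.empty[String]
  var completed = 0

  def fetchUpToMaxBytes(): Unit = {
    if (oomFlag.get()) {
      // Requests are in flight: one of their callbacks will call us again.
      if (inFlight.nonEmpty) return
      // Hypothetical extra check failing: nothing is sent, so no callback can ever fire.
      if (strictCheck) return
      // Stand-in for the flag reset in the diff above.
      oomFlag.set(false)
    }
    while (pending.nonEmpty) inFlight.enqueue(pending.dequeue())
  }

  // Simulates request completion; each completion re-enters fetchUpToMaxBytes.
  def drain(): Unit = {
    while (inFlight.nonEmpty) {
      inFlight.dequeue()
      completed += 1
      fetchUpToMaxBytes()
    }
  }
}

object Demo {
  // Starts with the OOM flag set and returns how many requests completed.
  def run(strictCheck: Boolean): Int = {
    val fetcher = new ToyFetcher(new AtomicBoolean(true), strictCheck)
    fetcher.fetchUpToMaxBytes() // first invocation
    fetcher.drain()
    fetcher.completed
  }
}
```

In this model, `Demo.run(strictCheck = false)` completes both requests, while `Demo.run(strictCheck = true)` completes none, because the first invocation returns without sending anything and nothing is left to trigger a retry.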