otterc commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r623425119
##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -708,6 +785,15 @@ final class ShuffleBlockFetcherIterator(
  }

  private def fetchUpToMaxBytes(): Unit = {
+    if (isNettyOOMOnShuffle.get()) {
+      if (reqsInFlight > 0) {
+        // Return immediately if Netty is still OOMed and there're ongoing fetch requests
+        return
+      } else {
+        ShuffleBlockFetcherIterator.resetNettyOOMFlagIfPossible(0)
+      }
+    }
+
Review comment:
I was thinking more about this solution and I see a potential problem. Once a Netty OOM is encountered for some responses, the corresponding requests are deferred and no further remote requests are sent, which helps the executor recover. However, the in-flight remote request limits are not changed, so after `isNettyOOMOnShuffle` is reset by a successful remote response, the next burst of remote requests goes out at the same rate. That means we may again hit Netty OOMs and again defer some of the blocks, which adds more delay and increases the load on the shuffle server. Solving this properly may be more complex, and right now this is just a workaround, but maybe we can do something simpler to reduce the number of requests made after a Netty OOM is encountered?
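
   For example, something along these lines could illustrate the idea (purely a hypothetical sketch, not code from this PR; the `PostOOMRequestThrottle` name, its methods, and the halve/grow policy are made up):

   ```scala
   /**
    * Hypothetical sketch only: an AIMD-style throttle that halves the effective
    * in-flight request cap when a Netty OOM is hit and additively grows it back
    * on successful responses, so the burst sent after `isNettyOOMOnShuffle`
    * resets is smaller than the one that triggered the OOM.
    */
   class PostOOMRequestThrottle(configuredMaxReqsInFlight: Int) {
     private var effectiveCap: Int = configuredMaxReqsInFlight

     /** Called when a fetch fails with a Netty OOM: shrink the cap (but keep it >= 1). */
     def onNettyOOM(): Unit = synchronized {
       effectiveCap = math.max(1, effectiveCap / 2)
     }

     /** Called on each successful remote response: restore the cap one step at a time. */
     def onSuccess(): Unit = synchronized {
       if (effectiveCap < configuredMaxReqsInFlight) effectiveCap += 1
     }

     /** Whether another remote request could be issued right now. */
     def canSend(reqsInFlight: Int): Boolean = synchronized {
       reqsInFlight < effectiveCap
     }
   }
   ```

   `fetchUpToMaxBytes()` would then consult something like `canSend(reqsInFlight)` in addition to the existing limits. This only shows the shape of the idea; a real fix would need to coordinate with how `maxReqsInFlight`/`maxBytesInFlight` are handled today.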