otterc commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r623425119



##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -708,6 +785,15 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
+    if (isNettyOOMOnShuffle.get()) {
+      if (reqsInFlight > 0) {
+        // Return immediately if Netty is still OOMed and there're ongoing fetch requests
+        return
+      } else {
+        ShuffleBlockFetcherIterator.resetNettyOOMFlagIfPossible(0)
+      }
+    }
+

Review comment:
       Was thinking more about this solution and there is a potential problem I see. Once a Netty OOM is encountered for some responses, the corresponding requests are deferred and no more remote requests are sent, which helps the executor recover. But we don't change any of the in-flight remote request limits, so once `isNettyOOMOnShuffle` is reset by a successful remote response, the next burst of remote requests will go out at the same rate as before. That means there is again a chance of hitting Netty OOMs, and again some of the blocks will be deferred. This introduces more delay and increases the load on the shuffle server.
   
   I think solving this properly may be more complex, and right now this is just a workaround. But maybe we can do something simpler to reduce the number of requests made after a Netty OOM is encountered?
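   To make the "do something simpler" idea concrete, here is a minimal, self-contained sketch (not existing Spark code; `OomAwareRequestBudget` and its methods are hypothetical names invented for this comment) of an AIMD-style request budget: halve the cap on concurrent remote fetch requests when a Netty OOM is observed, and restore it additively on successful responses, so the burst sent after `isNettyOOMOnShuffle` is reset is smaller than the one that triggered the OOM.
   
```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, sketched for discussion only: it is not part of this PR
// or of ShuffleBlockFetcherIterator. It models "shrink the request budget after
// a Netty OOM, then recover it gradually on successful fetches".
final class OomAwareRequestBudget(initialMaxReqs: Int) {
  require(initialMaxReqs > 0, "initialMaxReqs must be positive")

  private val currentMax = new AtomicInteger(initialMaxReqs)

  /** Current cap on concurrent remote fetch requests. */
  def maxReqsInFlight: Int = currentMax.get()

  /** On a Netty OOM: multiplicative decrease, halve the budget (never below 1). */
  def onNettyOom(): Unit = {
    currentMax.updateAndGet(cur => math.max(1, cur / 2))
  }

  /** On a successful remote fetch: additive increase back toward the original cap. */
  def onFetchSuccess(): Unit = {
    currentMax.updateAndGet(cur => math.min(initialMaxReqs, cur + 1))
  }
}

object OomAwareRequestBudgetDemo {
  def main(args: Array[String]): Unit = {
    val budget = new OomAwareRequestBudget(initialMaxReqs = 8)
    budget.onNettyOom()
    println(s"cap after OOM: ${budget.maxReqsInFlight}")         // 4
    (1 to 4).foreach(_ => budget.onFetchSuccess())
    println(s"cap after 4 successes: ${budget.maxReqsInFlight}") // 8
  }
}
```
   
   The numbers are only illustrative; a real change would presumably feed such a budget into the existing in-flight checks in `fetchUpToMaxBytes()` rather than add a new class. The point is just that the post-reset burst shrinks instead of repeating at the original rate.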
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


