Ngone51 commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r627264481



##########
File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -245,9 +253,21 @@ final class ShuffleBlockFetcherIterator(
       case FetchBlockInfo(blockId, size, mapIndex) => (blockId.toString, 
(size, mapIndex))
     }.toMap
     val remainingBlocks = new HashSet[String]() ++= infoMap.keys
+    val deferredBlocks = new ArrayBuffer[String]()
     val blockIds = req.blocks.map(_.blockId.toString)
     val address = req.address
 
+    @inline def enqueueDeferredFetchRequestIfNecessary(): Unit = {
+      if (remainingBlocks.isEmpty && deferredBlocks.nonEmpty) {
+        val blocks = deferredBlocks.map { blockId =>
+          val (size, mapIndex) = infoMap(blockId)
+          FetchBlockInfo(BlockId(blockId), size, mapIndex)
+        }
+        results.put(DeferFetchRequestResult(FetchRequest(address, 
blocks.toSeq)))
+        deferredBlocks.clear()
+      }
+    }

Review comment:
       I think there are two different cases here. For example, assume there are 100 requests.
   
   For case a), the OOM threshold might be 80 requests. In this case, after the OOM, the deferred 20 requests shouldn't hit the OOM again.
   
   For case b), the OOM threshold might be 20 requests. In this case, there are still 80 deferred requests, which could hit the OOM again soon, as you mentioned. That said, I think the current fix would still work around the issue in the end. Note that, before the fix, the application would simply fail.
   
   To improve the current fix further, I think we can do it in a separate PR, since it's not an easy thing[1] to do (or do you have any other ideas?) and will require more discussion. WDYT?
   
   1. Even if we skip requests larger than `maxReqSizeShuffleToMem`, note that the in-memory request size is not strictly equal to the memory actually consumed by Netty, due to Netty's jemalloc-style memory management mechanism (https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919). For example, Netty may allocate 16MB for a 9MB block.
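   As a rough sketch of why a 9MB block can occupy 16MB, jemalloc-style allocators round normal allocations up to a size class. The model below simplifies this to "next power of two"; the exact size classes Netty uses are an assumption, not taken from its source:

```scala
// Simplified model: round an allocation request up to the next power
// of two, as jemalloc-style size classes roughly do for large allocations.
def sizeClass(reqBytes: Long): Long = {
  var n = 1L
  while (n < reqBytes) n <<= 1
  n
}
// sizeClass(9L * 1024 * 1024) yields 16MB (16777216 bytes).
```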
   And there could be multiple tasks fetching concurrently, so we may need to track the total `bytesInFlight` across all tasks rather than within a single task. That would require more synchronization among tasks and make things more complex.
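   A minimal sketch of what cross-task tracking might look like (the object, method names, and semantics here are hypothetical, not an existing Spark API): a shared atomic counter that admits a fetch only while the global total stays under the limit.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch: one counter shared by all tasks on an executor.
object GlobalBytesInFlight {
  private val total = new AtomicLong(0L)

  // Try to reserve `bytes`; roll back and refuse if the global limit
  // would be exceeded.
  def tryAcquire(bytes: Long, maxTotal: Long): Boolean = {
    if (total.addAndGet(bytes) <= maxTotal) true
    else { total.addAndGet(-bytes); false }
  }

  def release(bytes: Long): Unit = total.addAndGet(-bytes)
}
```

   Even this simple scheme shows the coordination cost: every fetch now touches shared state, which is part of why this deserves its own discussion.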
   
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]