Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18487#discussion_r126303433
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
       }
     
       private def fetchUpToMaxBytes(): Unit = {
    -    // Send fetch requests up to maxBytesInFlight
    -    while (fetchRequests.nonEmpty &&
    -      (bytesInFlight == 0 ||
    -        (reqsInFlight + 1 <= maxReqsInFlight &&
    -          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
    -      sendRequest(fetchRequests.dequeue())
    +    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
    +    // immediately, defer the request until the next time it can be processed.
    +
    +    // Process any outstanding deferred fetch requests if possible.
    +    if (deferredFetchRequests.nonEmpty) {
    +      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
    --- End diff --
    
    Oh yes. That was the first choice, and I gave it a try to avoid adding any extra bookkeeping. There are issues with that approach. Say you have a request which has to be deferred: you just remove it, push it to the end of the queue, and continue.
    * This works fine as long as you don't encounter the deferred request again.
    * If you do encounter the deferred request again, it may or may not be schedulable depending on whether the remote has finished processing its earlier request. This can lead to going around in circles (wasted effort), so to avoid it we need to know when to stop. We would have to keep a marker for the request that was already deferred so we know where to stop. But that marker only covers a single request, which corresponds to one remote. In the meantime other remotes could have finished processing their earlier requests and we could schedule requests to them, so we can no longer stop at the first marker for a single address; we would have to scan the requests again.
    
    This makes it more complicated than scheduling everything that can be sent in a single pass and deferring whatever it encounters along the way. The next time around we try to clear any backlog from the previous run, and after doing so we proceed normally.
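    To make that flow concrete, here is a minimal, self-contained Scala sketch of the "defer into a per-remote queue" bookkeeping described above. It is not the actual patch: the plain `String` remote address, the `FetchRequest` shape, the per-address cap, and the `send`/`defer`/`remoteIsBusy` helpers are simplified stand-ins for illustration only.
    
    ```scala
    import scala.collection.mutable
    
    object DeferredFetchSketch {
    
      // A fetch request destined for one remote host (simplified).
      case class FetchRequest(remoteAddress: String, size: Long)
    
      // Main queue of requests still to be sent.
      private val fetchRequests = new mutable.Queue[FetchRequest]()
      // Requests that could not be sent earlier, keyed by remote address.
      private val deferredFetchRequests =
        new mutable.HashMap[String, mutable.Queue[FetchRequest]]()
    
      private var bytesInFlight = 0L
      private var reqsInFlight = 0
      private val maxBytesInFlight = 48L * 1024 * 1024
      private val maxReqsInFlight = 5
      // Hypothetical per-remote cap; this is what forces a request to be deferred.
      private val maxReqsInFlightPerAddress = 1
      private val reqsInFlightPerAddress = new mutable.HashMap[String, Int]()
    
      private def remoteIsBusy(remoteAddress: String): Boolean =
        reqsInFlightPerAddress.getOrElse(remoteAddress, 0) >= maxReqsInFlightPerAddress
    
      private def send(request: FetchRequest): Unit = {
        bytesInFlight += request.size
        reqsInFlight += 1
        reqsInFlightPerAddress(request.remoteAddress) =
          reqsInFlightPerAddress.getOrElse(request.remoteAddress, 0) + 1
        // ... issue the actual network fetch here ...
      }
    
      private def defer(request: FetchRequest): Unit = {
        val queue = deferredFetchRequests
          .getOrElseUpdate(request.remoteAddress, new mutable.Queue[FetchRequest]())
        queue.enqueue(request)
      }
    
      def fetchUpToMaxBytes(): Unit = {
        // Step 1: clear any backlog from the previous run. For each remote with
        // deferred requests, send as many as it can accept right now.
        for ((remoteAddress, defReqQueue) <- deferredFetchRequests.toList) {
          while (defReqQueue.nonEmpty && !remoteIsBusy(remoteAddress) &&
            (bytesInFlight == 0 ||
              (reqsInFlight + 1 <= maxReqsInFlight &&
                bytesInFlight + defReqQueue.front.size <= maxBytesInFlight))) {
            send(defReqQueue.dequeue())
          }
          if (defReqQueue.isEmpty) {
            deferredFetchRequests -= remoteAddress
          }
        }
    
        // Step 2: proceed normally over the main queue in a single pass. Global
        // limits stop the loop; a busy remote only gets its request deferred.
        while (fetchRequests.nonEmpty &&
          (bytesInFlight == 0 ||
            (reqsInFlight + 1 <= maxReqsInFlight &&
              bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
          val request = fetchRequests.dequeue()
          if (remoteIsBusy(request.remoteAddress)) defer(request) else send(request)
        }
      }
    }
    ```
    
    The per-remote map keeps "which remote is backed up" separate from the main queue, so there is no need for markers in `fetchRequests` or repeated re-scans of it.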


