venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r769089430



##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -327,10 +338,32 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         s"stop.")
       return false
     } else {
+      if (reqsInFlight <= 0 && pushRequests.isEmpty && deferredPushRequests.isEmpty) {

Review comment:
       I tried waiting on a latch in one of the shuffleClient.pushBlocks calls,
but that still does not work. The latch would need to be on
notifyDriverAboutPushCompletion so that the test does not complete before all
the blocks are pushed. The mock for when(shuffleClient.pushBlocks would also
need to run in a separate thread; without that, it is hard to test the
problematic behavior. So instead I am just checking assert(bytesInFlight <= 0)
in notifyDriverAboutPushCompletion itself. Let me know if you have other
thoughts.
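
For reference, here is a rough, self-contained sketch of the latch-based test shape described above. It does not use Spark or Mockito: FakePusher, LatchSketch, and the single-thread executor standing in for the mocked shuffleClient.pushBlocks are all hypothetical names made up for illustration, and the bookkeeping is a simplification of what ShuffleBlockPusher actually does. It only shows the idea of counting the latch down inside the notification and asserting bytesInFlight <= 0 there.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical stand-in for the pusher; NOT the real ShuffleBlockPusher.
class FakePusher(blockSizes: Seq[Long], pushCompleted: CountDownLatch) {
  private val pushRequests = mutable.Queue(blockSizes: _*)
  private var bytesInFlight = 0L
  private var reqsInFlight = 0
  private val notified = new AtomicBoolean(false)
  @volatile var bytesInFlightAtNotify = -1L

  // Assumption: plays the role of a mocked pushBlocks whose callbacks
  // arrive on a thread other than the one that issued the push.
  private val clientThread = Executors.newSingleThreadExecutor()

  def pushUpToMax(): Unit = synchronized {
    while (pushRequests.nonEmpty) {
      val size = pushRequests.dequeue()
      bytesInFlight += size
      reqsInFlight += 1
      clientThread.execute(new Runnable {
        override def run(): Unit = onBlockPushed(size)
      })
    }
    // Covers the case where there was nothing to push at all.
    if (reqsInFlight <= 0 && pushRequests.isEmpty) {
      notifyDriverAboutPushCompletion()
    }
  }

  private def onBlockPushed(size: Long): Unit = synchronized {
    bytesInFlight -= size
    reqsInFlight -= 1
    if (reqsInFlight <= 0 && pushRequests.isEmpty) {
      notifyDriverAboutPushCompletion()
    }
  }

  private def notifyDriverAboutPushCompletion(): Unit = {
    bytesInFlightAtNotify = bytesInFlight
    // The check from the comment: nothing may be in flight at this point.
    assert(bytesInFlight <= 0, s"bytesInFlight=$bytesInFlight at notification")
    // The latch lives here, so a waiting test cannot finish early.
    if (notified.compareAndSet(false, true)) pushCompleted.countDown()
  }

  def shutdown(): Unit = clientThread.shutdown()
}

object LatchSketch {
  // Returns (latch released within the timeout, bytesInFlight seen at notification).
  def run(sizes: Seq[Long]): (Boolean, Long) = {
    val latch = new CountDownLatch(1)
    val pusher = new FakePusher(sizes, latch)
    try {
      pusher.pushUpToMax()
      val done = latch.await(5, TimeUnit.SECONDS)
      (done, pusher.bytesInFlightAtNotify)
    } finally {
      pusher.shutdown()
    }
  }
}
```

In this sketch a test would call LatchSketch.run(Seq(10L, 20L, 30L)) and check that the latch was released and that the recorded bytesInFlight is not positive. If the assertion inside the notification fails on the callback thread, the latch is never counted down, so the await times out rather than passing silently.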




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


