venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r769089430
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##########
@@ -327,10 +338,32 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         s"stop.")
       return false
     } else {
+      if (reqsInFlight <= 0 && pushRequests.isEmpty && deferredPushRequests.isEmpty) {
Review comment:
I tried waiting on a latch in one of the shuffleClient.pushBlocks calls,
but that still does not work. The test would need a latch on
notifyDriverAboutPushCompletion so that it does not complete before all the
blocks are pushed. The mocked shuffleClient.pushBlocks call would also have
to run in a separate thread; without that, it is hard to reproduce the
problematic behavior. So instead I am just checking
assert(bytesInFlight <= 0) inside notifyDriverAboutPushCompletion itself.
Let me know if you have other thoughts.
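
For reference, here is a minimal sketch of the latch-based approach described
above. It is not the actual test. FakePusher, onComplete, and LatchSketch are
hypothetical stand-ins, not Spark classes. A single-threaded executor plays
the role of the mocked pushBlocks running off the caller's thread, and the
completion callback stands in for notifyDriverAboutPushCompletion.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Hypothetical stand-in for the pusher; NOT the real ShuffleBlockPusher.
class FakePusher(blockSizes: Seq[Long], onComplete: Long => Unit) {
  private val bytesInFlight = new AtomicLong(0L)
  private val remaining = new AtomicInteger(blockSizes.size)
  // Pushes complete on a separate thread, like an asynchronous mock would.
  private val pool = Executors.newSingleThreadExecutor()

  def pushAll(): Unit = {
    // Account for every block before any push is allowed to complete.
    blockSizes.foreach(size => bytesInFlight.addAndGet(size))
    blockSizes.foreach { size =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          bytesInFlight.addAndGet(-size)
          // Only the last completed push notifies (assumed analogue of
          // notifyDriverAboutPushCompletion).
          if (remaining.decrementAndGet() == 0) {
            onComplete(bytesInFlight.get())
          }
        }
      })
    }
    pool.shutdown()
  }
}

object LatchSketch {
  // Returns the bytesInFlight value seen by the completion callback.
  def run(): Long = {
    val done = new CountDownLatch(1)
    val observed = new AtomicLong(-1L)
    val pusher = new FakePusher(Seq(10L, 20L, 30L), bytes => {
      observed.set(bytes)
      done.countDown()
    })
    pusher.pushAll()
    // Without this wait the test could finish before all blocks are pushed.
    assert(done.await(5, TimeUnit.SECONDS), "completion was never signalled")
    observed.get()
  }

  def main(args: Array[String]): Unit = {
    assert(run() <= 0L)
  }
}
```

The latch lives in the completion callback rather than in a single push, so
the test thread blocks until the last block is accounted for. The sketch
assumes at least one block; with an empty list the callback never fires.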
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]