mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r960285819
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2242,60 +2251,57 @@ private[spark] class DAGScheduler(
val numMergers = stage.shuffleDep.getMergerLocs.length
val results = (0 until numMergers).map(_ =>
SettableFuture.create[Boolean]())
externalShuffleClient.foreach { shuffleClient =>
- if (!registerMergeResults) {
- results.foreach(_.set(true))
- // Finalize in separate thread as shuffle merge is a no-op in this case
- shuffleMergeFinalizeScheduler.schedule(new Runnable {
- override def run(): Unit = {
- stage.shuffleDep.getMergerLocs.foreach {
- case shuffleServiceLoc =>
-              // Sends async request to shuffle service to finalize shuffle merge on that host.
- // Since merge statuses will not be registered in this case,
- // we pass a no-op listener.
- shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
- shuffleServiceLoc.port, shuffleId, shuffleMergeId,
- new MergeFinalizerListener {
-                  override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
- }
+ val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
+ case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+ shuffleSendFinalizeRPCExecutor.submit(new Runnable() {
+ override def run(): Unit = {
+ shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+ shuffleServiceLoc.port, shuffleId, shuffleMergeId,
+ new MergeFinalizerListener {
+                override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+ assert(shuffleId == statuses.shuffleId)
+                    eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
+                      convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
+ results(index).set(true)
+ }
Review Comment:
   > I have a question about the original code. If the shuffle data is small, registerMergeResults will be false, so we don't wait for the merged statuses. But aren't those merged statuses still useful if they become available before the reduce tasks fetch them? This should happen often when the cluster is heavily loaded.
   This is to balance the overhead of waiting for finalization to complete: for really small stages, the benefit at read time won't be enough to justify the wait, so we skip waiting to keep finalization overhead from dominating the stage's runtime.
   You can tune `spark.shuffle.push.minCompletedPushRatio` and `spark.shuffle.push.minShuffleSizeToWait` as appropriate for your cluster environment to modify this behavior.
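   For example, assuming a `spark-defaults.conf` deployment, the two thresholds could be set like this (the property names are the real Spark configs mentioned above; the values are purely illustrative, not recommendations):

   ```properties
   # Fraction of map partitions whose pushes must have completed before the
   # driver kicks off shuffle merge finalization.
   spark.shuffle.push.minCompletedPushRatio   0.90
   # Only wait for merge results when the total shuffle data size exceeds
   # this threshold; smaller shuffles skip the wait.
   spark.shuffle.push.minShuffleSizeToWait    500m
   ```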
   > Another question about your suggested code: if all the finalize RPCs complete within shuffleMergeResultsTimeoutSec, does the cancelFinalizeShuffleMergeFutures task still need to wait to be scheduled?
   In the snippet I pasted above, we have two conditions: `timedOut || !registerMergeResults`.
   If `timedOut == true`, not all `results` were obtained. This would (almost always) mean either that a merger received the finalization message but did not respond in time, or that the finalize message never reached it (the issue we observed in this PR).
   If `registerMergeResults == false`, the RPCs may not even have been sent yet: we just enqueued the tasks and set all `results` to true, so we never blocked in `Futures.allAsList`. In that case we do need to give the RPCs time to complete.
   Note that if a task has already completed, cancelling its future is really cheap.
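   A minimal sketch of why: for `java.util.concurrent` futures (and the Guava listenable futures used in the PR behave the same way here), calling `cancel()` on an already-completed future is a state check that returns `false` without interrupting anything. The names below are illustrative, not from the PR:

   ```scala
   import java.util.concurrent.{Callable, Executors, TimeUnit}

   object CancelCompletedFutureDemo {
     def main(args: Array[String]): Unit = {
       val pool = Executors.newFixedThreadPool(2)
       // Stand-ins for the finalize RPC tasks: they complete almost immediately.
       val futures = (0 until 2).map { _ =>
         pool.submit(new Callable[Boolean] { override def call(): Boolean = true })
       }
       // Wait (with a timeout) for the results, as the scheduler does.
       futures.foreach(_.get(5, TimeUnit.SECONDS))
       // Cancelling futures that already finished is cheap: cancel() only
       // inspects the future's state and returns false.
       val anyCancelled = futures.map(_.cancel(true)).exists(identity)
       assert(!anyCancelled)
       println(s"anyCancelled=$anyCancelled")
       pool.shutdown()
     }
   }
   ```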
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]