JoshRosen opened a new pull request #34265:
URL: https://github.com/apache/spark/pull/34265


   ### What changes were proposed in this pull request?
   
   This PR fixes a longstanding issue where the `DAGScheduler`'s 
single-threaded event processing loop could become blocked by slow 
`RDD.getPartitions()` calls, preventing other events (like task completions and 
concurrent job submissions) from being processed in a timely manner.
   
   With this patch's change, Spark will now call `.partitions` on every RDD in 
the DAG before submitting a job to the scheduler, ensuring that the expensive 
`getPartitions()` calls occur outside of the scheduler event loop.
   
   #### Background
   
   The `RDD.partitions` method lazily computes an RDD's partitions by calling 
`RDD.getPartitions()`. The `getPartitions()` method is invoked only once per 
RDD and its result is cached in the `RDD.partitions_` private field. Sometimes 
the `getPartitions()` call can be expensive: for example, 
`HadoopRDD.getPartitions()` performs file listing operations. 
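
The caching behavior described above can be illustrated with a small toy model. This is only a sketch, not Spark's actual `RDD` class: `ToyRDD`, `SlowListingRDD`, `computePartitions`, and the `getPartitionsCalls` counter are hypothetical names introduced for illustration, and the sleep merely stands in for an expensive file listing.

```scala
// Toy model (NOT Spark's real classes) of a `partitions` accessor that
// computes its value once and caches it in a private field.
object LazyPartitionsSketch {
  abstract class ToyRDD {
    // Hypothetical counter: how many times the expensive computation ran.
    var getPartitionsCalls: Int = 0

    // Stand-in for `getPartitions()`: subclasses do the real work here.
    protected def computePartitions(): Array[Int]

    // Stand-in for the `partitions_` cache field; null until first computed.
    @volatile private var partitions_ : Array[Int] = null

    final def partitions: Array[Int] = {
      if (partitions_ == null) {
        this.synchronized {
          if (partitions_ == null) {
            getPartitionsCalls += 1
            partitions_ = computePartitions()
          }
        }
      }
      partitions_
    }
  }

  // Simulates an RDD whose partition computation is slow (e.g. file listing).
  class SlowListingRDD(numFiles: Int) extends ToyRDD {
    protected def computePartitions(): Array[Int] = {
      Thread.sleep(10) // assumed cost, purely illustrative
      Array.range(0, numFiles)
    }
  }
}
```

Calling `partitions` twice on the same `SlowListingRDD` pays the simulated cost only on the first call; the second call returns the cached array.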
   
   The `.partitions` method is invoked at many different places in Spark's 
code, including many existing call sites that are outside of the scheduler 
event loop. As a result, it's _often_ the case that an RDD's partitions will 
have been computed before the RDD is submitted to the DAGScheduler. For 
example, [`submitJob` calls 
`rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L837),
 so the DAG root's partitions will be computed outside of the scheduler event 
loop.
   
   However, there are still some cases where `partitions` gets evaluated for the 
first time inside of the `DAGScheduler` internals. For example, 
[`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94)
 doesn't call `.partitions` on the RDD being shuffled, so a plan with a 
ShuffledRDD at the root won't necessarily result in `.partitions` having been 
called on all RDDs prior to scheduler job submission.
   
   #### Correctness: proving that we make no excess `.partitions` calls 
   
   This PR adds code to traverse the DAG prior to job submission and call 
`.partitions` on every RDD encountered. 
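
As a rough illustration of this kind of traversal (not the code in this PR), the sketch below walks a toy DAG iteratively and forces `partitions` on each node exactly once. `Node` and `eagerlyComputePartitions` are hypothetical names, and a `lazy val` stands in for the cached `getPartitions()` result.

```scala
import scala.collection.mutable

object EagerPartitionsSketch {
  // Hypothetical stand-in for an RDD: a name, its parents, and lazily
  // computed partitions. `computed` records whether they were forced.
  final class Node(val name: String, val parents: Seq[Node], numPartitions: Int) {
    var computed = false
    lazy val partitions: Array[Int] = {
      computed = true
      Array.range(0, numPartitions)
    }
  }

  // Visits every node reachable from `root` and forces its partitions.
  // Uses an explicit work list rather than recursion so that very deep
  // DAGs cannot overflow the call stack.
  def eagerlyComputePartitions(root: Node): Seq[String] = {
    val visited = mutable.LinkedHashSet.empty[Node] // reference equality
    var toVisit: List[Node] = List(root)
    while (toVisit.nonEmpty) {
      val node = toVisit.head
      toVisit = toVisit.tail
      if (visited.add(node)) {
        node.partitions // force the (possibly expensive) computation now
        toVisit = node.parents.toList ::: toVisit
      }
    }
    visited.toSeq.map(_.name)
  }
}
```

The visited set matters for diamond-shaped DAGs, where the same ancestor is reachable through several children and would otherwise be revisited.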
   
   I'd like to argue that this results in no _excess_ `.partitions` calls: in 
every case where the new code calls `.partitions` there is existing code which 
would have called `.partitions` at some point during a successful job execution:
   
   - Assume that this is the first time we are computing every RDD in the DAG.
   - Every RDD appears in some stage.
   - [`submitStage` will call 
`submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438)
 on every stage root RDD.
   - [`submitStage` calls 
`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696)
 on every stage root RDD.
   - 
[`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043)
 visits the RDD and all of its parent RDDs that are computed in the same stage 
(via narrow dependencies) and calls `.partitions` on each RDD visited.
   - Therefore `.partitions` is invoked on every RDD in the DAG by the time the 
job has successfully completed.
   - Therefore this patch's change does not introduce any new calls to 
`.partitions` which would not have otherwise occurred (assuming the job 
succeeded).
   
   #### Ordering of `.partitions` calls
   
   I don't think the order in which `.partitions` calls occur matters for 
correctness: the DAGScheduler happens to invoke `.partitions` in a particular 
order today (defined by the DAG traversal order in internal scheduler methods), 
but there are many out-of-order `.partitions` calls occurring elsewhere 
in the codebase.
   
   #### Handling of exceptions in `.partitions`
   
   I've chosen **not** to add special error-handling for the new `.partitions` 
calls: if exceptions occur then they'll bubble up, unwrapped, to the user code 
submitting the Spark job.
   
   It's sometimes important to preserve exception wrapping behavior, but I 
don't think that concern is warranted in this particular case: whether 
`getPartitions` occurred inside or outside of the scheduler (impacting whether 
exceptions manifest in wrapped or unwrapped form, and impacting whether failed 
jobs appear in the Spark UI) was not crisply defined (and in some rare cases 
could even be [influenced by Spark settings in non-obvious 
ways](https://github.com/apache/spark/blob/10d5303174bf4a47508f6227bbdb1eaf4c92fcdb/core/src/main/scala/org/apache/spark/Partitioner.scala#L75-L79)),
 so I think it's both unlikely that users were relying on the old behavior and 
very difficult to preserve it.
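
The wrapped-versus-unwrapped distinction can be sketched with a toy example. Everything here is hypothetical: `ToyJobFailedException` merely stands in for whatever wrapping a scheduler applies, and neither method mirrors Spark's real submission code.

```scala
object ExceptionPropagationSketch {
  // Hypothetical wrapper type standing in for scheduler-side wrapping.
  final class ToyJobFailedException(cause: Throwable)
    extends RuntimeException("job failed", cause)

  // Partitions first computed "inside the scheduler": failures are wrapped.
  def submitComputingInsideScheduler(getPartitions: () => Array[Int]): Int =
    try getPartitions().length
    catch { case e: Exception => throw new ToyJobFailedException(e) }

  // Partitions computed eagerly by the submitting code with no special
  // error handling: the original exception reaches the caller unwrapped.
  def submitComputingEagerly(getPartitions: () => Array[Int]): Int = {
    val parts = getPartitions() // may throw; nothing catches it here
    parts.length
  }
}
```

With a `getPartitions` function that throws an `IOException`, the eager path surfaces the `IOException` itself, while the other path surfaces the wrapper with the `IOException` as its cause.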
   
   ### Why are the changes needed?
   
   This fixes a longstanding scheduler performance problem which has been 
reported by multiple users.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   I added a regression test in `BasicSchedulerIntegrationSuite` to cover the 
regular job submission codepath (`DAGScheduler.submitJob`). This test uses 
CountDownLatches to simulate the submission of a job containing an RDD with a 
slow `getPartitions()` call and checks that a concurrently-submitted job is not 
blocked.
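
The latch-based idea can be sketched outside of Spark as follows. This is not the regression test from this PR: a single-threaded executor plays the role of the scheduler event loop, and all names (`EventLoopSketch`, `secondJobRunsWhileFirstIsBlocked`, the latch names) are assumptions made for illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object EventLoopSketch {
  // Returns true if a second "job" is processed by the event loop while the
  // first job's simulated slow getPartitions() call is still blocked.
  def secondJobRunsWhileFirstIsBlocked(): Boolean = {
    val eventLoop = Executors.newSingleThreadExecutor() // toy event loop
    val submitter = Executors.newSingleThreadExecutor() // thread submitting job 1
    val slowStarted = new CountDownLatch(1)
    val releaseSlow = new CountDownLatch(1)
    val secondDone = new CountDownLatch(1)
    try {
      // Job 1: the slow partition computation runs on the submitting thread,
      // and only afterwards is an event posted to the loop.
      submitter.execute(() => {
        slowStarted.countDown()
        releaseSlow.await()         // simulated slow getPartitions()
        eventLoop.execute(() => ()) // post the job-submitted event
      })
      slowStarted.await()
      // Job 2: submitted concurrently; it should not wait for job 1.
      eventLoop.execute(() => secondDone.countDown())
      secondDone.await(5, TimeUnit.SECONDS)
    } finally {
      releaseSlow.countDown()       // unblock job 1 so threads can finish
      submitter.shutdown()
      submitter.awaitTermination(5, TimeUnit.SECONDS)
      eventLoop.shutdown()
    }
  }
}
```

If the blocking `await()` were instead executed inside a task on `eventLoop`, the second job would sit behind it in the queue, which is the failure mode this kind of test is meant to detect.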
   
   I have **not** added separate integration tests for the `runApproximateJob` 
and `submitMapStage` codepaths (both of which also received the same fix).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


