This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 01ee46e  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire 
DAG when submitting job to DAGScheduler
01ee46e is described below

commit 01ee46e03a7f5c6f8656690fae96f39a897b9799
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when 
submitting job to DAGScheduler
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a longstanding issue where the `DAGScheduler'`s 
single-threaded event processing loop could become blocked by slow 
`RDD.getPartitions()` calls, preventing other events (like task completions and 
concurrent job submissions) from being processed in a timely manner.
    
    With this patch's change, Spark will now call `.partitions` on every RDD in 
the DAG before submitting a job to the scheduler, ensuring that the expensive 
`getPartitions()` calls occur outside of the scheduler event loop.
    
    #### Background
    
    The `RDD.partitions` method lazily computes an RDD's partitions by calling 
`RDD.getPartitions()`. The `getPartitions()` method is invoked only once per 
RDD and its result is cached in the `RDD.partitions_` private field. Sometimes 
the `getPartitions()` call can be expensive: for example, 
`HadoopRDD.getPartitions()` performs file listing operations.
    
    The `.partitions` method is invoked at many different places in Spark's 
code, including many existing call sites that are outside of the scheduler 
event loop. As a result, it's _often_ the case that an RDD's partitions will 
have been computed before the RDD is submitted to the DAGScheduler. For 
example, [`submitJob` calls 
`rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc
 [...]
    
    However, there's still some cases where `partitions` gets evaluated for the 
first time inside of the `DAGScheduler` internals. For example, 
[`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94)
 doesn't call `.partitions` on the RDD being shuffled, so a plan with a 
ShuffledRDD at the root won't necessarily result in `.partitions` having been 
called on all RDDs prior [...]
    
    #### Correctness: proving that we make no excess `.partitions` calls
    
    This PR adds code to traverse the DAG prior to job submission and call 
`.partitions` on every RDD encountered.
    
    I'd like to argue that this results in no _excess_ `.partitions` calls: in 
every case where the new code calls `.partitions` there is existing code which 
would have called `.partitions` at some point during a successful job execution:
    
    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call 
`submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438)
 on every stage root RDD.
    - [`submitStage` calls 
`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696)
 on every stage root RDD.
    - 
[`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043)
 visits the RDD and all of its parents RDDs that are computed in the same stage 
(via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time 
the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to 
`.partitions` which would not have otherwise occurred (assuming the job 
succeeded).
    
    #### Ordering of `.partitions` calls
    
    I don't think the order in which `.partitions` calls occur matters for 
correctness: the DAGScheduler happens to invoke `.partitions` in a particular 
order today (defined by the DAG traversal order in internal scheduler methods), 
but there's many  lots of out-of-order `.partition` calls occurring elsewhere 
in the codebase.
    
    #### Handling of exceptions in `.partitions`
    
    I've chosen **not** to add special error-handling for the new `.partitions` 
calls: if exceptions occur then they'll bubble up, unwrapped, to the user code 
submitting the Spark job.
    
    It's sometimes important to preserve exception wrapping behavior, but I 
don't think that concern is warranted in this particular case: whether 
`getPartitions` occurred inside or outside of the scheduler (impacting whether 
exceptions manifest in wrapped or unwrapped form, and impacting whether failed 
jobs appear in the Spark UI) was not crisply defined (and in some rare cases 
could even be [influenced by Spark settings in non-obvious 
ways](https://github.com/apache/spark/blob/10d530317 [...]
    
    #### Should this have a configuration flag?
    
    Per discussion from a previous PR trying to solve this problem 
(https://github.com/apache/spark/pull/24438#pullrequestreview-232692586), I've 
decided to skip adding a configuration flag for this.
    
    ### Why are the changes needed?
    
    This fixes a longstanding scheduler performance problem which has been 
reported by multiple users.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a regression test in `BasicSchedulerIntegrationSuite` to cover the 
regular job submission codepath (`DAGScheduler.submitJob`)This test uses 
CountDownLatches to simulate the submission of a job containing an RDD with a 
slow `getPartitions()` call and checks that a concurrently-submitted job is not 
blocked.
    
    I have **not** added separate integration tests for the `runApproximateJob` 
and `submitMapStage` codepaths (both of which also received the same fix).
    
    Closes #34265 from JoshRosen/SPARK-23626.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
    (cherry picked from commit c4e975e175c01f67ece7ae492a79554ad1b44106)
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  45 ++++++++
 .../scheduler/HealthTrackerIntegrationSuite.scala  |   4 +-
 .../scheduler/SchedulerIntegrationSuite.scala      | 120 ++++++++++++++++++---
 3 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index be3744d..bb77a58d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -730,6 +730,35 @@ private[spark] class DAGScheduler(
     missing.toList
   }
 
+  /** Invoke `.partitions` on the given RDD and all of its ancestors  */
+  private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
+    val startTime = System.nanoTime
+    val visitedRdds = new HashSet[RDD[_]]
+    // We are manually maintaining a stack here to prevent StackOverflowError
+    // caused by recursively visiting
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
+
+    def visit(rdd: RDD[_]): Unit = {
+      if (!visitedRdds(rdd)) {
+        visitedRdds += rdd
+
+        // Eagerly compute:
+        rdd.partitions
+
+        for (dep <- rdd.dependencies) {
+          waitingForVisit.prepend(dep.rdd)
+        }
+      }
+    }
+
+    while (waitingForVisit.nonEmpty) {
+      visit(waitingForVisit.remove(0))
+    }
+    logDebug("eagerlyComputePartitionsForRddAndAncestors for RDD %d took %f 
seconds"
+      .format(rdd.id, (System.nanoTime - startTime) / 1e9))
+  }
+
   /**
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
@@ -839,6 +868,11 @@ private[spark] class DAGScheduler(
           "Total number of partitions: " + maxPartitions)
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val jobId = nextJobId.getAndIncrement()
     if (partitions.isEmpty) {
       val clonedProperties = Utils.cloneProperties(properties)
@@ -928,6 +962,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -960,6 +1000,11 @@ private[spark] class DAGScheduler(
       throw new SparkException("Can't run submitMapStage on RDD with 0 
partitions")
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as 
complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for 
that stage.
     // This makes it easier to avoid race conditions between the user code and 
the map output
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
index 29a8f4b..fd05ff9 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerIntegrationSuite.scala
@@ -112,7 +112,7 @@ class HealthTrackerIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecu
       backend.taskFailed(taskDescription, new RuntimeException("test task 
failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 
10).toArray)
       awaitJobTermination(jobFuture, duration)
       val pattern = (
         s"""|Aborting TaskSet 0.0 because task .*
@@ -150,7 +150,7 @@ class MockRDDWithLocalityPrefs(
     sc: SparkContext,
     numPartitions: Int,
     shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
-    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
+    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps, 
Nil) {
   override def getPreferredLocations(split: Partition): Seq[String] = {
     Seq(preferredLoc)
   }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 88d2868..874abce 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -205,7 +205,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
   def shuffle(nParts: Int, input: MockRDD): MockRDD = {
     val partitioner = new HashPartitioner(nParts)
     val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, 
partitioner)
-    new MockRDD(sc, nParts, List(shuffleDep))
+    new MockRDD(sc, nParts, List(shuffleDep), Nil)
+  }
+
+  /** models a one-to-one dependency within a stage, like a map or filter */
+  def oneToOne(input: MockRDD): MockRDD = {
+    val dep = new OneToOneDependency[(Int, Int)](input)
+    new MockRDD(sc, input.numPartitions, Nil, Seq(dep))
   }
 
   /** models a stage boundary with multiple dependencies, like a join */
@@ -214,7 +220,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: 
ClassTag] extends Spa
     val shuffleDeps = inputs.map { inputRDD =>
       new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
     }
-    new MockRDD(sc, nParts, shuffleDeps)
+    new MockRDD(sc, nParts, shuffleDeps, Nil)
   }
 
   val backendException = new AtomicReference[Exception](null)
@@ -449,10 +455,11 @@ case class ExecutorTaskStatus(host: String, executorId: 
String, var freeCores: I
 class MockRDD(
   sc: SparkContext,
   val numPartitions: Int,
-  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
-) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
+  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
+  val oneToOneDeps: Seq[OneToOneDependency[(Int, Int)]]
+) extends RDD[(Int, Int)](sc, deps = shuffleDeps ++ oneToOneDeps) with 
Serializable {
 
-  MockRDD.validate(numPartitions, shuffleDeps)
+  MockRDD.validate(numPartitions, shuffleDeps, oneToOneDeps)
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] =
     throw new RuntimeException("should not be reached")
@@ -468,14 +475,25 @@ class MockRDD(
 object MockRDD extends AssertionsHelper with TripleEquals with Assertions {
   /**
    * make sure all the shuffle dependencies have a consistent number of output 
partitions
+   * and that one-to-one dependencies have the same partition counts as their 
parents
    * (mostly to make sure the test setup makes sense, not that Spark itself 
would get this wrong)
    */
-  def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, 
_]]): Unit = {
-    dependencies.foreach { dependency =>
+  def validate(
+      numPartitions: Int,
+      shuffleDependencies: Seq[ShuffleDependency[_, _, _]],
+      oneToOneDependencies: Seq[OneToOneDependency[_]]): Unit = {
+    shuffleDependencies.foreach { dependency =>
       val partitioner = dependency.partitioner
       assert(partitioner != null)
       assert(partitioner.numPartitions === numPartitions)
     }
+    oneToOneDependencies.foreach { dependency =>
+      // In order to support the SPARK-23626 testcase, we cast to MockRDD
+      // and access `numPartitions` instead of just calling `getNumPartitions`:
+      // `getNumPartitions` would call `getPartitions`, undermining the 
intention
+      // of the SPARK-23626 testcase.
+      assert(dependency.rdd.asInstanceOf[MockRDD].numPartitions === 
numPartitions)
+    }
   }
 }
 
@@ -539,7 +557,7 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
       backend.taskSuccess(taskDescription, 42)
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 
10).toArray)
       awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
@@ -564,7 +582,7 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
       }
     }
 
-    val a = new MockRDD(sc, 2, Nil)
+    val a = new MockRDD(sc, 2, Nil, Nil)
     val b = shuffle(10, a)
     val c = shuffle(20, a)
     val d = join(30, b, c)
@@ -604,7 +622,7 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
    * (b) we get a second attempt for stage 0 & stage 1
    */
   testScheduler("job with fetch failure") {
-    val input = new MockRDD(sc, 2, Nil)
+    val input = new MockRDD(sc, 2, Nil, Nil)
     val shuffledRdd = shuffle(10, input)
     val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId
 
@@ -646,10 +664,88 @@ class BasicSchedulerIntegrationSuite extends 
SchedulerIntegrationSuite[SingleCor
       backend.taskFailed(taskDescription, new RuntimeException("test task 
failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 
10).toArray)
       awaitJobTermination(jobFuture, duration)
       assert(failure.getMessage.contains("test task failure"))
     }
     assertDataStructuresEmpty(noFailure = false)
   }
+
+  testScheduler("SPARK-23626: RDD with expensive getPartitions() doesn't block 
scheduler loop") {
+    // Before SPARK-23626, expensive `RDD.getPartitions()` calls might occur 
inside of the
+    // DAGScheduler event loop, causing concurrently-submitted jobs to block. 
This test case
+    // reproduces a scenario where that blocking could occur.
+
+    // We'll use latches to simulate an RDD with a slow getPartitions() call.
+    import MockRDDWithSlowGetPartitions._
+
+    // DAGScheduler.submitJob calls `.partitions` on the RDD passed to it.
+    // Therefore to write a proper regression test for SPARK-23626 we must
+    // ensure that the slow getPartitions() call occurs deeper in the RDD DAG:
+    val rddWithSlowGetPartitions = oneToOne(new 
MockRDDWithSlowGetPartitions(sc, 1))
+
+    // A RDD whose execution should not be blocked by the other RDD's slow 
getPartitions():
+    val simpleRdd = new MockRDD(sc, 1, Nil, Nil)
+
+    getPartitionsShouldNotHaveBeenCalledYet.set(false)
+
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskSuccess(taskDescription, 42)
+    }
+
+    withBackend(runBackend _) {
+      // Submit a job containing an RDD which will hang in getPartitions() 
until we release
+      // the countdown latch:
+      import scala.concurrent.ExecutionContext.Implicits.global
+      val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) 
}.flatten
+
+      // Block the current thread until the other thread has started the 
getPartitions() call:
+      beginGetPartitionsLatch.await(duration.toSeconds, SECONDS)
+
+      // Submit a concurrent job. This job's execution should not be blocked 
by the other job:
+      val fastJobFuture = submit(simpleRdd, Array(0))
+      awaitJobTermination(fastJobFuture, duration)
+
+      // The slow job should still be blocked in the getPartitions() call:
+      assert(!slowJobFuture.isCompleted)
+
+      // Allow it to complete:
+      endGetPartitionsLatch.countDown()
+      awaitJobTermination(slowJobFuture, duration)
+    }
+
+    assertDataStructuresEmpty()
+  }
+}
+
+/** Helper class used in SPARK-23626 test case */
+private object MockRDDWithSlowGetPartitions {
+  // Latch for blocking the test execution thread until getPartitions() has 
been called:
+  val beginGetPartitionsLatch = new CountDownLatch(1)
+
+  // Latch for blocking the getPartitions() call from completing:
+  val endGetPartitionsLatch = new CountDownLatch(1)
+
+  // Atomic boolean which is used to fail the test in case getPartitions() is 
called earlier
+  // than expected. This guards against false-negatives (e.g. the test passing 
because
+  // `.getPartitions()` was called in the test setup before we even submitted 
a job):
+  val getPartitionsShouldNotHaveBeenCalledYet = new AtomicBoolean(true)
+}
+
+/** Helper class used in SPARK-23626 test case */
+private class MockRDDWithSlowGetPartitions(
+    sc: SparkContext,
+    numPartitions: Int) extends MockRDD(sc, numPartitions, Nil, Nil) {
+  import MockRDDWithSlowGetPartitions._
+
+  override def getPartitions: Array[Partition] = {
+    if (getPartitionsShouldNotHaveBeenCalledYet.get()) {
+      throw new Exception("getPartitions() should not have been called at this 
point")
+    }
+    beginGetPartitionsLatch.countDown()
+    val partitions = super.getPartitions
+    endGetPartitionsLatch.await()
+    partitions
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to