Repository: spark
Updated Branches:
  refs/heads/master c32dbd6bd -> 92b48842b


[SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic 
resource allocation enabled

## What changes were proposed in this pull request?

We don't support running a barrier stage with dynamic resource allocation 
enabled, because it would lead to confusing behavior (e.g. with dynamic resource 
allocation enabled, we may acquire some executors (but not enough to launch all 
the tasks in a barrier stage), later release them when the executor idle timeout 
expires, and then acquire them again).

We perform the check at job submission and fail fast if a barrier stage is run 
with dynamic resource allocation enabled.

## How was this patch tested?

Added new test cases to the existing `BarrierStageOnSubmittedSuite`, which covers 
the fail-fast cases for submitted jobs containing one or more barrier stages.

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #21915 from jiangxb1987/SPARK-24954.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b48842
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b48842
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b48842

Branch: refs/heads/master
Commit: 92b48842b944a3e430472294cdc3c481bad6b804
Parents: c32dbd6
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Fri Aug 3 09:36:56 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Aug 3 09:36:56 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 25 +++++++++
 .../spark/BarrierStageOnSubmittedSuite.scala    | 57 ++++++++++++++++----
 2 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3dd0718..cf1fcbc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -364,6 +364,7 @@ class DAGScheduler(
    */
   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: 
Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
+    checkBarrierStageWithDynamicAllocation(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
     val numTasks = rdd.partitions.length
     val parents = getOrCreateParentStages(rdd, jobId)
@@ -385,6 +386,23 @@ class DAGScheduler(
   }
 
   /**
+   * We don't support run a barrier stage with dynamic resource allocation 
enabled, it shall lead
+   * to some confusing behaviors (eg. with dynamic resource allocation 
enabled, it may happen that
+   * we acquire some executors (but not enough to launch all the tasks in a 
barrier stage) and
+   * later release them due to executor idle time expire, and then acquire 
again).
+   *
+   * We perform the check on job submit and fail fast if running a barrier 
stage with dynamic
+   * resource allocation enabled.
+   *
+   * TODO SPARK-24942 Improve cluster resource management with jobs containing 
barrier stage
+   */
+  private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
+    if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
+      throw new 
SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+    }
+  }
+
+  /**
    * Create a ResultStage associated with the provided jobId.
    */
   private def createResultStage(
@@ -393,6 +411,7 @@ class DAGScheduler(
       partitions: Array[Int],
       jobId: Int,
       callSite: CallSite): ResultStage = {
+    checkBarrierStageWithDynamicAllocation(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
@@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler {
       "PartitionPruningRDD). A workaround for first()/take() can be 
barrierRdd.collect().head " +
       "(scala) or barrierRdd.collect()[0] (python).\n" +
       "2. An RDD that depends on multiple barrier RDDs (eg. 
barrierRdd1.zip(barrierRdd2))."
+
+  // Error message when running a barrier stage with dynamic resource 
allocation enabled.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
+    "[SPARK-24942]: Barrier execution mode does not support dynamic resource 
allocation for " +
+      "now. You can disable dynamic resource allocation by setting Spark conf 
" +
+      "\"spark.dynamicAllocation.enabled\" to \"false\"."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala 
b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index f2b3884..75e13a9 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.BeforeAndAfterEach
-
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 import org.apache.spark.scheduler.DAGScheduler
 import org.apache.spark.util.ThreadUtils
@@ -30,16 +28,13 @@ import org.apache.spark.util.ThreadUtils
  * This test suite covers all the cases that shall fail fast on job submitted 
that contains one
  * of more barrier stages.
  */
-class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
BeforeAndAfterEach
-    with LocalSparkContext {
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
+class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
LocalSparkContext {
 
-    val conf = new SparkConf()
-      .setMaster("local[4]")
-      .setAppName("test")
-    sc = new SparkContext(conf)
+  private def createSparkContext(conf: Option[SparkConf] = None): SparkContext 
= {
+    new SparkContext(conf.getOrElse(
+      new SparkConf()
+        .setMaster("local[4]")
+        .setAppName("test")))
   }
 
   private def testSubmitJob(
@@ -62,6 +57,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
BeforeAndAfterEach
   }
 
   test("submit a barrier ResultStage that contains PartitionPruningRDD") {
+    sc = createSparkContext()
     val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index 
=> index > 1)
     val rdd = prunedRdd
       .barrier()
@@ -71,6 +67,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
BeforeAndAfterEach
   }
 
   test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
+    sc = createSparkContext()
     val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index 
=> index > 1)
     val rdd = prunedRdd
       .barrier()
@@ -82,6 +79,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
BeforeAndAfterEach
   }
 
   test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
+    sc = createSparkContext()
     val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index 
=> index > 1)
     val rdd = prunedRdd
       .repartition(2)
@@ -93,6 +91,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
BeforeAndAfterEach
   }
 
   test("submit a barrier stage with partial partitions") {
+    sc = createSparkContext()
     val rdd = sc.parallelize(1 to 10, 4)
       .barrier()
       .mapPartitions((iter, context) => iter)
@@ -101,6 +100,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with BeforeAndAfterEach
   }
 
   test("submit a barrier stage with union()") {
+    sc = createSparkContext()
     val rdd1 = sc.parallelize(1 to 10, 2)
       .barrier()
       .mapPartitions((iter, context) => iter)
@@ -114,6 +114,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with BeforeAndAfterEach
   }
 
   test("submit a barrier stage with coalesce()") {
+    sc = createSparkContext()
     val rdd = sc.parallelize(1 to 10, 4)
       .barrier()
       .mapPartitions((iter, context) => iter)
@@ -125,6 +126,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with BeforeAndAfterEach
   }
 
   test("submit a barrier stage that contains an RDD that depends on multiple 
barrier RDDs") {
+    sc = createSparkContext()
     val rdd1 = sc.parallelize(1 to 10, 4)
       .barrier()
       .mapPartitions((iter, context) => iter)
@@ -139,6 +141,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with BeforeAndAfterEach
   }
 
   test("submit a barrier stage with zip()") {
+    sc = createSparkContext()
     val rdd1 = sc.parallelize(1 to 10, 4)
       .barrier()
       .mapPartitions((iter, context) => iter)
@@ -150,4 +153,36 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with BeforeAndAfterEach
     val result = rdd3.collect().sorted
     assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30))
   }
+
+  test("submit a barrier ResultStage with dynamic resource allocation 
enabled") {
+    val conf = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+
+    val rdd = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    testSubmitJob(sc, rdd,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+  }
+
+  test("submit a barrier ShuffleMapStage with dynamic resource allocation 
enabled") {
+    val conf = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+
+    val rdd = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+      .repartition(2)
+      .map(x => x + 1)
+    testSubmitJob(sc, rdd,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to