[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22001


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209662081
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -402,6 +422,19 @@ class DAGScheduler(
 }
   }
 
+  /**
+   * Check whether the barrier stage requires more slots (to be able to 
launch all tasks in the
+   * barrier stage together) than the total number of active slots 
currently. Fail current check
+   * if trying to submit a barrier stage that requires more slots than 
current total number. If
+   * the check fails consecutively for three times for a job, then fail 
current job submission.
--- End diff --

It seems I cannot find the code implementing `"consecutively for three times"`, only `maxFailureNumTasksCheck`?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209658945
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

@kiszk IIUC, there is exactly one thread in `eventLoop`, so the scenario mentioned above will not happen. I even feel there is no need to use a `ConcurrentHashMap` for `jobIdToNumTasksCheckFailures` at all. @jiangxb1987


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460397
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
+  override def apply(key: Int, value: Int): Int = value + 1
+})
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
+if (numCheckFailures <= maxFailureNumTasksCheck) {
+  messageScheduler.schedule(
+new Runnable {
+  override def run(): Unit = 
eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+partitions, callSite, listener, properties))
+},
+timeIntervalNumTasksCheck * 1000,
--- End diff --

minor: how about removing `1000` and changing the time unit to `SECONDS`?
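
A minimal sketch of the suggested change, reusing the scheduling call from the diff above and letting the time unit do the conversion:

```scala
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
      partitions, callSite, listener, properties))
  },
  timeIntervalNumTasksCheck,  // the config value is already in seconds, no manual "* 1000"
  TimeUnit.SECONDS)
```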


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460279
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
--- End diff --

minor: Should have an inline comment that mentions the implicit conversion from `null` to `0: Int` to handle new keys.
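
For illustration, a sketch of what such an inline comment could look like on the `compute` call from the diff above:

```scala
jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
  // For a brand-new jobId, compute() passes null as the current value; Scala unboxes
  // that null to 0: Int, so the counter starts at 1 on the first check failure.
  override def apply(key: Int, value: Int): Int = value + 1
})
```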


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460309
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,11 +963,38 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage.contains(
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
 =>
+logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+  "than the total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, 
Int, Int] {
+  override def apply(key: Int, value: Int): Int = value + 1
+})
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
--- End diff --

minor: this is the return value from `compute`; we don't need `get`.
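
In other words, roughly (a sketch based on the diff above):

```scala
// compute() already returns the updated value, so the follow-up get() can be dropped.
val numCheckFailures = jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
  override def apply(key: Int, value: Int): Int = value + 1
})
```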


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209304798
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+if (numCheckFailures < 
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
--- End diff --

We should make `DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES` configurable so users can specify unlimited retries if needed. Instead, we might want to fix the timeout, since it is only relevant to cost.
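
One possible shape for such a config entry, following the existing `ConfigBuilder` pattern in `config/package.scala` (the name, default, and unlimited-retry semantics here are illustrative only):

```scala
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
  ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
    .doc("Number of max concurrent tasks check failures allowed before failing a job " +
      "submission. A non-positive value could be treated as unlimited retries.")
    .intConf
    .createWithDefault(3)
```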


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209294774
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def maxNumConcurrentTasks(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

link to a JIRA


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209276818
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

+1. Use atomic updates from ConcurrentHashMap. Update the counter and then 
check max failures.
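
A sketch of one way to make the update atomic (the updated diff quoted earlier in this archive uses `compute`; `merge` shown here is an equivalent alternative, assuming `java.util.function.BiFunction` is imported):

```scala
// merge() atomically inserts 1 for a new jobId, otherwise adds 1, and returns the new
// count, so the counter update and the max-failures check use one consistent value.
val numCheckFailures = jobIdToNumTasksCheckFailures.merge(jobId, 1,
  new BiFunction[Int, Int, Int] {
    override def apply(countInMap: Int, one: Int): Int = countInMap + one
  })
if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
  // schedule the next check
} else {
  // fail the job submission
}
```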


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209277357
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+if (numCheckFailures < 
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
+  jobIdToNumTasksCheckFailures.put(jobId, numCheckFailures)
+  messageScheduler.schedule(
+new Runnable {
+  override def run(): Unit = 
eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+partitions, callSite, listener, properties))
+},
+timeIntervalNumTasksCheck * 1000,
+TimeUnit.MILLISECONDS
+  )
+  return
+} else {
+  listener.jobFailed(e)
--- End diff --

Do you expect the same job to be submitted again? If not, we should remove the key from the hash map.
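
If not, something like this on the final failure path (a sketch against the diff above):

```scala
// No more retries for this job: drop its counter so the map does not keep stale entries,
// then report the failure.
jobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
```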


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209274833
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
--- End diff --

`==` -> `.contains()` in case the error message is nested
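
That is (this is what the updated diff quoted earlier in this archive does):

```scala
case e: Exception if e.getMessage.contains(
    DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
```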


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209272468
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
   .timeConf(TimeUnit.SECONDS)
   .checkValue(v => v > 0, "The value should be a positive time value.")
   .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+  .doc("Time in seconds to wait between a max concurrent tasks check 
failure and the next " +
--- End diff --

"a ... failure"


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209275201
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
--- End diff --

Please include jobId, stageId, requested slots, and total slots in the log message.
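
For example, something along these lines (a sketch; `finalRDD.getNumPartitions` and `sc.maxNumConcurrentTasks()` are assumed here based on other diffs in this thread, and the stage id may not be available yet at this point):

```scala
logWarning(s"Job $jobId requires to run a barrier stage with ${finalRDD.getNumPartitions} " +
  s"slots, which is more than the total number of slots currently available in the " +
  s"cluster (${sc.maxNumConcurrentTasks()}).")
```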


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209277632
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -203,6 +203,17 @@ class DAGScheduler(
 sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
   DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Number of max concurrent tasks check failures for each job.
+   */
+  private[scheduler] val jobIdToNumTasksCheckFailures = new 
ConcurrentHashMap[Int, Int]
--- End diff --

How do entries in this map get cleaned?
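
If they are not cleaned up anywhere yet, one option (a sketch, not necessarily what the PR should do) is to drop the counter as soon as the job either passes the check or is finally failed:

```scala
// After createResultStage() succeeds, or on the final listener.jobFailed(e) path,
// forget this job's counter so the map cannot grow without bound.
jobIdToNumTasksCheckFailures.remove(jobId)
```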


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209273451
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
   .timeConf(TimeUnit.SECONDS)
   .checkValue(v => v > 0, "The value should be a positive time value.")
   .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+  .doc("Time in seconds to wait between a max concurrent tasks check 
failure and the next " +
+"check. A max concurrent tasks check ensures the cluster can 
launch more concurrent " +
+"tasks than required by a barrier stage on job submitted. The 
check can fail in case " +
+"a cluster has just started and not enough executors have 
registered, so we wait for a " +
+"little while and try to perform the check again. If the check 
fails consecutively for " +
+"three times for a job then fail current job submission. Note this 
config only applies " +
+"to jobs that contain one or more barrier stages, we won't perform 
the check on " +
+"non-barrier jobs.")
+  .timeConf(TimeUnit.SECONDS)
+  .createWithDefaultString("10s")
--- End diff --

Would you make the default higher, like `30s`? This is to cover the case where an application starts immediately with a barrier stage while the master is still adding new executors. Let me know if this won't happen.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-10 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209294652
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 assert(smaller.size === 4)
   }
 
+  test("compute max number of concurrent tasks can be launched") {
+val conf = new SparkConf()
+  .setMaster("local-cluster[4, 3, 1024]")
+  .setAppName("test")
+sc = new SparkContext(conf)
+eventually(timeout(10.seconds)) {
+  // Ensure all executors have been launched.
+  assert(sc.getExecutorIds().length == 4)
+}
+assert(sc.maxNumConcurrentTasks() == 12)
+  }
+
+  test("compute max number of concurrent tasks can be launched when 
spark.task.cpus > 1") {
+val conf = new SparkConf()
+  .set("spark.task.cpus", "2")
+  .setMaster("local-cluster[4, 3, 1024]")
+  .setAppName("test")
+sc = new SparkContext(conf)
+eventually(timeout(10.seconds)) {
+  // Ensure all executors have been launched.
+  assert(sc.getExecutorIds().length == 4)
+}
+// Each executor can only launch one task since `spark.task.cpus` is 2.
+assert(sc.maxNumConcurrentTasks() == 4)
+  }
+
+  test("compute max number of concurrent tasks can be launched when some 
executors are busy") {
+val conf = new SparkConf()
+  .set("spark.task.cpus", "2")
+  .setMaster("local-cluster[4, 3, 1024]")
+  .setAppName("test")
+sc = new SparkContext(conf)
+val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
+  Thread.sleep(1000)
+  iter
+}
+var taskStarted = new AtomicBoolean(false)
+var taskEnded = new AtomicBoolean(false)
+val listener = new SparkListener() {
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+taskStarted.set(true)
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+taskEnded.set(true)
+  }
+}
+
+try {
+  sc.addSparkListener(listener)
+  eventually(timeout(10.seconds)) {
+// Ensure all executors have been launched.
+assert(sc.getExecutorIds().length == 4)
+  }
+
+  // Submit a job to trigger some tasks on active executors.
+  testSubmitJob(sc, rdd)
+
+  eventually(timeout(5.seconds)) {
--- End diff --

Maybe it's safer to let the task sleep longer and cancel it once the conditions are met.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208950122
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

Is it OK that this increment is not atomic?
In the following scenario, the value may not be correct:
1. We assume `jobIdToNumTasksCheckFailures(jobId) = 1`.
2. Thread A executes L963, so `numCheckFailures = 2`.
3. Thread B executes L963, so `numCheckFailures = 2`.
4. Thread B executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` holds 2.
5. Thread A executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` holds 2.

Since two threads detected a failure, we expect `3`, but it is `2`.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208947201
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -203,6 +203,17 @@ class DAGScheduler(
 sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
   DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Number of max concurrent tasks check failures for each job.
+   */
+  private[scheduler] val jobIdToNumTasksCheckFailures = new 
ConcurrentHashMap[Int, Int]
+
+  /**
+   * Time in seconds to wait between a max concurrent tasks check failure 
and the next check.
--- End diff --

nit: `a max` -> `max`?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208946523
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
   .timeConf(TimeUnit.SECONDS)
   .checkValue(v => v > 0, "The value should be a positive time value.")
   .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+  .doc("Time in seconds to wait between a max concurrent tasks check 
failure and the next " +
--- End diff --

nit: `a max` -> `max`?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208945843
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends 
Logging {
 }
   }
 
+  /**
+   * Get the max number of tasks that can be concurrent launched currently.
--- End diff --

How about something like this?
```
 * Get the max number of tasks that can be concurrently launched when the method is called.
 * Note that please don't cache the value returned by this method, because the number can be
 * changed due to adding/removing executors.
```


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208067280
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def getNumSlots(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

@jiangxb1987 Could you create a JIRA and link here?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066204
  
--- Diff: 
core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala ---
@@ -185,4 +185,56 @@ class BarrierStageOnSubmittedSuite extends 
SparkFunSuite with LocalSparkContext
 testSubmitJob(sc, rdd,
   message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
   }
+
--- End diff --

We need a test that verifies that, if the total number of slots is sufficient but some slots are busy running other jobs, we don't fail the barrier job.
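
A rough sketch of such a test (assuming the `BarrierStageOnSubmittedSuite`/`LocalSparkContext` setup and the usual `eventually` helpers; names and timings are illustrative):

```scala
test("submit a barrier stage while other jobs occupy some slots") {
  val conf = new SparkConf()
    .setMaster("local-cluster[4, 1, 1024]")
    .setAppName("test")
  sc = new SparkContext(conf)
  eventually(timeout(10.seconds)) {
    // Make sure all executors are registered before submitting any job.
    assert(sc.getExecutorIds().length == 4)
  }
  // Occupy two of the four slots with a long-running non-barrier job.
  sc.parallelize(1 to 2, 2).mapPartitions { iter => Thread.sleep(5000); iter }.collectAsync()
  // The barrier stage needs all 4 slots; the submission-time check compares against the
  // total number of slots, not the currently free ones, so this job should not fail fast.
  val rdd = sc.parallelize(1 to 4, 4).barrier().mapPartitions(iter => iter)
  assert(rdd.collect().sorted.toSeq === (1 to 4))
}
```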


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208067143
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -402,6 +403,18 @@ class DAGScheduler(
 }
   }
 
+  /**
+   * Check whether the barrier stage requires more slots (to be able to 
launch all tasks in the
+   * barrier stage together) than the total number of active slots 
currently. Fail fast if trying
+   * to submit a barrier stage that requires more slots than current total 
number.
+   */
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+if (rdd.isBarrier() && rdd.getNumPartitions > sc.getNumSlots) {
+  throw new SparkException(
--- End diff --

We should tolerate temporary unavailability here by adding a wait.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066391
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -653,6 +653,10 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
   .setMaster("local-cluster[3, 1, 1024]")
   .setAppName("test-cluster")
 sc = new SparkContext(conf)
+eventually(timeout(5.seconds)) {
--- End diff --

Move this wait code to the barrier suite, because it is only required there.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066484
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -653,6 +653,10 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
   .setMaster("local-cluster[3, 1, 1024]")
--- End diff --

Add a unit test for `getNumSlots`.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208065883
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 executorDataMap.keySet.toSeq
   }
 
+  override def getNumSlots(): Int = {
+executorDataMap.values.foldLeft(0) { (num, executor) =>
+  num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

~~~scala
executorDataMap.values.map { executor =>
  executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
~~~


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208065660
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1597,6 +1597,15 @@ class SparkContext(config: SparkConf) extends 
Logging {
 }
   }
 
+  /**
+   * Get the number of currently active slots (total number of tasks can 
be launched currently).
+   * Note that please don't cache the value returned by this method, 
because the number can change
+   * due to add/remove executors.
+   *
+   * @return The number of tasks can be launched currently.
+   */
+  private[spark] def getNumSlots(): Int = schedulerBackend.getNumSlots()
--- End diff --

How about `maxConcurrentTasks`?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207834660
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 executorDataMap.keySet.toSeq
   }
 
+  override def getNumSlots(): Int = {
+executorDataMap.values.foldLeft(0) { (num, executor) =>
+  num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

As mentioned in the method description of `SchedulerBackend.getNumSlots()`:
```
   * Note that please don't cache the value returned by this method, because the number can change
   * due to add/remove executors.
```

It shall be fine to cache that within different stages of a job, but it requires a few more changes that will make the current PR more complicated.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207833047
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def getNumSlots(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

Only `MesosFineGrainedSchedulerBackend` will break; we still support `MesosCoarseGrainedSchedulerBackend`.


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-05 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745122
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def getNumSlots(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

but finegrained is being deprecated...


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-05 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745108
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   super.applicationId
 }
 
+  override def getNumSlots(): Int = {
+// TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

So this breaks barrier execution on Mesos completely? (Since the number of available slots is 0, it will just fail.)


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-05 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745157
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 executorDataMap.keySet.toSeq
   }
 
+  override def getNumSlots(): Int = {
+executorDataMap.values.foldLeft(0) { (num, executor) =>
+  num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

Should this be saved instead of re-computed on each stage?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-05 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22001

[SPARK-24819][CORE] Fail fast when no enough slots to launch the barrier 
stage on job submitted

## What changes were proposed in this pull request?

We shall check whether the barrier stage requires more slots (to be able to launch all tasks in the barrier stage together) than the total number of currently active slots, and fail fast if a barrier stage is submitted that requires more slots than the current total.

This PR proposes to add a new method `getNumSlots()` that returns the total number of currently active slots in `SchedulerBackend`; support for this new method has been added to all the first-class scheduler backends except `MesosFineGrainedSchedulerBackend`.
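
For reference, a sketch of the new hook as described above (only the new method is shown; the scaladoc wording follows the diffs quoted earlier in this archive):

```scala
private[spark] trait SchedulerBackend {
  // ...existing methods elided...

  /**
   * Get the number of currently active slots, i.e. the total number of tasks that can be
   * launched concurrently. Do not cache the returned value, because it can change as
   * executors are added or removed.
   */
  def getNumSlots(): Int
}
```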

## How was this patch tested?

Added new test cases in `BarrierStageOnSubmittedSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark SPARK-24819

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22001.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22001


commit 52530052c896862748a86a1b77455f31534b6045
Author: Xingbo Jiang 
Date:   2018-08-05T15:47:05Z

Fail fast when no enough slots to launch the barrier stage on job submitted




---
