[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21653


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r204199580
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      calculatedTasks -= 1
+
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
--- End diff --

I agree, I don't see a better option.



---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-20 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r20409
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      calculatedTasks -= 1
--- End diff --

Add a comment here about cleaning up the things that were incremented earlier while this was being handled as successful.


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-20 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r204177708
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      calculatedTasks -= 1
+
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
--- End diff --

The downside here is that we have already incremented, and other tasks could have checked and failed before we decrement; but unless someone else has a better idea, this is better than it is now.
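
To make that window concrete, here is a minimal, self-contained sketch of the kind of bookkeeping being discussed. It is my own simplification, not Spark's actual TaskSetManager code: the real counters live in canFetchMoreResults/enqueueSuccessfulTask, and the names below only mirror them.

// Simplified model (not Spark code): shared counters are bumped when a result is
// enqueued, and only decremented later once we learn another attempt already won.
object ResultSizeRaceSketch {
  private val maxResultSize = 100L
  private var totalResultSize = 0L
  private var calculatedTasks = 0

  // Called when a successful result arrives; false means the size limit was hit.
  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    calculatedTasks += 1
    totalResultSize <= maxResultSize
  }

  // The late "undo" for an attempt that turned out to be killed by another attempt.
  def undo(size: Long): Unit = synchronized {
    totalResultSize -= size
    calculatedTasks -= 1
  }

  def main(args: Array[String]): Unit = {
    assert(canFetchMoreResults(60))            // winning attempt's result is counted
    // Race window: before undo() runs, another task's check can already observe the
    // inflated total and fail, even though the decrement lands shortly afterwards.
    val otherTaskOk = canFetchMoreResults(60)  // sees 120 > 100 and fails the check
    undo(60)                                   // decrement for the killed attempt
    println(s"other task passed the check: $otherTaskOk, total is now $totalResultSize")
  }
}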


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r200805005
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,13 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt(index)) {
--- End diff --

For completeness, we will also need to 'undo' the changes in `enqueueSuccessfulTask`: reverse the counters incremented in `canFetchMoreResults`.


(Orthogonal to this PR): Looking at the use of `killedByOtherAttempt`, I see that there is a bug in `executorLost` w.r.t. how it is updated - hopefully a fix for SPARK-24755 won't cause issues here.


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r199550367
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }
+
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation", "true")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 3 tasks and leave 1 task in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3))
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val task5 = taskOption5.get
+    assert(task5.index === 3)
+    assert(task5.taskId === 4)
+    assert(task5.executorId === "exec1")
+    assert(task5.attemptNumber === 1)
+    sched.backend = mock(classOf[SchedulerBackend])
+
+    // Complete one attempt for the running task
+    manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
+    // Verify that it kills other running attempt
+    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
+    // Complete another attempt for the running task
--- End diff --

Can you expand this comment to explain why you're doing this? Without looking at the bug, it's easy to think this part is wrong, but in fact it's the most important part of your test. E.g.:

There is a race between the scheduler asking to kill the other task and that task actually finishing. We simulate what happens if the other task finishes before we kill it.


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r199550557
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }
+
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation", "true")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 3 tasks and leave 1 task in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3))
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val task5 = taskOption5.get
+    assert(task5.index === 3)
+    assert(task5.taskId === 4)
+    assert(task5.executorId === "exec1")
+    assert(task5.attemptNumber === 1)
+    sched.backend = mock(classOf[SchedulerBackend])
+
+    // Complete one attempt for the running task
+    manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
+    // Verify that it kills other running attempt
+    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
+    // Complete another attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
+
+    assert(manager.taskInfos(3).successful == true)
+    assert(manager.taskInfos(4).killed == true)
--- End diff --

it seems the main thing you're trying to change here is what gets passed to `DAGScheduler.taskEnded`, so shouldn't you be verifying that here?
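
One way to assert that is sketched below. This is a hypothetical, self-contained illustration only: it assumes mockito-core on the classpath, uses a made-up TaskEndReporter stand-in rather than the real DAGScheduler.taskEnded signature, and a toy manager rather than the real TaskSetManager. It just shows the shape of verifying exactly which end reason each attempt reports.

import org.mockito.Mockito.{mock, verify}

// Hypothetical stand-in for the scheduler -> DAGScheduler callback.
trait TaskEndReporter {
  def taskEnded(taskId: Long, reason: String): Unit
}

// Toy manager: only the first attempt of the partition is reported as a Success;
// a later successful status update is reported as killed by the other attempt.
final class ToyManager(reporter: TaskEndReporter) {
  private var partitionDone = false
  def handleSuccessfulTask(taskId: Long): Unit = {
    if (partitionDone) {
      reporter.taskEnded(taskId, "TaskKilled(another attempt succeeded)")
    } else {
      partitionDone = true
      reporter.taskEnded(taskId, "Success")
    }
  }
}

object VerifyTaskEndedSketch {
  def main(args: Array[String]): Unit = {
    val reporter = mock(classOf[TaskEndReporter])
    val manager = new ToyManager(reporter)

    manager.handleSuccessfulTask(3L)  // first attempt to finish
    manager.handleSuccessfulTask(4L)  // late speculative copy

    // The kind of check the comment asks for: verify what was reported for each
    // attempt, rather than only inspecting the taskInfos flags.
    verify(reporter).taskEnded(3L, "Success")
    verify(reporter).taskEnded(4L, "TaskKilled(another attempt succeeded)")
  }
}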


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r199549772
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }
+
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation", "true")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
--- End diff --

nit: double indent the contents of the `List`


---




[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-06-27 Thread hthuynh2
GitHub user hthuynh2 opened a pull request:

https://github.com/apache/spark/pull/21653

[SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success

**Description**
Currently, speculative tasks that didn't commit can show up as successes or failures (depending on the timing of the commit). This is a bit confusing because the task didn't really succeed, in the sense that it didn't write anything. I think these tasks should be marked as KILLED, or something that makes it more obvious to the user exactly what happened. If a task happens to hit the timing where it gets a commit-denied exception, it shows up as failed and counts against your task failures. It shouldn't count against task failures, since that failure really doesn't matter.
MapReduce handles these situations, so perhaps we can look there for a model.

Screenshot: https://user-images.githubusercontent.com/15680678/42013170-99db48c2-7a61-11e8-8c7b-ef94c84e36ea.png

**How can this issue happen?**
When both attempts of a task finish before the driver sends the command to kill one of them, both of them send the FINISHED status update to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful task, it sends the command to kill the other copy of the task; however, because that task has already finished, the executor ignores the command. After it finishes handling the first attempt, it processes the second one. Although all actions on the result of this second attempt are skipped, this copy of the task is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it might confuse the user because both attempts appear to be successful.

**How does this PR fix the issue?**
The simple way to fix this issue is that when the TaskSetManager handles a successful task, it checks whether any other attempt has already succeeded. If that is the case, it calls handleFailedTask with state == KILLED and reason == TaskKilled("another attempt succeeded"), so that this attempt is handled as being killed.
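
A rough, self-contained sketch of that decision follows. It is my own simplification, not the actual TaskSetManager code: the real implementation uses TaskState.KILLED, a TaskKilled end reason, and the existing handleFailedTask method, while the names below are toy stand-ins.

// Minimal model of the fix: on a successful status update, first check whether
// another attempt of the same partition already succeeded; if so, route this
// attempt through the "failed/killed" path instead of the success path.
object SpeculativeAttemptSketch {
  sealed trait State
  case object FINISHED extends State
  case object KILLED extends State

  private val successfulPartitions = scala.collection.mutable.Set[Int]()
  private val recorded = scala.collection.mutable.Map[Long, (State, String)]()

  // Stand-in for TaskSetManager.handleFailedTask(tid, state, reason).
  def handleFailedTask(taskId: Long, state: State, reason: String): Unit =
    recorded(taskId) = (state, reason)

  def handleSuccessfulTask(taskId: Long, partition: Int): Unit = {
    if (successfulPartitions.contains(partition)) {
      // Another copy already committed: handle this attempt as being killed.
      handleFailedTask(taskId, KILLED, "another attempt succeeded")
    } else {
      successfulPartitions += partition
      recorded(taskId) = (FINISHED, "Success")
    }
  }

  def main(args: Array[String]): Unit = {
    handleSuccessfulTask(taskId = 3L, partition = 3)  // first attempt to finish
    handleSuccessfulTask(taskId = 4L, partition = 3)  // late speculative copy
    println(recorded)  // first attempt -> (FINISHED, Success); second -> (KILLED, ...)
  }
}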

**How was this patch tested?**
I tested this manually by running applications that previously hit the issue a few times, and observed that the issue does not happen again. Also, I added a unit test in TaskSetManagerSuite to check that if we call handleSuccessfulTask to handle the status updates for two copies of a task, only the one that is handled first is marked as SUCCESS.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hthuynh2/spark SPARK_13343

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21653


commit 8f7d98177816e11659cf79a2b28f96bd4b7173d5
Author: Hieu Huynh <hieu.huynh@...>
Date:   2018-06-28T04:19:14Z

Fixed issue and added unit test




---
