[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-07-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21656#discussion_r200804756
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
--- End diff --

`speculationEnabled && ! isZombie`
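A sketch of how that guard could read in the patched method (the context lines come from the diff above; the zombie check is the suggestion here, not merged code):

```scala
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      // A zombie TaskSetManager no longer launches speculative tasks, so
      // there is no point feeding its duration heap.
      if (speculationEnabled && !isZombie) {
        taskAttempts(index).headOption.foreach { info =>
          info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
          successfulTaskDurations.insert(info.duration)
        }
      }
      // ... existing completion bookkeeping ...
    }
  }
}
```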


---

[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-07-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21656#discussion_r200759539
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
--- End diff --

yeah that is sort of what I was suggesting -- but I was thinking rather than just a flag, maybe we separate out `tasksSuccessful` into `tasksCompletedSuccessfully` (from this taskset) and `tasksNoLongerNecessary` (from any taskset), perhaps with better names. If you just had a flag, you would avoid the exception from the empty heap, but you might still decide to enable speculation prematurely, since you really haven't finished enough tasks for `SPECULATION_QUANTILE`: 
https://github.com/apache/spark/blob/a381bce7285ec30f58f28f523dfcfe0c13221bbf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987
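A self-contained sketch of that split, using the placeholder names above (`TaskSetProgress` and its methods are invented for illustration; the real change would live inside `TaskSetManager`):

```scala
// Track the two notions of "done" separately, so the speculation gate only
// counts tasks whose durations this task set actually observed.
class TaskSetProgress(numTasks: Int, speculationQuantile: Double) {
  private var tasksCompletedSuccessfully = 0 // finished by this task set
  private var tasksNoLongerNecessary = 0     // finished by any task set for the stage

  def onTaskSuccess(): Unit = {
    tasksCompletedSuccessfully += 1
    tasksNoLongerNecessary += 1
  }

  // Called when another task set's attempt completed the partition.
  def onPartitionCompletedElsewhere(): Unit = tasksNoLongerNecessary += 1

  def isAllDone: Boolean = tasksNoLongerNecessary == numTasks

  // Gate speculation on durations we actually have, mirroring the
  // SPECULATION_QUANTILE threshold.
  def mayCheckSpeculation: Boolean = {
    val minFinished = math.max((speculationQuantile * numTasks).floor.toInt, 1)
    tasksCompletedSuccessfully >= minFinished
  }
}
```

With this split, `checkSpeculatableTasks` would compare `tasksCompletedSuccessfully` against the quantile threshold, so the count can never exceed the number of durations in the heap.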


---

[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-07-05 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21656#discussion_r200366359
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
--- End diff --

IIUC, in this case no task in this taskSet actually finishes successfully; it's another task attempt from another taskSet for the same stage that succeeded. Instead of changing this code path, I'd suggest we add another flag to record whether any task succeeded in the current taskSet, and if no task has succeeded, skip L987.

WDYT @squito ?
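
A minimal sketch of that flag inside `TaskSetManager` (the field name is hypothetical):

```scala
// Hypothetical flag: flipped in handleSuccessfulTask, right where the
// duration is inserted into successfulTaskDurations.
private var anyTaskSucceededInThisTaskSet = false

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // If every "success" was really a partition completed by another task
  // set's attempt, the duration heap is empty -- skip the median check.
  if (!anyTaskSucceededInThisTaskSet) return false
  // ... existing SPECULATION_QUANTILE / median logic ...
  false
}
```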


---

[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-07-04 Thread cxzl25
Github user cxzl25 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21656#discussion_r200236204
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
+          taskAttempts(index).headOption.map { info =>
+            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
+            successfulTaskDurations.insert(info.duration)
--- End diff --

TaskSetManager#handleSuccessfulTask updates the successful task durations, writing them to successfulTaskDurations.

When there are multiple tasksets for this stage, markPartitionCompletedInAllTaskSets accumulates the value of tasksSuccessful in each of them.

In this case, when checkSpeculatableTasks is called, the value of tasksSuccessful matches the condition, but successfulTaskDurations is empty.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L723
```scala
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    if (speculationEnabled) {
      successfulTaskDurations.insert(info.duration)
    }
    // ...
    // There may be multiple tasksets for this stage -- we let all of them know
    // that the partition was completed.  This may result in some of the
    // tasksets getting completed.
    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
```

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987
```scala
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
    // ...
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      val medianDuration = successfulTaskDurations.median
```



---

[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-07-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21656#discussion_r200231460
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
+          taskAttempts(index).headOption.map { info =>
+            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
+            successfulTaskDurations.insert(info.duration)
--- End diff --

what's the normal code path to update task durations?


---

[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...

2018-06-28 Thread cxzl25
GitHub user cxzl25 opened a pull request:

https://github.com/apache/spark/pull/21656

[SPARK-24677][Core] MedianHeap is empty when speculation is enabled, causing the SparkContext to stop

## What changes were proposed in this pull request?
When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task duration to MedianHeap, not just increase tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks is called, tasksSuccessful is non-zero but MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty. This finally leads to stopping the SparkContext.
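
A minimal sketch of the resulting failure, assuming access to Spark's `private[spark]` MedianHeap (the repro object is hypothetical and placed in the heap's package only for visibility):

```scala
package org.apache.spark.util.collection

object EmptyMedianHeapRepro {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = new MedianHeap()
    // tasksSuccessful was increased by markPartitionCompleted, but no
    // duration was ever inserted, so the heap is still empty ...
    successfulTaskDurations.median
    // ... and median throws java.util.NoSuchElementException: MedianHeap is empty.
  }
}
```
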
## How was this patch tested?
Manual tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cxzl25/spark fix_MedianHeap_empty

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21656


commit 467f0bccb7d1940bed4f1b2e633c9374b0e654f2
Author: sychen 
Date:   2018-06-28T07:34:38Z

MedianHeap is empty when speculation is enabled, causing the SparkContext to stop.




---
