[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21656#discussion_r200804756

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
```
--- End diff --

`speculationEnabled && !isZombie`

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
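The amended condition can be illustrated with a minimal, self-contained sketch (not Spark code; the `speculationEnabled` and `isZombie` names mirror `TaskSetManager` fields, and the case class is purely illustrative):

```scala
// Illustrative stand-in for the relevant TaskSetManager state (not Spark code).
// A "zombie" taskset has been superseded (e.g. by a resubmitted stage attempt),
// so it should not record synthetic task durations for speculation.
final case class TsmState(speculationEnabled: Boolean, isZombie: Boolean) {
  // The guard suggested in the review: only act for live tasksets.
  def shouldRecordDuration: Boolean = speculationEnabled && !isZombie
}

val live = TsmState(speculationEnabled = true, isZombie = false)
val zombie = TsmState(speculationEnabled = true, isZombie = true)
println(live.shouldRecordDuration)   // true
println(zombie.shouldRecordDuration) // false
```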
[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21656#discussion_r200759539

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
```
--- End diff --

yeah, that is sort of what I was suggesting -- but I was thinking that rather than just a flag, maybe we separate `tasksSuccessful` into `tasksCompletedSuccessfully` (from this taskset) and `tasksNoLongerNecessary` (from any taskset), perhaps with better names. If you just had a flag, you would avoid the exception from the empty heap, but you might still decide to enable speculation prematurely, since you really haven't finished enough tasks for `SPECULATION_QUANTILE`: https://github.com/apache/spark/blob/a381bce7285ec30f58f28f523dfcfe0c13221bbf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987

---
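The split squito proposes can be sketched in isolation (names are the placeholders from the comment, not actual Spark fields; `speculationQuantile` stands in for the `SPECULATION_QUANTILE` config):

```scala
// Hypothetical sketch of the suggested counter split (not Spark code):
// count tasks that finished in *this* taskset separately from partitions
// that were finished by any other taskset for the same stage, and gate
// speculation on the former, since only those have recorded durations.
class SpeculationCounters(numTasks: Int, speculationQuantile: Double) {
  var tasksCompletedSuccessfully = 0 // finished by this taskset; duration recorded
  var tasksNoLongerNecessary = 0     // partition completed by any taskset

  // The quantile check should consult only tasks with real durations,
  // so the median heap is never read while empty.
  def readyToSpeculate: Boolean = {
    val minFinished = math.floor(speculationQuantile * numTasks).toInt
    tasksCompletedSuccessfully >= minFinished && tasksCompletedSuccessfully > 0
  }
}

val counters = new SpeculationCounters(numTasks = 10, speculationQuantile = 0.75)
counters.tasksCompletedSuccessfully = 6
println(counters.readyToSpeculate) // false: only 6 of the required 7 finished here
counters.tasksCompletedSuccessfully = 7
println(counters.readyToSpeculate) // true
```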
[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21656#discussion_r200366359

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
      if (!successful(index)) {
+        if (speculationEnabled) {
```
--- End diff --

IIUC, in this case no task in this taskSet actually finishes successfully; it's a task attempt from another taskSet for the same stage that succeeded. Instead of changing this code path, I'd suggest adding another flag indicating whether any task succeeded in the current taskSet, and if no task has succeeded, skip L987. WDYT @squito ?

---
[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
Github user cxzl25 commented on a diff in the pull request: https://github.com/apache/spark/pull/21656#discussion_r200236204

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
+          taskAttempts(index).headOption.map { info =>
+            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
+            successfulTaskDurations.insert(info.duration)
```
--- End diff --

TaskSetManager#handleSuccessfulTask updates successful task durations, writing them to successfulTaskDurations. When there are multiple tasksets for this stage, markPartitionCompletedInAllTaskSets accumulates the value of tasksSuccessful. In this case, when checkSpeculatableTasks is called, the value of tasksSuccessful satisfies the condition, but successfulTaskDurations is empty.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L723

```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
  if (speculationEnabled) {
    successfulTaskDurations.insert(info.duration)
  }
  // ...
  // There may be multiple tasksets for this stage -- we let all of them know that the partition
  // was completed. This may result in some of the tasksets getting completed.
  sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
```

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987

```scala
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // ...
  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    val medianDuration = successfulTaskDurations.median
```

---
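The mismatch described above can be reproduced with a minimal, self-contained sketch (the `MedianHeap` class here is a simplified stand-in for `org.apache.spark.util.collection.MedianHeap`, not the real implementation): the success counter is bumped on a path that never feeds the duration heap, so the later median lookup throws.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's MedianHeap: throws on an empty median,
// just like the real one does.
class MedianHeap {
  private val durations = mutable.ArrayBuffer.empty[Long]
  def insert(d: Long): Unit = durations += d
  def median: Long = {
    if (durations.isEmpty) throw new NoSuchElementException("MedianHeap is empty.")
    val sorted = durations.sorted
    sorted(sorted.size / 2)
  }
}

var tasksSuccessful = 0
val successfulTaskDurations = new MedianHeap

// Path taken by markPartitionCompleted when a *different* taskset finished
// the partition: the counter moves, but the heap is never fed.
tasksSuccessful += 1

// checkSpeculatableTasks then sees tasksSuccessful > 0 and asks for the median:
val failed =
  try { successfulTaskDurations.median; false }
  catch { case _: NoSuchElementException => true }
println(failed) // true: reproduces "MedianHeap is empty."
```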
[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21656#discussion_r200231460

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
+          taskAttempts(index).headOption.map { info =>
+            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
+            successfulTaskDurations.insert(info.duration)
```
--- End diff --

what's the normal code path to update task durations?

---
[GitHub] spark pull request #21656: [SPARK-24677][Core]MedianHeap is empty when specu...
GitHub user cxzl25 opened a pull request: https://github.com/apache/spark/pull/21656

[SPARK-24677][Core] MedianHeap is empty when speculation is enabled, causing the SparkContext to stop

## What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to MedianHeap, not just increase tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but MedianHeap is empty, so `successfulTaskDurations.median` throws `java.util.NoSuchElementException: MedianHeap is empty.`, which finally leads to stopping the SparkContext.

## How was this patch tested?

Manual tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cxzl25/spark fix_MedianHeap_empty

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21656.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21656

commit 467f0bccb7d1940bed4f1b2e633c9374b0e654f2
Author: sychen
Date: 2018-06-28T07:34:38Z

    MedianHeap is empty when speculation is enabled, causing the SparkContext to stop.

---
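The invariant the PR aims to restore can be sketched in a minimal, self-contained form (the `TaskStats` class and its names are illustrative, not Spark code): every increment of the success counter is paired with a duration insert, so the median is always defined whenever the counter is non-zero.

```scala
import scala.collection.mutable

// Illustrative sketch (not Spark code): keep the success counter and the
// duration collection in lock-step, so no code path can observe a non-zero
// counter alongside an empty duration heap.
class TaskStats {
  var tasksSuccessful = 0
  private val durations = mutable.ArrayBuffer.empty[Long]

  def recordSuccess(durationMs: Long): Unit = {
    tasksSuccessful += 1
    durations += durationMs // always paired with the counter increment
  }

  def medianDuration: Long = {
    require(durations.nonEmpty, "no durations recorded")
    val sorted = durations.sorted
    sorted(sorted.size / 2)
  }
}

val stats = new TaskStats
stats.recordSuccess(120)
stats.recordSuccess(80)
stats.recordSuccess(200)
println(stats.tasksSuccessful) // 3
println(stats.medianDuration)  // 120
```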