GitHub user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/18393#discussion_r125676544
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1133,33 +1152,27 @@ class DAGScheduler(
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
- // Reconstruct task metrics. Note: this may be null if the task has failed.
- val taskMetrics: TaskMetrics =
- if (event.accumUpdates.nonEmpty) {
- try {
- TaskMetrics.fromAccumulators(event.accumUpdates)
- } catch {
- case NonFatal(e) =>
- logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
- null
- }
- } else {
- null
- }
-
- // The stage may have already finished when we get this event -- eg. maybe it was a
- // speculative task. It is important that we send the TaskEnd event in any case, so listeners
- // are properly notified and can chose to handle it. For instance, some listeners are
- // doing their own accounting and if they don't get the task end event they think
- // tasks are still running when they really aren't.
- listenerBus.post(SparkListenerTaskEnd(
- stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-
if (!stageIdToStage.contains(task.stageId)) {
+ // The stage may have already finished when we get this event -- eg. maybe it was a
+ // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+ // are properly notified and can chose to handle it. For instance, some listeners are
+ // doing their own accounting and if they don't get the task end event they think
+ // tasks are still running when they really aren't.
+ postTaskEnd(event)
+
// Skip all the actions if the stage has been cancelled.
return
}
+ // Make sure the task's accumulators are updated before any other processing happens, so that
+ // we can post a task end event before any jobs or stages are updated. The accumulators are
+ // only updated in certain cases.
+ event.reason match {
+ case Success | _: ExceptionFailure => updateAccumulators(event)
--- End diff --
Does this mean that `resultStage.activeJob` should not be empty by this time?