[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...

2018-11-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23038


---




[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23038#discussion_r236426530
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -565,7 +571,16 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      conditionalLiveUpdate(esummary, now, removeStage)
+
+      val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0
+
+      // If the last task of the executor finished, then update the esummary
+      // for both live and history events.
+      if (isLastTask) {
+       update(esummary, now)
--- End diff ---

indentation is off
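
A minimal sketch, assuming standard two-space Scala indentation, of how the flagged lines might read once fixed. The `else` branch is an assumption here, since the quoted hunk is truncated at the commented line:

```scala
val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0

// If the executor's last task for this stage finished, update the
// esummary for both live and history events.
if (isLastTask) {
  update(esummary, now)
} else {
  maybeUpdate(esummary, now)  // assumed rate-limited live-update path for non-final tasks
}
```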


---




[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...

2018-11-19 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23038#discussion_r234741288
  
--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
+  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
+    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(4, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null))
+    time += 1
+    tasks(1).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null))
+
+    stage.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+
+    time += 1
+    tasks(2).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+    time += 1
+    tasks(3).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
+      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+    esummary.foreach { execSummary =>
+      assert(execSummary.failedTasks == 2)
--- End diff ---

Thanks @gengliangwang, I updated the test.


---




[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...

2018-11-19 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/23038#discussion_r234728748
  
--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
(same hunk as quoted in the reply above, ending at the same `assert(execSummary.failedTasks == 2)` line)
--- End diff ---

Nit: also check `succeededTasks` and `killedTasks`
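
A hedged sketch of how the assertion block might look with those checks added; the zero counts are assumptions that follow from the quoted scenario, in which all four tasks fail and none are killed:

```scala
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
  assert(execSummary.failedTasks == 2)     // two failed tasks per executor
  assert(execSummary.succeededTasks == 0)  // every task fails in this test
  assert(execSummary.killedTasks == 0)     // no task is killed in this test
}
```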


---
