[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/23038 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/23038#discussion_r236426530 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -565,7 +571,16 @@ private[spark] class AppStatusListener( if (metricsDelta != null) { esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - conditionalLiveUpdate(esummary, now, removeStage) + + val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0 + + // If the last task of the executor finished, then update the esummary + // for both live and history events. + if (isLastTask) { + update(esummary, now) --- End diff -- indentation is off --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23038#discussion_r234741288 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala --- @@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(allJobs.head.numFailedStages == 1) } + test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { +val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + +val listener = new AppStatusListener(store, testConf, true) + +val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") +listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + +val tasks = createTasks(4, Array("1", "2")) +tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) +} + +time += 1 +tasks(0).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null)) +time += 1 +tasks(1).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null)) + +stage.failureReason = Some("Failed") +listener.onStageCompleted(SparkListenerStageCompleted(stage)) +time += 1 +listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) + +time += 1 +tasks(2).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) +time += 1 +tasks(3).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null)) + +val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) +esummary.foreach { execSummary => + assert(execSummary.failedTasks == 2) --- End diff -- Thanks @gengliangwang , I updated the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23038: [SPARK-25451][SPARK-26100][CORE]Aggregated metric...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/23038#discussion_r234728748 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala --- @@ -1275,6 +1275,49 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(allJobs.head.numFailedStages == 1) } + test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { +val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + +val listener = new AppStatusListener(store, testConf, true) + +val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") +listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) +listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + +val tasks = createTasks(4, Array("1", "2")) +tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) +} + +time += 1 +tasks(0).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null)) +time += 1 +tasks(1).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null)) + +stage.failureReason = Some("Failed") +listener.onStageCompleted(SparkListenerStageCompleted(stage)) +time += 1 +listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) + +time += 1 +tasks(2).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) +time += 1 +tasks(3).markFinished(TaskState.FAILED, time) +listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null)) + +val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) +esummary.foreach { execSummary => + assert(execSummary.failedTasks == 2) --- End diff -- Nit: also check `succeededTasks` and `killedTasks` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org