[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-09 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044581470


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
   delta
 }.orNull
 
-val (completedDelta, failedDelta, killedDelta) = event.reason match {
+// SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+// executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+// corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+// will make the stage always remains in the live stage list as it can never meet the
+// condition activeTasks == 0. This in turn causes the dead executor to never be retained
+// if that live stage's submissionTime is less than the dead executor's removeTime.
+val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {

Review Comment:
   nit:
   ```suggestion
   val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason match {
   ```
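   
   For context, a toy sketch (illustrative only, not code from this PR) of the counter arithmetic the comment above describes -- a `Resubmitted` task end has no matching task start, so a blind decrement drives `activeTasks` negative and the `activeTasks == 0` cleanup condition is never reached again:
   
   ```scala
   // Toy model of the LiveStage bookkeeping, not the real AppStatusListener code.
   object ActiveTasksDemo extends App {
     var activeTasks = 0
     activeTasks += 1      // SparkListenerTaskStart
     activeTasks -= 1      // SparkListenerTaskEnd (Success)
     activeTasks -= 1      // SparkListenerTaskEnd (Resubmitted): no TaskStart was ever sent
     println(activeTasks)  // -1, so the stage stays in liveStages indefinitely
   }
   ```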






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-08 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044071110


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
 checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+val listener = new AppStatusListener(store, conf, true)
+
+listener.onExecutorAdded(createExecutorAddedEvent(1))
+listener.onExecutorAdded(createExecutorAddedEvent(2))
+val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+  resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+time += 1
+stage.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+val tasks = createTasks(2, Array("1", "2"))
+tasks.foreach { task =>
+  listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+}
+
+time += 1
+tasks(0).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))
+
+// executor lost, success task will be resubmitted
+time += 1
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+// executor lost, running task will be failed and rerun
+time += 1
+tasks(1).markFinished(TaskState.FAILED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new ExecutorMetrics,
+  null))
+
+tasks.foreach { task =>
+  listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+}
+
+time += 1
+tasks(0).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))
+
+time += 1
+tasks(1).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))

Review Comment:
   Shouldn't be `tasks(1)` here?
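   
   Presumably the intended call (an assumption, mirroring the pattern used for the other task-end events in this test) would be:
   
   ```scala
   listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
     Success, tasks(1), new ExecutorMetrics, null))
   ```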






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-08 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044064944


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
 checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {

Review Comment:
   ```suggestion
 test("SPARK-41187: Stage should be removed from liveStages to avoid 
deadExecutors accumulated") {
   ```






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-08 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044064601


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -689,7 +689,15 @@ private[spark] class AppStatusListener(
   if (metricsDelta != null) {
 stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
   }
-  stage.activeTasks -= 1
+  // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+  // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+  // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+  // will make the stage always remains in the live stage list as it can never meet the
+  // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+  // if that live stage's submissionTime is less than the dead executor's removeTime.
+  if (event.reason != Resubmitted) {

Review Comment:
   Can we extend the xxxDelta above to include the `Resubmitted` reason (e.g., `activeTaskDelta`) so we don't have to check `event.reason` every time below?
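   
   A rough sketch of what that could look like (the case arms and names below are illustrative assumptions, not this PR's final code):
   
   ```scala
   // Fold the Resubmitted handling into the existing match so the deltas are computed
   // once, instead of re-checking event.reason at every update site below.
   val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason match {
     case Success =>
       (1, 0, 0, 1)
     case Resubmitted =>
       // No SparkListenerTaskStart was emitted for this attempt, so leave activeTasks alone.
       (0, 0, 0, 0)
     case _: TaskKilled =>
       (0, 0, 1, 1)
     case _ =>
       (0, 1, 0, 1)
   }
   stage.activeTasks -= activeDelta
   ```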






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-05 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040532427


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-// TODO: can this really happen?
-if (event.taskInfo == null) {
+// TODO: can taskInfo null really happen?
+// For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+// will make activeTask in stage to be negative, this will cause stage not be removed in
+// liveStages, and finally cause executor not removed in deadExecutors
+if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
   > Can the change not simply be limited to...
   
   Ok.. if our target is only to be able to remove the dead executors, I think this is good enough. But if we also want to correct the metrics, it's not enough.






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-05 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040350789


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-// TODO: can this really happen?
-if (event.taskInfo == null) {
+// TODO: can taskInfo null really happen?
+// For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+// will make activeTask in stage to be negative, this will cause stage not be removed in
+// liveStages, and finally cause executor not removed in deadExecutors
+if (event.taskInfo == null || event.reason == Resubmitted) {

Review Comment:
   @wineternity Thanks for the detailed analysis. It seems like the only metrics that the `Resubmitted` task end event could affect are the failed/completed task counters, and the other metrics, like the executor summary and shuffle read/write metrics, are duplicated. So I tend to agree with recognizing the `Resubmitted` task end event as a signal rather than a real task end event.
   
   @mridulm WDYT?
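   
   In listener terms, treating it as a signal could look roughly like the guard already shown in the diff above (a sketch, not necessarily the final shape of the change):
   
   ```scala
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
     // A Resubmitted task end is only a signal that the executor was lost; there was no
     // matching SparkListenerTaskStart, so skip all of the normal per-task accounting.
     if (event.taskInfo == null || event.reason == Resubmitted) {
       return
     }
     // ... existing accounting for completed/failed/killed tasks ...
   }
   ```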






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-30 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035883778


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,15 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-// TODO: can this really happen?
-if (event.taskInfo == null) {
+// TODO: can taskInfo null really happen?

Review Comment:
   Yea. I checked. Null taskInfo is not possible.






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035472239


##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,15 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-// TODO: can this really happen?
-if (event.taskInfo == null) {
+// TODO: can taskInfo null really happen?
+// SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by executor lost,
+// it can lead to negative `LiveStage.activeTasks` since there's no corresponding `SparkListenerTaskStart`
+// event for each of them. The negative activeTasks will make the stage always remains in the live stage list
+// as it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be
+// retained if that live stage's submissionTime is less than the dead executor's removeTime( see

Review Comment:
   nit:
   ```suggestion
   // cleaned up if that live stage's submissionTime is less than the dead executor's removeTime( see
   ```






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-11-29 Thread GitBox


Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034655159


##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
 checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+val listener = new AppStatusListener(store, conf, true)
+
+listener.onExecutorAdded(createExecutorAddedEvent(1))
+listener.onExecutorAdded(createExecutorAddedEvent(2))
+val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+  resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+time += 1
+stage.submissionTime = Some(time)
+listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+val tasks = createTasks(2, Array("1", "2"))
+tasks.foreach { task =>
+  listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+}
+
+time += 1
+tasks(0).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))
+
+// executor lost, success task will be resubmitted
+time += 1
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+// executor lost, running task will be failed and rerun
+time += 1
+tasks(1).markFinished(TaskState.FAILED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new ExecutorMetrics,
+  null))
+
+tasks.foreach { task =>
+  listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+}
+
+time += 1
+tasks(0).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))
+
+time += 1
+tasks(1).markFinished(TaskState.FINISHED, time)
+listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+  Success, tasks(0), new ExecutorMetrics, null))
+
+listener.onStageCompleted(SparkListenerStageCompleted(stage))
+time += 1
+listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded ))
+
+time += 1
+listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
+time += 1
+listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
+
+assert( listener.deadExecutors.size === 0 )

Review Comment:
   ```suggestion
   assert(listener.deadExecutors.size === 0)
   ```



##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
 checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {

Review Comment:
   ```suggestion
 test(s"SPARK-41187: Stage should be removed from liveStages to avoid 
deadExecutors accumulated") {
   ```



##
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-// TODO: can this really happen?
-if (event.taskInfo == null) {
+// TODO: can taskInfo null really happen?
+// For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+// will make activeTask in stage to be negative, this will cause stage not be removed in
+// liveStages, and finally cause executor not removed in deadExecutors

Review Comment:
   ```suggestion
   // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by executor lost,
   // it can lead to negative `LiveStage.activeTasks` since there's no corresponding `SparkListenerTaskStart`
   // event for each of them. The negative activeTasks will make the stage always remains in the live stage list
   // as it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be
   // retained if that live stage's submissionTime is less than the dead executor's removeTime( see
   // isExecutorActiveForLiveStages). Since this kind of `SparkListenerTaskEnd` is useless here, we simply
   // ignore it.
   ```



##
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter